merge from trunk r1613787

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1613788 13f79535-47bb-0310-9956-ffa450edef68
Charles Lamb 2014-07-27 13:32:51 +00:00
commit 1d3e9ec935
70 changed files with 1388 additions and 254 deletions

View File

@ -503,6 +503,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-8943. Support multiple group mapping providers. (Kai Zheng via brandonli)
HADOOP-9361 Strictly define the expected behavior of filesystem APIs and
write tests to verify compliance (stevel)
IMPROVEMENTS
HADOOP-10451. Remove unused field and imports from SaslRpcServer.
@ -597,9 +600,6 @@ Release 2.5.0 - UNRELEASED
HADOOP-10747. Support configurable retries on SASL connection failures in
RPC client. (cnauroth)
HADOOP-10674. Improve PureJavaCrc32 performance and use java.util.zip.CRC32
for Java 7 and above. (szetszwo)
HADOOP-10754. Reenable several HA ZooKeeper-related tests on Windows.
(cnauroth)
@ -611,9 +611,6 @@ Release 2.5.0 - UNRELEASED
HADOOP-10767. Clean up unused code in Ls shell command. (cnauroth)
HADOOP-9361 Strictly define the expected behavior of filesystem APIs and
write tests to verify compliance (stevel)
HADOOP-9651 Filesystems to throw FileAlreadyExistsException in
createFile(path, overwrite=false) when the file exists (stevel)
@ -624,8 +621,14 @@ Release 2.5.0 - UNRELEASED
HADOOP-10782. Fix typo in DataChecksum class. (Jingguo Yao via suresh)
HADOOP-10896. Update compatibility doc to capture visibility of
un-annotated classes/ methods. (kasha)
OPTIMIZATIONS
HADOOP-10674. Improve PureJavaCrc32 performance and use java.util.zip.CRC32
for Java 7 and above. (szetszwo)
BUG FIXES
HADOOP-10378. Typo in help printed by hdfs dfs -help.
@ -780,27 +783,6 @@ Release 2.5.0 - UNRELEASED
HADOOP-10801 dead link in site.xml (Akira AJISAKA via stevel)
BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS
HADOOP-10520. Extended attributes definition and FileSystem APIs for
extended attributes. (Yi Liu via wang)
HADOOP-10546. Javadoc and other small fixes for extended attributes in
hadoop-common. (Charles Lamb via wang)
HADOOP-10521. FsShell commands for extended attributes. (Yi Liu via wang)
HADOOP-10548. Improve FsShell xattr error handling and other fixes. (Charles Lamb via umamahesh)
HADOOP-10567. Shift XAttr value encoding code out for reuse. (Yi Liu via umamahesh)
HADOOP-10621. Remove CRLF for xattr value base64 encoding for better display. (Yi Liu via umamahesh)
HADOOP-10575. Small fixes for XAttrCommands and test. (Yi Liu via umamahesh)
HADOOP-10561. Copy command with preserve option should handle Xattrs.
(Yi Liu via cnauroth)
HADOOP-10590. ServiceAuthorizationManager is not threadsafe. (Benoy Antony via vinayakumarb)
HADOOP-10711. Cleanup some extra dependencies from hadoop-auth. (rkanter via tucu)
@ -825,6 +807,27 @@ Release 2.5.0 - UNRELEASED
HADOOP-10894. Fix dead link in ToolRunner documentation. (Akira Ajisaka
via Arpit Agarwal)
BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS
HADOOP-10520. Extended attributes definition and FileSystem APIs for
extended attributes. (Yi Liu via wang)
HADOOP-10546. Javadoc and other small fixes for extended attributes in
hadoop-common. (Charles Lamb via wang)
HADOOP-10521. FsShell commands for extended attributes. (Yi Liu via wang)
HADOOP-10548. Improve FsShell xattr error handling and other fixes. (Charles Lamb via umamahesh)
HADOOP-10567. Shift XAttr value encoding code out for reuse. (Yi Liu via umamahesh)
HADOOP-10621. Remove CRLF for xattr value base64 encoding for better display. (Yi Liu via umamahesh)
HADOOP-10575. Small fixes for XAttrCommands and test. (Yi Liu via umamahesh)
HADOOP-10561. Copy command with preserve option should handle Xattrs.
(Yi Liu via cnauroth)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -72,9 +72,12 @@ Apache Hadoop Compatibility
* Private-Stable APIs can change across major releases,
but not within a major release.
* Classes not annotated are implicitly "Private". Class members not
annotated inherit the annotations of the enclosing class.
* Note: APIs generated from the proto files need to be compatible for
rolling-upgrades. See the section on wire-compatibility for more details. The
compatibility policies for APIs and wire-communication need to go
rolling-upgrades. See the section on wire-compatibility for more details.
The compatibility policies for APIs and wire-communication need to go
hand-in-hand to address this.
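
The visibility rules above are what the SLS hunks later in this commit put into practice by tagging classes @Private/@Unstable. A minimal illustration, modeled on the annotated SLSConfiguration class that appears below:

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;

@Private    // audience: internal to Hadoop, despite the public modifier
@Unstable   // stability: may change incompatibly across releases
public class SLSConfiguration {
  // members inherit @Private @Unstable from the enclosing class
  public static final String PREFIX = "yarn.sls.";
}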
** Semantic compatibility

View File

@ -317,6 +317,13 @@ Release 2.6.0 - UNRELEASED
HDFS-6701. Make seed optional in NetworkTopology#sortByDistance.
(Ashwin Shankar via wang)
HDFS-6755. There is an unnecessary sleep in the code path where
DFSOutputStream#close gives up its attempt to contact the namenode
(mitdesai21 via cmccabe)
HDFS-6750. The DataNode should use its shared memory segment to mark
short-circuit replicas that have been unlinked as stale (cmccabe)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -370,6 +377,15 @@ Release 2.6.0 - UNRELEASED
HDFS-6715. Webhdfs won't fail over when it gets java.io.IOException: Namenode
is in startup mode. (jing9)
HDFS-5919. FileJournalManager doesn't purge empty and corrupt inprogress edits
files (vinayakumarb)
HDFS-6752. Avoid Address bind errors in TestDatanodeConfig#testMemlockLimit
(vinayakumarb)
HDFS-6749. FSNamesystem methods should call resolvePath.
(Charles Lamb via cnauroth)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -392,6 +408,15 @@ Release 2.5.0 - UNRELEASED
HDFS-6406. Add capability for NFS gateway to reject connections from
unprivileged ports. (atm)
HDFS-2006. Ability to support storing extended attributes per file.
HDFS-5978. Create a tool to take fsimage and expose read-only WebHDFS API.
(Akira Ajisaka via wheat9)
HDFS-6278. Create HTML5-based UI for SNN. (wheat9)
HDFS-6279. Create new index page for JN / DN. (wheat9)
IMPROVEMENTS
HDFS-6007. Update documentation about short-circuit local reads (iwasakims
@ -409,9 +434,6 @@ Release 2.5.0 - UNRELEASED
HDFS-6158. Clean up dead code for OfflineImageViewer. (wheat9)
HDFS-5978. Create a tool to take fsimage and expose read-only WebHDFS API.
(Akira Ajisaka via wheat9)
HDFS-6164. Remove lsr in OfflineImageViewer. (wheat9)
HDFS-6167. Relocate the non-public API classes in the hdfs.client package.
@ -439,10 +461,6 @@ Release 2.5.0 - UNRELEASED
HDFS-6265. Prepare HDFS codebase for JUnit 4.11. (cnauroth)
HDFS-6278. Create HTML5-based UI for SNN. (wheat9)
HDFS-6279. Create new index page for JN / DN. (wheat9)
HDFS-5693. Few NN metrics data points were collected via JMX when NN
is under heavy load. (Ming Ma via jing9)
@ -814,9 +832,6 @@ Release 2.5.0 - UNRELEASED
HDFS-6464. Support multiple xattr.name parameters for WebHDFS getXAttrs.
(Yi Liu via umamahesh)
HDFS-6375. Listing extended attributes with the search permission.
(Charles Lamb via wang)
HDFS-6539. test_native_mini_dfs is skipped in hadoop-hdfs/pom.xml
(decstery via cmccabe)
@ -911,6 +926,18 @@ Release 2.5.0 - UNRELEASED
HDFS-6696. Name node cannot start if the path of a file under
construction contains ".snapshot". (wang)
HDFS-6312. WebHdfs HA failover is broken on secure clusters.
(daryn via tucu)
HDFS-6618. FSNamesystem#delete drops the FSN lock between removing INodes
from the tree and deleting them from the inode map (kihwal via cmccabe)
HDFS-6622. Rename and AddBlock may race and produce invalid edits (kihwal
via cmccabe)
HDFS-6723. New NN webUI no longer displays decommissioned state for dead node.
(Ming Ma via wheat9)
BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)
@ -980,18 +1007,6 @@ Release 2.5.0 - UNRELEASED
HDFS-6492. Support create-time xattrs and atomically setting multiple
xattrs. (wang)
HDFS-6312. WebHdfs HA failover is broken on secure clusters.
(daryn via tucu)
HDFS-6618. FSNamesystem#delete drops the FSN lock between removing INodes
from the tree and deleting them from the inode map (kihwal via cmccabe)
HDFS-6622. Rename and AddBlock may race and produce invalid edits (kihwal
via cmccabe)
HDFS-6723. New NN webUI no longer displays decommissioned state for dead node.
(Ming Ma via wheat9)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -2135,12 +2135,12 @@ public class DFSOutputStream extends FSOutputSummer
throw new IOException(msg);
}
try {
Thread.sleep(localTimeout);
if (retries == 0) {
throw new IOException("Unable to close file because the last block"
+ " does not have enough number of replicas.");
}
retries--;
Thread.sleep(localTimeout);
localTimeout *= 2;
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
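
The reordering above (HDFS-6755) checks the retry budget before sleeping, so the client no longer pays one useless localTimeout sleep after it has already decided to give up, and the doubling of localTimeout gives exponential backoff between attempts. A self-contained sketch of the resulting pattern; attemptComplete() and INITIAL_TIMEOUT_MS are illustrative stand-ins, not DFSOutputStream names:

import java.io.IOException;

public class CompleteRetrySketch {
  private static final long INITIAL_TIMEOUT_MS = 400;

  static void completeWithRetries(int retries)
      throws IOException, InterruptedException {
    long timeout = INITIAL_TIMEOUT_MS;
    while (!attemptComplete()) {
      if (retries == 0) {
        throw new IOException("Unable to close file because the last block"
            + " does not have enough number of replicas.");
      }
      retries--;
      Thread.sleep(timeout); // sleep only when another attempt remains
      timeout *= 2;          // exponential backoff, as localTimeout *= 2
    }
  }

  private static boolean attemptComplete() {
    return true; // placeholder for the namenode complete() RPC
  }
}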

View File

@ -74,7 +74,7 @@ import com.google.common.collect.HashMultimap;
* DN also marks the block's slots as "unanchorable" to prevent additional
* clients from initiating these operations in the future.
*
* The counterpart fo this class on the client is {@link DfsClientShmManager}.
* The counterpart of this class on the client is {@link DfsClientShmManager}.
*/
public class ShortCircuitRegistry {
public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@ -218,6 +218,31 @@ public class ShortCircuitRegistry {
return allowMunlock;
}
/**
* Invalidate any slot associated with a blockId that we are invalidating
* (deleting) from this DataNode. When a slot is invalid, the DFSClient will
* not use the corresponding replica for new read or mmap operations (although
* existing, ongoing read or mmap operations will complete.)
*
* @param blockId The block ID.
*/
public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
if (!enabled) return;
final Set<Slot> affectedSlots = slots.get(blockId);
if (!affectedSlots.isEmpty()) {
final StringBuilder bld = new StringBuilder();
String prefix = "";
bld.append("Block ").append(blockId).append(" has been invalidated. ").
append("Marking short-circuit slots as invalid: ");
for (Slot slot : affectedSlots) {
slot.makeInvalid();
bld.append(prefix).append(slot.toString());
prefix = ", ";
}
LOG.info(bld.toString());
}
}
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
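
For orientation, the new method is driven from the DataNode's block-deletion path; a minimal caller sketch (blockId and bpid assumed in scope) that matches the FsDatasetImpl wiring added in the next hunk:

// Hypothetical call site; the FsDatasetImpl change below adds exactly
// this when a replica is invalidated (deleted) on the DataNode.
ShortCircuitRegistry registry = datanode.getShortCircuitRegistry();
registry.processBlockInvalidation(new ExtendedBlockId(blockId, bpid));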

View File

@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@ -1232,8 +1233,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
volumeMap.remove(bpid, invalidBlks[i]);
}
// If a DFSClient has the replica in its cache of short-circuit file
// descriptors (and the client is using ShortCircuitShm), invalidate it.
datanode.getShortCircuitRegistry().processBlockInvalidation(
new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.

View File

@ -3928,8 +3928,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
StandbyException, IOException {
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkTraverse(pc, src);
@ -8391,9 +8393,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
nnConf.checkAclsConfigFlag();
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
checkOperation(OperationCategory.READ);
src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null);
}
@ -8639,8 +8643,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.READ);
@ -8684,8 +8690,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
nnConf.checkXAttrsConfigFlag();
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
/* To access xattr names, you need EXECUTE in the owning directory. */
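
Each of the four read paths patched above now follows the same shape: compute the reserved-path components outside the lock, then resolve under the read lock before any permission check. A condensed sketch of that pattern; the method names match FSNamesystem/FSDirectory, while the enclosing operation is illustrative:

byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
  checkOperation(OperationCategory.READ);
  // Rewrites /.reserved/.inodes/<inode-id> into the inode's current
  // path, so permission checks and lookups see a real path.
  src = FSDirectory.resolvePath(src, pathComponents, dir);
  if (isPermissionEnabled) {
    checkPathAccess(pc, src, FsAction.READ);
  }
  // ... perform the read against the resolved src ...
} finally {
  readUnlock();
}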

View File

@ -71,6 +71,8 @@ public class FileJournalManager implements JournalManager {
NameNodeFile.EDITS.getName() + "_(\\d+)-(\\d+)");
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
private static final Pattern EDITS_INPROGRESS_STALE_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+).*(\\S+)");
private File currentInProgress = null;
@ -162,8 +164,7 @@ public class FileJournalManager implements JournalManager {
throws IOException {
LOG.info("Purging logs older than " + minTxIdToKeep);
File[] files = FileUtil.listFiles(sd.getCurrentDir());
List<EditLogFile> editLogs =
FileJournalManager.matchEditLogs(files);
List<EditLogFile> editLogs = matchEditLogs(files, true);
for (EditLogFile log : editLogs) {
if (log.getFirstTxId() < minTxIdToKeep &&
log.getLastTxId() < minTxIdToKeep) {
@ -246,6 +247,11 @@ public class FileJournalManager implements JournalManager {
}
static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
return matchEditLogs(filesInStorage, false);
}
private static List<EditLogFile> matchEditLogs(File[] filesInStorage,
boolean forPurging) {
List<EditLogFile> ret = Lists.newArrayList();
for (File f : filesInStorage) {
String name = f.getName();
@ -256,6 +262,7 @@ public class FileJournalManager implements JournalManager {
long startTxId = Long.parseLong(editsMatch.group(1));
long endTxId = Long.parseLong(editsMatch.group(2));
ret.add(new EditLogFile(f, startTxId, endTxId));
continue;
} catch (NumberFormatException nfe) {
LOG.error("Edits file " + f + " has improperly formatted " +
"transaction ID");
@ -270,12 +277,30 @@ public class FileJournalManager implements JournalManager {
long startTxId = Long.parseLong(inProgressEditsMatch.group(1));
ret.add(
new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
continue;
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
}
}
if (forPurging) {
// Check for in-progress stale edits
Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX
.matcher(name);
if (staleInprogressEditsMatch.matches()) {
try {
long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));
ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,
true));
continue;
} catch (NumberFormatException nfe) {
LOG.error("In-progress stale edits file " + f + " has improperly "
+ "formatted transaction ID");
// skip
}
}
}
}
return ret;
}
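
The new EDITS_INPROGRESS_STALE_REGEX accepts in-progress names that carry a trailing marker such as .empty or .corrupt, which the purge path can now delete. A quick standalone check of how the patterns interact (file names are illustrative; note that matchEditLogs tries the plain in-progress pattern first, so the stale pattern only ever sees names with a suffix):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StaleEditsRegexDemo {
  // Same shapes as FileJournalManager's patterns, prefixes inlined.
  private static final Pattern INPROGRESS =
      Pattern.compile("edits_inprogress_(\\d+)");
  private static final Pattern STALE =
      Pattern.compile("edits_inprogress_(\\d+).*(\\S+)");

  public static void main(String[] args) {
    for (String name : new String[] {
        "edits_inprogress_0000000000000000176.empty",
        "edits_inprogress_0000000000000000226.corrupt",
        "edits_inprogress_0000000000000000401"}) {
      // Mirror matchEditLogs: a live in-progress segment matches the
      // plain pattern and is skipped before the stale check runs.
      if (INPROGRESS.matcher(name).matches()) {
        System.out.println(name + " -> live in-progress segment");
        continue;
      }
      Matcher m = STALE.matcher(name);
      if (m.matches()) {
        System.out.println(name + " -> stale, startTxId="
            + Long.parseLong(m.group(1)));
      }
    }
  }
}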

View File

@ -32,11 +32,16 @@ import com.google.common.base.Preconditions;
* DfsClientShm is a subclass of ShortCircuitShm which is used by the
* DfsClient.
* When the UNIX domain socket associated with this shared memory segment
* closes unexpectedly, we mark the slots inside this segment as stale.
* ShortCircuitReplica objects that contain stale slots are themselves stale,
* closes unexpectedly, we mark the slots inside this segment as disconnected.
* ShortCircuitReplica objects that contain disconnected slots are stale,
* and will not be used to service new reads or mmap operations.
* However, in-progress read or mmap operations will continue to proceed.
* Once the last slot is deallocated, the segment can be safely munmapped.
*
* Slots may also become stale because the associated replica has been deleted
* on the DataNode. In this case, the DataNode will clear the 'valid' bit.
* The client will then see these slots as stale (see
* #{ShortCircuitReplica#isStale}).
*/
public class DfsClientShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
@ -58,7 +63,7 @@ public class DfsClientShm extends ShortCircuitShm
*
* {@link DfsClientShm#handle} sets this to true.
*/
private boolean stale = false;
private boolean disconnected = false;
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
DomainPeer peer) throws IOException {
@ -76,14 +81,14 @@ public class DfsClientShm extends ShortCircuitShm
}
/**
* Determine if the shared memory segment is stale.
* Determine if the shared memory segment is disconnected from the DataNode.
*
* This must be called with the DfsClientShmManager lock held.
*
* @return True if the shared memory segment is stale.
*/
public synchronized boolean isStale() {
return stale;
public synchronized boolean isDisconnected() {
return disconnected;
}
/**
@ -97,8 +102,8 @@ public class DfsClientShm extends ShortCircuitShm
public boolean handle(DomainSocket sock) {
manager.unregisterShm(getShmId());
synchronized (this) {
Preconditions.checkState(!stale);
stale = true;
Preconditions.checkState(!disconnected);
disconnected = true;
boolean hadSlots = false;
for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
Slot slot = iter.next();
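
The rename separates two ways a slot can go bad: the DataNode clears the valid bit after deleting the replica (the new processBlockInvalidation path), or the segment's domain socket drops and handle() above marks every slot invalid while setting disconnected. Either way the client ends up looking at one bit; a hedged sketch of the check the class comment points at, simplified from ShortCircuitReplica#isStale:

// Both failure paths converge on the slot's valid bit, so a single
// test covers replica deletion and socket disconnection. (Sketch only;
// the real isStale also has a time-based fallback for slotless replicas.)
boolean isReplicaStale(ShortCircuitShm.Slot slot) {
  return !slot.isValid();
}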

View File

@ -271,12 +271,12 @@ public class DfsClientShmManager implements Closeable {
loading = false;
finishedLoading.signalAll();
}
if (shm.isStale()) {
if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
// fired and marked the shm as stale. In this case, we obviously
// don't want to add the SharedMemorySegment to our list of valid
// not-full segments.
// fired and marked the shm as disconnected. In this case, we
// obviously don't want to add the SharedMemorySegment to our list
// of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
@ -299,7 +299,7 @@ public class DfsClientShmManager implements Closeable {
void freeSlot(Slot slot) {
DfsClientShm shm = (DfsClientShm)slot.getShm();
shm.unregisterSlot(slot.getSlotIdx());
if (shm.isStale()) {
if (shm.isDisconnected()) {
// Stale shared memory segments should not be tracked here.
Preconditions.checkState(!full.containsKey(shm.getShmId()));
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));

View File

@ -306,6 +306,13 @@ public class ShortCircuitShm {
(slotAddress - baseAddress) / BYTES_PER_SLOT);
}
/**
* Clear the slot.
*/
void clear() {
unsafe.putLongVolatile(null, this.slotAddress, 0);
}
private boolean isSet(long flag) {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
return (prev & flag) != 0;
@ -535,6 +542,7 @@ public class ShortCircuitShm {
}
allocatedSlots.set(idx, true);
Slot slot = new Slot(calculateSlotAddress(idx), blockId);
slot.clear();
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
@ -583,7 +591,7 @@ public class ShortCircuitShm {
Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
if (!slot.isValid()) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
" has not been allocated.");
" is not marked as valid.");
}
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
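
Slot state lives in a single long read and written through volatile Unsafe operations: the top bits are flags and the low bits count anchors. Calling clear() before makeValid() during allocation wipes whatever a previous occupant left behind, so a recycled slot cannot leak stale flags or anchor counts. A small model of that word, under the assumption that the flag constants match ShortCircuitShm's VALID_FLAG and ANCHORABLE_FLAG:

public class SlotWordSketch {
  private static final long VALID_FLAG = 1L << 63;
  private static final long ANCHORABLE_FLAG = 1L << 62;

  public static void main(String[] args) {
    long word = 0xdeadbeefL | ANCHORABLE_FLAG; // leftovers from a prior use
    word = 0;            // clear(): zero flags and anchor count together
    word |= VALID_FLAG;  // makeValid()
    System.out.println("valid=" + ((word & VALID_FLAG) != 0)
        + " anchors=" + (word & ~(VALID_FLAG | ANCHORABLE_FLAG)));
  }
}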

View File

@ -53,6 +53,8 @@ public class TestDatanodeConfig {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, 0);
conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "localhost:0");
conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "localhost:0");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
cluster.waitActive();
}

View File

@ -521,6 +521,7 @@ public class TestINodeFile {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
@ -569,6 +570,19 @@ public class TestINodeFile {
assertEquals(testFileBlockSize,
nnRpc.getPreferredBlockSize(testFileInodePath.toString()));
/*
* HDFS-6749 added missing calls to FSDirectory.resolvePath in the
* following four methods. The calls below ensure that
* /.reserved/.inodes paths work properly. No need to check return
* values as these methods are tested elsewhere.
*/
{
fs.isFileClosed(testFileInodePath);
fs.getAclStatus(testFileInodePath);
fs.getXAttrs(testFileInodePath);
fs.listXAttrs(testFileInodePath);
}
// symbolic link related tests
// Reserved path is not allowed as a target

View File

@ -212,18 +212,25 @@ public class TestNNStorageRetentionManager {
tc.addImage("/foo1/current/" + getImageFileName(300), false);
tc.addImage("/foo1/current/" + getImageFileName(400), false);
// Segments containing txns up to txId 250 are extra and should be purged.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
// Without lowering the max segments to retain, we'd retain all segments
// going back to txid 150 (300 - 150).
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(176) + ".empty",
true);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(226) + ".corrupt",
true);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
// Only retain 2 extra segments. The 301-350 and 351-400 segments are
// considered required, not extra.
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(301) + ".empty",
false);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 350), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(351) + ".corrupt",
false);
tc.addLog("/foo2/current/" + getFinalizedEditsFileName(351, 400), false);
tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
runTest(tc);

View File

@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY;
import static org.hamcrest.CoreMatchers.equalTo;
import java.io.DataOutputStream;
@ -30,7 +31,9 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableBoolean;
@ -462,6 +465,7 @@ public class TestShortCircuitCache {
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
@Test(timeout=60000)
@ -516,4 +520,98 @@ public class TestShortCircuitCache {
});
cluster.shutdown();
}
/**
* Test unlinking a file whose blocks we are caching in the DFSClient.
* The DataNode will notify the DFSClient that the replica is stale via the
* ShortCircuitShm.
*/
@Test(timeout=60000)
public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testUnlinkingReplicasInFileDescriptorCache", sockDir);
// We don't want the CacheCleaner to time out short-circuit shared memory
// segments during the test, so set the timeout really high.
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache();
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
// The ClientShmManager starts off empty.
Assert.assertEquals(0, info.size());
}
});
final Path TEST_PATH = new Path("/test_file");
final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADE0;
DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN,
(short)1, SEED);
byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
// Loading this file brought the ShortCircuitReplica into our local
// replica cache.
final DatanodeInfo datanode =
new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1, info.get(datanode).notFull.values().size());
DfsClientShm shm =
info.get(datanode).notFull.values().iterator().next();
Assert.assertFalse(shm.isDisconnected());
}
});
// Remove the file whose blocks we just read.
fs.delete(TEST_PATH, false);
// Wait for the replica to be purged from the DFSClient's cache.
GenericTestUtils.waitFor(new Supplier<Boolean>() {
MutableBoolean done = new MutableBoolean(true);
@Override
public Boolean get() {
try {
done.setValue(true);
cache.getDfsClientShmManager().visit(new Visitor() {
@Override
public void visit(HashMap<DatanodeInfo,
PerDatanodeVisitorInfo> info) throws IOException {
Assert.assertTrue(info.get(datanode).full.isEmpty());
Assert.assertFalse(info.get(datanode).disabled);
Assert.assertEquals(1,
info.get(datanode).notFull.values().size());
DfsClientShm shm = info.get(datanode).notFull.values().
iterator().next();
// Check that all slots have been invalidated.
for (Iterator<Slot> iter = shm.slotIterator();
iter.hasNext(); ) {
Slot slot = iter.next();
if (slot.isValid()) {
done.setValue(false);
}
}
}
});
} catch (IOException e) {
LOG.error("error running visitor", e);
}
return done.booleanValue();
}
}, 10, 60000);
cluster.shutdown();
sockDir.close();
}
}

View File

@ -325,6 +325,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
MAPREDUCE-6002. Made MR task avoid reporting error to AM when the task process
is shutting down. (Wangda Tan via zjshen)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@ -57,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -406,7 +408,9 @@ public class LocalContainerLauncher extends AbstractService implements
} catch (FSError e) {
LOG.fatal("FSError from child", e);
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fsError(classicAttemptID, e.getMessage());
}
throw new RuntimeException();
} catch (Exception exception) {
@ -429,11 +433,13 @@ public class LocalContainerLauncher extends AbstractService implements
} catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable));
if (!ShutdownHookManager.get().isShutdownInProgress()) {
Throwable tCause = throwable.getCause();
String cause = (tCause == null)
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
String cause =
(tCause == null) ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
umbilical.fatalError(classicAttemptID, cause);
}
throw new RuntimeException();
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -176,7 +177,9 @@ class YarnChild {
});
} catch (FSError e) {
LOG.fatal("FSError from child", e);
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fsError(taskid, e.getMessage());
}
} catch (Exception exception) {
LOG.warn("Exception running child : "
+ StringUtils.stringifyException(exception));
@ -201,18 +204,23 @@ class YarnChild {
}
// Report back any failures, for diagnostic purposes
if (taskid != null) {
umbilical.fatalError(taskid, StringUtils.stringifyException(exception));
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fatalError(taskid,
StringUtils.stringifyException(exception));
}
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable));
if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
Throwable tCause = throwable.getCause();
String cause = tCause == null
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
String cause =
tCause == null ? throwable.getMessage() : StringUtils
.stringifyException(tCause);
umbilical.fatalError(taskid, cause);
}
}
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
@ -322,6 +323,11 @@ abstract public class Task implements Writable, Configurable {
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
String logMsg) {
LOG.fatal(logMsg);
if (ShutdownHookManager.get().isShutdownInProgress()) {
return;
}
Throwable tCause = throwable.getCause();
String cause = tCause == null
? StringUtils.stringifyException(throwable)

View File

@ -21,6 +21,8 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectWriter;
@ -42,6 +44,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@Private
@Unstable
public class RumenToSLSConverter {
private static final String EOL = System.getProperty("line.separator");

View File

@ -32,6 +32,8 @@ import java.util.Iterator;
import java.util.Random;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
@ -66,6 +68,8 @@ import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.ObjectMapper;
@Private
@Unstable
public class SLSRunner {
// RM, Runner
private ResourceManager rm;

View File

@ -29,6 +29,8 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -61,6 +63,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
import org.apache.log4j.Logger;
@ -70,6 +74,8 @@ import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@Private
@Unstable
public abstract class AMSimulator extends TaskRunner.Task {
// resource manager
protected ResourceManager rm;
@ -129,8 +135,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
* register with RM
*/
@Override
public void firstStep()
throws YarnException, IOException, InterruptedException {
public void firstStep() throws Exception {
simulateStartTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS();
@ -145,8 +150,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
@Override
public void middleStep()
throws InterruptedException, YarnException, IOException {
public void middleStep() throws Exception {
// process responses in the queue
processResponseQueue();
@ -158,7 +162,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
}
@Override
public void lastStep() {
public void lastStep() throws Exception {
LOG.info(MessageFormat.format("Application {0} is shutting down.", appId));
// unregister tracking
if (isTracked) {
@ -169,11 +173,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
.newRecordInstance(FinishApplicationMasterRequest.class);
finishAMRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
try {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
.getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@ -184,11 +186,6 @@ public abstract class AMSimulator extends TaskRunner.Task {
return null;
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
simulateFinishTimeMS = System.currentTimeMillis() -
SLSRunner.getRunner().getStartTimeMS();
@ -226,11 +223,9 @@ public abstract class AMSimulator extends TaskRunner.Task {
return createAllocateRequest(ask, new ArrayList<ContainerId>());
}
protected abstract void processResponseQueue()
throws InterruptedException, YarnException, IOException;
protected abstract void processResponseQueue() throws Exception;
protected abstract void sendContainerRequest()
throws YarnException, IOException, InterruptedException;
protected abstract void sendContainerRequest() throws Exception;
protected abstract void checkStop();
@ -276,11 +271,18 @@ public abstract class AMSimulator extends TaskRunner.Task {
// waiting until application ACCEPTED
RMApp app = rm.getRMContext().getRMApps().get(appId);
while(app.getState() != RMAppState.ACCEPTED) {
Thread.sleep(50);
Thread.sleep(10);
}
appAttemptId = rm.getRMContext().getRMApps().get(appId)
// Waiting until application attempt reach LAUNCHED
// "Unmanaged AM must register after AM attempt reaches LAUNCHED state"
this.appAttemptId = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt().getAppAttemptId();
RMAppAttempt rmAppAttempt = rm.getRMContext().getRMApps().get(appId)
.getCurrentAppAttempt();
while (rmAppAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
Thread.sleep(10);
}
}
private void registerAM()
@ -294,8 +296,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(appAttemptId.toString());
Token<AMRMTokenIdentifier> token =
rm.getRMContext().getRMApps().get(appAttemptId.getApplicationId())
Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps().get(appId)
.getRMAppAttempt(appAttemptId).getAMRMToken();
ugi.addTokenIdentifier(token.decodeIdentifier());

View File

@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.log4j.Logger;
@Private
@Unstable
public class MRAMSimulator extends AMSimulator {
/*
Vocabulary Used:
@ -141,8 +145,7 @@ public class MRAMSimulator extends AMSimulator {
}
@Override
public void firstStep()
throws YarnException, IOException, InterruptedException {
public void firstStep() throws Exception {
super.firstStep();
requestAMContainer();
@ -386,7 +389,7 @@ public class MRAMSimulator extends AMSimulator {
}
@Override
public void lastStep() {
public void lastStep() throws Exception {
super.lastStep();
// clear data structures

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.sls.conf;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public class SLSConfiguration {
// sls
public static final String PREFIX = "yarn.sls.";

View File

@ -27,6 +27,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -54,6 +57,8 @@ import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
@Private
@Unstable
public class NMSimulator extends TaskRunner.Task {
// node resource
private RMNode node;
@ -103,12 +108,12 @@ public class NMSimulator extends TaskRunner.Task {
}
@Override
public void firstStep() throws YarnException, IOException {
public void firstStep() {
// do nothing
}
@Override
public void middleStep() {
public void middleStep() throws Exception {
// we check the lifetime for each running containers
ContainerSimulator cs = null;
synchronized(completedContainerList) {
@ -132,7 +137,6 @@ public class NMSimulator extends TaskRunner.Task {
ns.setResponseId(RESPONSE_ID ++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
beatRequest.setNodeStatus(ns);
try {
NodeHeartbeatResponse beatResponse =
rm.getResourceTrackerService().nodeHeartbeat(beatRequest);
if (! beatResponse.getContainersToCleanup().isEmpty()) {
@ -159,11 +163,6 @@ public class NMSimulator extends TaskRunner.Task {
if (beatResponse.getNodeAction() == NodeAction.SHUTDOWN) {
lastStep();
}
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
@ -258,4 +257,19 @@ public class NMSimulator extends TaskRunner.Task {
completedContainerList.add(containerId);
}
}
@VisibleForTesting
Map<ContainerId, ContainerSimulator> getRunningContainers() {
return runningContainers;
}
@VisibleForTesting
List<ContainerId> getAMContainers() {
return amContainerList;
}
@VisibleForTesting
List<ContainerId> getCompletedContainers() {
return completedContainerList;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.sls.nodemanager;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -36,6 +38,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
.UpdatedContainerInfo;
@Private
@Unstable
public class NodeInfo {
private static int NODE_ID = 0;
@ -43,6 +47,8 @@ public class NodeInfo {
return NodeId.newInstance(host, port);
}
@Private
@Unstable
private static class FakeRMNodeImpl implements RMNode {
private NodeId nodeId;
private String hostName;

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@Private
@Unstable
public class CapacitySchedulerMetrics extends SchedulerMetrics {
public CapacitySchedulerMetrics() {

View File

@ -21,9 +21,13 @@ package org.apache.hadoop.yarn.sls.scheduler;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
@Private
@Unstable
public class ContainerSimulator implements Delayed {
// id
private ContainerId id;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.AppSchedulable;
@ -28,6 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
import com.codahale.metrics.Gauge;
import org.apache.hadoop.yarn.sls.SLSRunner;
@Private
@Unstable
public class FairSchedulerMetrics extends SchedulerMetrics {
private int totalMemoryMB = Integer.MAX_VALUE;

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
.FifoScheduler;
import com.codahale.metrics.Gauge;
@Private
@Unstable
public class FifoSchedulerMetrics extends SchedulerMetrics {
public FifoSchedulerMetrics() {

View File

@ -18,9 +18,13 @@
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.NodeUpdateSchedulerEvent;
@Private
@Unstable
public class NodeUpdateSchedulerEventWrapper extends NodeUpdateSchedulerEvent {
public NodeUpdateSchedulerEventWrapper(NodeUpdateSchedulerEvent event) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -33,6 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode
import java.util.Collections;
import java.util.List;
@Private
@Unstable
public class RMNodeWrapper implements RMNode {
private RMNode node;
private List<UpdatedContainerInfo> updates;

View File

@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
@ -92,13 +94,14 @@ import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;
@Private
@Unstable
final public class ResourceSchedulerWrapper
extends AbstractYarnScheduler<SchedulerApplicationAttempt, SchedulerNode>
implements SchedulerWrapper, ResourceScheduler, Configurable {
private static final String EOL = System.getProperty("line.separator");
private static final int SAMPLING_SIZE = 60;
private ScheduledExecutorService pool;
private RMContext rmContext;
// counters for scheduler allocate/handle operations
private Counter schedulerAllocateCounter;
private Counter schedulerHandleCounter;
@ -720,7 +723,7 @@ final public class ResourceSchedulerWrapper
public void addAMRuntime(ApplicationId appId,
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) {
if (metricsON) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
@ -733,6 +736,7 @@ final public class ResourceSchedulerWrapper
e.printStackTrace();
}
}
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {
@ -916,4 +920,17 @@ final public class ResourceSchedulerWrapper
public Resource getClusterResource() {
return null;
}
@Override
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
return new ArrayList<Container>();
}
@Override
public Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>>
getSchedulerApplications() {
return new HashMap<ApplicationId,
SchedulerApplication<SchedulerApplicationAttempt>>();
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.sls.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
@ -100,6 +102,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Private
@Unstable
public class SLSCapacityScheduler extends CapacityScheduler implements
SchedulerWrapper,Configurable {
private static final String EOL = System.getProperty("line.separator");
@ -725,6 +729,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
long traceStartTimeMS, long traceEndTimeMS,
long simulateStartTimeMS, long simulateEndTimeMS) {
if (metricsON) {
try {
// write job runtime information
StringBuilder sb = new StringBuilder();
@ -737,6 +742,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
e.printStackTrace();
}
}
}
private void updateQueueMetrics(String queue,
int releasedMemory, int releasedVCores) {

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.sls.scheduler;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceScheduler;
@ -30,6 +32,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
@Private
@Unstable
public abstract class SchedulerMetrics {
protected ResourceScheduler scheduler;
protected Set<String> trackedQueues;

View File

@ -19,11 +19,15 @@ package org.apache.hadoop.yarn.sls.scheduler;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.codahale.metrics.MetricRegistry;
@Private
@Unstable
public interface SchedulerWrapper {
public MetricRegistry getMetrics();

View File

@ -25,9 +25,15 @@ import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.exceptions.YarnException;
@Private
@Unstable
public class TaskRunner {
@Private
@Unstable
public abstract static class Task implements Runnable, Delayed {
private long start;
private long end;
@ -93,12 +99,10 @@ public class TaskRunner {
} else {
lastStep();
}
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
Thread.getDefaultUncaughtExceptionHandler()
.uncaughtException(Thread.currentThread(), e);
}
}
@ -118,13 +122,11 @@ public class TaskRunner {
}
public abstract void firstStep()
throws YarnException, IOException, InterruptedException;
public abstract void firstStep() throws Exception;
public abstract void middleStep()
throws YarnException, InterruptedException, IOException;
public abstract void middleStep() throws Exception;
public abstract void lastStep() throws YarnException;
public abstract void lastStep() throws Exception;
public void setEndTime(long et) {
endTime = et;
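
Catching Exception and forwarding it to the thread's default uncaught exception handler means whoever configures that handler sees simulator failures instead of a silently printed stack trace; the TestSLSRunner change later in this commit installs such a handler for exactly that reason. A standalone sketch of the pattern, with a hypothetical failing task:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class UncaughtHandlerSketch {
  public static void main(String[] args) throws InterruptedException {
    final List<Throwable> failures =
        Collections.synchronizedList(new ArrayList<Throwable>());
    Thread.setDefaultUncaughtExceptionHandler(
        new Thread.UncaughtExceptionHandler() {
          @Override
          public void uncaughtException(Thread t, Throwable e) {
            failures.add(e);
          }
        });
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          throw new RuntimeException("simulated task failure");
        } catch (Exception e) {
          // Forward explicitly, as TaskRunner.Task now does; threads in
          // a pool would otherwise swallow the exception.
          Thread.getDefaultUncaughtExceptionHandler()
              .uncaughtException(Thread.currentThread(), e);
        }
      }
    });
    worker.start();
    worker.join();
    System.out.println("captured: " + failures);
  }
}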

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.yarn.sls.utils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobTraceReader;
@ -36,6 +38,8 @@ import java.util.Map;
import java.util.List;
import java.util.Iterator;
@Private
@Unstable
public class SLSUtils {
public static String[] getRackHostName(String hostname) {

View File

@ -30,6 +30,8 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
.SchedulerEventType;
import org.mortbay.jetty.Handler;
@ -49,6 +51,8 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.mortbay.jetty.handler.ResourceHandler;
@Private
@Unstable
public class SLSWebApp extends HttpServlet {
private static final long serialVersionUID = 1905162041950251407L;
private transient Server server;

View File

@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.sls;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
public class TestSLSRunner {
@ -30,6 +33,15 @@ public class TestSLSRunner {
@SuppressWarnings("all")
public void testSimulatorRunning() throws Exception {
File tempDir = new File("target", UUID.randomUUID().toString());
final List<Throwable> exceptionList =
Collections.synchronizedList(new ArrayList<Throwable>());
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
exceptionList.add(e);
}
});
// start the simulator
File slsOutputDir = new File(tempDir.getAbsolutePath() + "/slsoutput/");
@ -38,8 +50,20 @@ public class TestSLSRunner {
"-output", slsOutputDir.getAbsolutePath()};
SLSRunner.main(args);
// wait for 45 seconds before stop
Thread.sleep(45 * 1000);
// wait for 20 seconds before stop
int count = 20;
while (count >= 0) {
Thread.sleep(1000);
if (! exceptionList.isEmpty()) {
SLSRunner.getRunner().stop();
Assert.fail("TestSLSRunner catched exception from child thread " +
"(TaskRunner.Task): " + exceptionList.get(0).getMessage());
break;
}
count--;
}
SLSRunner.getRunner().stop();
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.appmaster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestAMSimulator {
private ResourceManager rm;
private YarnConfiguration conf;
@Before
public void setup() {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
conf.set(SLSConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
rm = new ResourceManager();
rm.init(conf);
rm.start();
}
class MockAMSimulator extends AMSimulator {
@Override
protected void processResponseQueue()
throws InterruptedException, YarnException, IOException {
}
@Override
protected void sendContainerRequest()
throws YarnException, IOException, InterruptedException {
}
@Override
protected void checkStop() {
}
}
@Test
public void testAMSimulator() throws Exception {
// Register one app
MockAMSimulator app = new MockAMSimulator();
List<ContainerSimulator> containers = new ArrayList<ContainerSimulator>();
app.init(1, 1000, containers, rm, null, 0, 1000000l, "user1", "default",
false, "app1");
app.firstStep();
Assert.assertEquals(1, rm.getRMContext().getRMApps().size());
Assert.assertNotNull(rm.getRMContext().getRMApps().get(app.appId));
// Finish this app
app.lastStep();
}
@After
public void tearDown() {
rm.stop();
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.nodemanager;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestNMSimulator {
private final int GB = 1024;
private ResourceManager rm;
private YarnConfiguration conf;
@Before
public void setup() {
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper");
conf.set(SLSConfiguration.RM_SCHEDULER,
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
rm = new ResourceManager();
rm.init(conf);
rm.start();
}
@Test
public void testNMSimulator() throws Exception {
// Register one node
NMSimulator node1 = new NMSimulator();
node1.init("rack1/node1", GB * 10, 10, 0, 1000, rm);
node1.middleStep();
Assert.assertEquals(1, rm.getResourceScheduler().getNumClusterNodes());
Assert.assertEquals(GB * 10,
rm.getResourceScheduler().getRootQueueMetrics().getAvailableMB());
Assert.assertEquals(10,
rm.getResourceScheduler().getRootQueueMetrics()
.getAvailableVirtualCores());
// Allocate one container on node1
ContainerId cId1 = newContainerId(1, 1, 1);
Container container1 = Container.newInstance(cId1, null, null,
Resources.createResource(GB, 1), null, null);
node1.addNewContainer(container1, 100000L);
Assert.assertTrue("Node1 should have one running container.",
node1.getRunningContainers().containsKey(cId1));
// Allocate one AM container on node1
ContainerId cId2 = newContainerId(2, 1, 1);
Container container2 = Container.newInstance(cId2, null, null,
Resources.createResource(GB, 1), null, null);
node1.addNewContainer(container2, -1L);
Assert.assertTrue("Node1 should have one running AM container.",
node1.getAMContainers().contains(cId2));
// Remove containers
node1.cleanupContainer(cId1);
Assert.assertTrue("Container1 should be removed from Node1.",
node1.getCompletedContainers().contains(cId1));
node1.cleanupContainer(cId2);
Assert.assertFalse("Container2 should be removed from Node1.",
node1.getAMContainers().contains(cId2));
}
private ContainerId newContainerId(int appId, int appAttemptId, int cId) {
return BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(System.currentTimeMillis(), appId),
appAttemptId), cId);
}
@After
public void tearDown() throws Exception {
rm.stop();
}
}

View File

@ -65,6 +65,12 @@ Release 2.6.0 - UNRELEASED
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
devaraj)
YARN-2214. FairScheduler: preemptContainerPreCheck() in FSParentQueue delays
convergence towards fairness. (Ashwin Shankar via kasha)
YARN-2211. Persist AMRMToken master key in RMStateStore for RM recovery.
(Xuan Gong via jianhe)
OPTIMIZATIONS
BUG FIXES
@ -100,6 +106,8 @@ Release 2.6.0 - UNRELEASED
YARN-2147. client lacks delegation token exception details when
application submit fails (Chen He via jlowe)
YARN-1796. container-executor shouldn't require o-r permissions (atm)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -436,6 +444,11 @@ Release 2.5.0 - UNRELEASED
YARN-2319. Made the MiniKdc instance start/close before/after the class of
TestRMWebServicesDelegationTokens. (Wenwu Peng via zjshen)
YARN-2335. Annotate all hadoop-sls APIs as @Private. (Wei Yan via kasha)
YARN-1726. ResourceSchedulerWrapper broken due to AbstractYarnScheduler.
(Wei Yan via kasha)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -267,6 +267,7 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes{
protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
boolean overrideRTS, boolean overrideApplicationMasterService)
throws Exception {
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
cluster =
new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,

View File

@ -54,11 +54,9 @@ public class TestApplicationMasterServiceOnHA extends ProtocolHATestBase{
amClient = ClientRMProxy
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
AMRMTokenIdentifier id =
new AMRMTokenIdentifier(attemptId);
Token<AMRMTokenIdentifier> appToken =
new Token<AMRMTokenIdentifier>(id, this.cluster.getResourceManager()
.getRMContext().getAMRMTokenSecretManager());
this.cluster.getResourceManager().getRMContext()
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
appToken.setService(new Text("appToken service"));
UserGroupInformation.setLoginUser(UserGroupInformation
.createRemoteUser(UserGroupInformation.getCurrentUser()

View File

@ -111,16 +111,16 @@ int check_executor_permissions(char *executable_file) {
return -1;
}
// check others do not have read/write/execute permissions
if ((filestat.st_mode & S_IROTH) == S_IROTH || (filestat.st_mode & S_IWOTH)
== S_IWOTH || (filestat.st_mode & S_IXOTH) == S_IXOTH) {
// check others do not have write/execute permissions
if ((filestat.st_mode & S_IWOTH) == S_IWOTH ||
(filestat.st_mode & S_IXOTH) == S_IXOTH) {
fprintf(LOGFILE,
"The container-executor binary should not have read or write or"
" execute for others.\n");
"The container-executor binary should not have write or execute "
"for others.\n");
return -1;
}
// Binary should be setuid/setgid executable
// Binary should be setuid executable
if ((filestat.st_mode & S_ISUID) == 0) {
fprintf(LOGFILE, "The container-executor binary should be set setuid.\n");
return -1;

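For reference, the relaxed check rejects only the others-write and others-execute bits; others-read is now allowed. A hedged Java sketch of the equivalent test using NIO (the binary path is hypothetical; this is an illustration, not part of the commit):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Set;

public class ExecutorPermCheck {
  public static void main(String[] args) throws IOException {
    // Hypothetical install location; the real path is build-dependent.
    Set<PosixFilePermission> perms = Files.getPosixFilePermissions(
        Paths.get("/usr/local/bin/container-executor"));
    // Mirrors the relaxed C check: others may read, but not write or execute.
    if (perms.contains(PosixFilePermission.OTHERS_WRITE)
        || perms.contains(PosixFilePermission.OTHERS_EXECUTE)) {
      System.err.println(
          "container-executor should not have write or execute for others");
    }
  }
}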
View File

@ -244,6 +244,37 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
<param>${basedir}/../hadoop-yarn-server-common/src/main/proto</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>yarn_server_resourcemanager_recovery.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -60,7 +60,7 @@ public class RMSecretManagerService extends AbstractService {
clientToAMSecretManager = createClientToAMTokenSecretManager();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf, this.rmContext);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
rmDTSecretManager =
@ -115,8 +115,8 @@ public class RMSecretManagerService extends AbstractService {
}
protected AMRMTokenSecretManager createAMRMTokenSecretManager(
Configuration conf) {
return new AMRMTokenSecretManager(conf);
Configuration conf, RMContext rmContext) {
return new AMRMTokenSecretManager(conf, rmContext);
}
protected ClientToAMTokenSecretManagerInRM createClientToAMTokenSecretManager() {

View File

@ -1026,6 +1026,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
// recover RMdelegationTokenSecretManager
rmContext.getRMDelegationTokenSecretManager().recover(state);
// recover AMRMTokenSecretManager
rmContext.getAMRMTokenSecretManager().recover(state);
// recover applications
rmAppManager.recover(state);
}

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -43,16 +44,18 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
@ -76,6 +79,8 @@ public class FileSystemRMStateStore extends RMStateStore {
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
.newInstance(1, 1);
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
"AMRMTokenSecretManagerNode";
protected FileSystem fs;
@ -89,6 +94,7 @@ public class FileSystemRMStateStore extends RMStateStore {
@VisibleForTesting
Path fsWorkingPath;
Path amrmTokenSecretManagerRoot;
@Override
public synchronized void initInternal(Configuration conf)
throws Exception{
@ -96,6 +102,8 @@ public class FileSystemRMStateStore extends RMStateStore {
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
amrmTokenSecretManagerRoot =
new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
}
@Override
@ -113,6 +121,7 @@ public class FileSystemRMStateStore extends RMStateStore {
fs = fsWorkingPath.getFileSystem(conf);
fs.mkdirs(rmDTSecretManagerRoot);
fs.mkdirs(rmAppRoot);
fs.mkdirs(amrmTokenSecretManagerRoot);
}
@Override
@ -180,9 +189,32 @@ public class FileSystemRMStateStore extends RMStateStore {
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
Path amrmTokenSecretManagerStateDataDir =
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
FileStatus status;
try {
status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
assert status.isFile();
} catch (FileNotFoundException ex) {
return;
}
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data));
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
}
private void loadRMAppState(RMState rmState) throws Exception {
try {
List<ApplicationAttemptState> attempts =
@ -597,4 +629,25 @@ public class FileSystemRMStateStore extends RMStateStore {
return new Path(root, nodeName);
}
@Override
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate){
Path nodeCreatePath =
getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
AMRMTokenSecretManagerState data =
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
try {
if (isUpdate) {
updateFile(nodeCreatePath, stateData);
} else {
writeFile(nodeCreatePath, stateData);
}
} catch (Exception ex) {
LOG.info("Error storing info for AMRMTokenSecretManager", ex);
notifyStoreOperationFailed(ex);
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
@ -72,6 +73,10 @@ public class MemoryRMStateStore extends RMStateStore {
state.rmSecretManagerState.getTokenState());
returnState.rmSecretManagerState.dtSequenceNumber =
state.rmSecretManagerState.dtSequenceNumber;
returnState.amrmTokenSecretManagerState =
state.amrmTokenSecretManagerState == null ? null
: AMRMTokenSecretManagerState
.newInstance(state.amrmTokenSecretManagerState);
return returnState;
}
@ -267,6 +272,16 @@ public class MemoryRMStateStore extends RMStateStore {
return null;
}
@Override
public void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate) {
if (amrmTokenSecretManagerState != null) {
state.amrmTokenSecretManagerState = AMRMTokenSecretManagerState
.newInstance(amrmTokenSecretManagerState);
}
}
@Override
public void deleteStore() throws Exception {
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
@ -138,6 +139,12 @@ public class NullRMStateStore extends RMStateStore {
return null;
}
@Override
public void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState state, boolean isUpdate) {
// Do nothing
}
@Override
public void deleteStore() throws Exception {
// Do nothing

View File

@ -45,16 +45,14 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPB
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -85,6 +83,8 @@ public abstract class RMStateStore extends AbstractService {
protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
"RMDTSequenceNumber_";
protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT =
"AMRMTokenSecretManagerRoot";
protected static final String VERSION_NODE = "RMVersionNode";
protected static final String EPOCH_NODE = "EpochNode";
@ -412,6 +412,8 @@ public abstract class RMStateStore extends AbstractService {
RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
AMRMTokenSecretManagerState amrmTokenSecretManagerState = null;
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
@ -419,6 +421,10 @@ public abstract class RMStateStore extends AbstractService {
public RMDTSecretManagerState getRMDTSecretManagerState() {
return rmSecretManagerState;
}
public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() {
return amrmTokenSecretManagerState;
}
}
private Dispatcher rmDispatcher;
@ -713,6 +719,14 @@ public abstract class RMStateStore extends AbstractService {
protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
throws Exception;
/**
* Blocking API. Derived classes must implement this method to store or
* update the state of the AMRMToken master key.
*/
public abstract void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate);
/**
* Non-blocking API
* ResourceManager services call this to remove an application from the state

View File

@ -44,18 +44,19 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
@ -128,6 +129,9 @@ public class ZKRMStateStore extends RMStateStore {
* | |----- Key_1
* | |----- Key_2
* ....
* |--- AMRMTOKEN_SECRET_MANAGER_ROOT
* |----- currentMasterKey
* |----- nextMasterKey
*
*/
private String zkRootNodePath;
@ -136,6 +140,7 @@ public class ZKRMStateStore extends RMStateStore {
private String dtMasterKeysRootPath;
private String delegationTokensRootPath;
private String dtSequenceNumberPath;
private String amrmTokenSecretManagerRoot;
@VisibleForTesting
protected String znodeWorkingPath;
@ -255,6 +260,8 @@ public class ZKRMStateStore extends RMStateStore {
RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
amrmTokenSecretManagerRoot =
getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
}
@Override
@ -275,6 +282,7 @@ public class ZKRMStateStore extends RMStateStore {
createRootDir(dtMasterKeysRootPath);
createRootDir(delegationTokensRootPath);
createRootDir(dtSequenceNumberPath);
createRootDir(amrmTokenSecretManagerRoot);
}
private void createRootDir(final String rootPath) throws Exception {
@ -427,9 +435,27 @@ public class ZKRMStateStore extends RMStateStore {
loadRMDTSecretManagerState(rmState);
// recover RM applications
loadRMAppState(rmState);
// recover AMRMTokenSecretManager
loadAMRMTokenSecretManagerState(rmState);
return rmState;
}
private void loadAMRMTokenSecretManagerState(RMState rmState)
throws Exception {
byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
if (data == null) {
LOG.warn("There is no data saved");
return;
}
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
AMRMTokenSecretManagerStateProto.parseFrom(data));
rmState.amrmTokenSecretManagerState =
AMRMTokenSecretManagerState.newInstance(
stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
}
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
loadRMDelegationKeyState(rmState);
@ -1112,4 +1138,19 @@ public class ZKRMStateStore extends RMStateStore {
return zk;
}
@Override
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate) {
AMRMTokenSecretManagerState data =
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
try {
setDataWithRetries(amrmTokenSecretManagerRoot, stateData, -1);
} catch (Exception ex) {
LOG.info("Error storing info for AMRMTokenSecretManager", ex);
notifyStoreOperationFailed(ex);
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
/**
* Contains all the state data that needs to be stored persistently
* for {@link AMRMTokenSecretManager}.
*/
@Public
@Unstable
public abstract class AMRMTokenSecretManagerState {
public static AMRMTokenSecretManagerState newInstance(
MasterKey currentMasterKey, MasterKey nextMasterKey) {
AMRMTokenSecretManagerState data =
Records.newRecord(AMRMTokenSecretManagerState.class);
data.setCurrentMasterKey(currentMasterKey);
data.setNextMasterKey(nextMasterKey);
return data;
}
public static AMRMTokenSecretManagerState newInstance(
AMRMTokenSecretManagerState state) {
AMRMTokenSecretManagerState data =
Records.newRecord(AMRMTokenSecretManagerState.class);
data.setCurrentMasterKey(state.getCurrentMasterKey());
data.setNextMasterKey(state.getNextMasterKey());
return data;
}
/**
* The current master key of the {@link AMRMTokenSecretManager}.
*/
@Public
@Unstable
public abstract MasterKey getCurrentMasterKey();
@Public
@Unstable
public abstract void setCurrentMasterKey(MasterKey currentMasterKey);
/**
* The next master key of the {@link AMRMTokenSecretManager}.
*/
@Public
@Unstable
public abstract MasterKey getNextMasterKey();
@Public
@Unstable
public abstract void setNextMasterKey(MasterKey nextMasterKey);
public abstract AMRMTokenSecretManagerStateProto getProto();
}

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
public class AMRMTokenSecretManagerStatePBImpl extends AMRMTokenSecretManagerState{
AMRMTokenSecretManagerStateProto proto =
AMRMTokenSecretManagerStateProto.getDefaultInstance();
AMRMTokenSecretManagerStateProto.Builder builder = null;
boolean viaProto = false;
private MasterKey currentMasterKey = null;
private MasterKey nextMasterKey = null;
public AMRMTokenSecretManagerStatePBImpl() {
builder = AMRMTokenSecretManagerStateProto.newBuilder();
}
public AMRMTokenSecretManagerStatePBImpl(AMRMTokenSecretManagerStateProto proto) {
this.proto = proto;
viaProto = true;
}
public AMRMTokenSecretManagerStateProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.currentMasterKey != null) {
builder.setCurrentMasterKey(convertToProtoFormat(this.currentMasterKey));
}
if (this.nextMasterKey != null) {
builder.setNextMasterKey(convertToProtoFormat(this.nextMasterKey));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = AMRMTokenSecretManagerStateProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public MasterKey getCurrentMasterKey() {
AMRMTokenSecretManagerStateProtoOrBuilder p = viaProto ? proto : builder;
if (this.currentMasterKey != null) {
return this.currentMasterKey;
}
if (!p.hasCurrentMasterKey()) {
return null;
}
this.currentMasterKey = convertFromProtoFormat(p.getCurrentMasterKey());
return this.currentMasterKey;
}
@Override
public void setCurrentMasterKey(MasterKey currentMasterKey) {
maybeInitBuilder();
if (currentMasterKey == null) {
builder.clearCurrentMasterKey();
}
this.currentMasterKey = currentMasterKey;
}
@Override
public MasterKey getNextMasterKey() {
AMRMTokenSecretManagerStateProtoOrBuilder p = viaProto ? proto : builder;
if (this.nextMasterKey != null) {
return this.nextMasterKey;
}
if (!p.hasNextMasterKey()) {
return null;
}
this.nextMasterKey = convertFromProtoFormat(p.getNextMasterKey());
return this.nextMasterKey;
}
@Override
public void setNextMasterKey(MasterKey nextMasterKey) {
maybeInitBuilder();
if (nextMasterKey == null) {
builder.clearNextMasterKey();
}
this.nextMasterKey = nextMasterKey;
}
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
return new MasterKeyPBImpl(p);
}
}
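
The class above follows the usual Hadoop PBImpl pattern: setters populate plain local fields, and the protobuf builder is only touched when getProto() merges the cached values and rebuilds the immutable proto. A rough lifecycle sketch (key is an assumed MasterKey instance obtained elsewhere):

AMRMTokenSecretManagerStatePBImpl impl = new AMRMTokenSecretManagerStatePBImpl();
impl.setCurrentMasterKey(key);      // cached in the local field only
AMRMTokenSecretManagerStateProto p = impl.getProto();  // merge and build happen here
// After getProto() returns, viaProto is true; subsequent reads go through the
// proto until a setter calls maybeInitBuilder() and copies it back to a builder.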

View File

@ -224,16 +224,17 @@ public class FSLeafQueue extends FSQueue {
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
if (LOG.isDebugEnabled()) {
LOG.debug("Queue " + getName() + " is going to preempt a container " +
"from its applications.");
}
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Queue " + getName() + " is going to preempt a container " +
"from its applications.");
}
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
AppSchedulable candidateSched = null;
@ -328,4 +329,14 @@ public class FSLeafQueue extends FSQueue {
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
// TODO Auto-generated method stub
}
/**
* Helper method to check if the queue should preempt containers.
*
* @return true if check passes (can preempt) or false otherwise
*/
private boolean preemptContainerPreCheck() {
return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
getFairShare());
}
}

View File

@ -164,11 +164,6 @@ public class FSParentQueue extends FSQueue {
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
// Find the childQueue which is most over fair share
FSQueue candidateQueue = null;
Comparator<Schedulable> comparator = policy.getComparator();

View File

@ -187,17 +187,4 @@ public abstract class FSQueue extends Schedulable implements Queue {
}
return true;
}
/**
* Helper method to check if the queue should preempt containers
*
* @return true if check passes (can preempt) or false otherwise
*/
protected boolean preemptContainerPreCheck() {
if (this == scheduler.getQueueManager().getRootQueue()) {
return true;
}
return parent.getPolicy()
.checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
}
}

View File

@ -38,6 +38,10 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import com.google.common.annotations.VisibleForTesting;
@ -66,6 +70,7 @@ public class AMRMTokenSecretManager extends
private final Timer timer;
private final long rollingInterval;
private final long activationDelay;
private RMContext rmContext;
private final Set<ApplicationAttemptId> appAttemptSet =
new HashSet<ApplicationAttemptId>();
@ -73,7 +78,8 @@ public class AMRMTokenSecretManager extends
/**
* Create an {@link AMRMTokenSecretManager}
*/
public AMRMTokenSecretManager(Configuration conf) {
public AMRMTokenSecretManager(Configuration conf, RMContext rmContext) {
this.rmContext = rmContext;
this.timer = new Timer();
this.rollingInterval =
conf
@ -98,6 +104,11 @@ public class AMRMTokenSecretManager extends
public void start() {
if (this.currentMasterKey == null) {
this.currentMasterKey = createNewMasterKey();
AMRMTokenSecretManagerState state =
AMRMTokenSecretManagerState.newInstance(
this.currentMasterKey.getMasterKey(), null);
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
false);
}
this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
rollingInterval);
@ -130,6 +141,12 @@ public class AMRMTokenSecretManager extends
try {
LOG.info("Rolling master-key for amrm-tokens");
this.nextMasterKey = createNewMasterKey();
AMRMTokenSecretManagerState state =
AMRMTokenSecretManagerState.newInstance(
this.currentMasterKey.getMasterKey(),
this.nextMasterKey.getMasterKey());
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
true);
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
} finally {
this.writeLock.unlock();
@ -225,8 +242,8 @@ public class AMRMTokenSecretManager extends
LOG.debug("Trying to retrieve password for " + applicationAttemptId);
}
if (!appAttemptSet.contains(applicationAttemptId)) {
throw new InvalidToken("Password not found for ApplicationAttempt "
+ applicationAttemptId);
throw new InvalidToken(applicationAttemptId
+ " not found in AMRMTokenSecretManager.");
}
if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
.getKeyId()) {
@ -238,9 +255,7 @@ public class AMRMTokenSecretManager extends
return createPassword(identifier.getBytes(),
this.nextMasterKey.getSecretKey());
}
throw new InvalidToken("Given AMRMToken for application : "
+ applicationAttemptId.toString()
+ " seems to have been generated illegally.");
throw new InvalidToken("Invalid AMRMToken from " + applicationAttemptId);
} finally {
this.readLock.unlock();
}
@ -291,4 +306,25 @@ public class AMRMTokenSecretManager extends
this.readLock.unlock();
}
}
public void recover(RMState state) {
if (state.getAMRMTokenSecretManagerState() != null) {
// recover the current master key
MasterKey currentKey =
state.getAMRMTokenSecretManagerState().getCurrentMasterKey();
this.currentMasterKey =
new MasterKeyData(currentKey, createSecretKey(currentKey.getBytes()
.array()));
// recover the next master key if not null
MasterKey nextKey =
state.getAMRMTokenSecretManagerState().getNextMasterKey();
if (nextKey != null) {
this.nextMasterKey =
new MasterKeyData(nextKey, createSecretKey(nextKey.getBytes()
.array()));
this.timer.schedule(new NextKeyActivator(), this.activationDelay);
}
}
}
}
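
Taken together with the ResourceManager change earlier in this commit, the recovery path is: the state store loads the persisted master keys into RMState, and recover() rebuilds the in-memory MasterKeyData, scheduling the next-key activator if a rolled key was still in flight. In outline (a paraphrase of code shown in this diff, not a new API):

// On RM restart, inside ResourceManager#recover(RMState state):
rmContext.getRMDelegationTokenSecretManager().recover(state);
rmContext.getAMRMTokenSecretManager().recover(state);  // added by this change
rmAppManager.recover(state);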

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnServerResourceManagerRecoveryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_server_common_protos.proto";
message AMRMTokenSecretManagerStateProto {
optional MasterKeyProto current_master_key = 1;
optional MasterKeyProto next_master_key = 2;
}
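
The two optional MasterKeyProto fields map one-to-one onto the AMRMTokenSecretManagerState record introduced above. A hedged round-trip sketch mirroring what the FileSystem and ZooKeeper stores do with the byte payload (currentKey and nextKey are assumed MasterKey instances):

// Serialize: record -> proto -> bytes, as in storeOrUpdateAMRMTokenSecretManagerState()
AMRMTokenSecretManagerState state =
    AMRMTokenSecretManagerState.newInstance(currentKey, nextKey);
byte[] data = state.getProto().toByteArray();

// Deserialize: bytes -> proto -> record, as in loadAMRMTokenSecretManagerState()
AMRMTokenSecretManagerState restored =
    new AMRMTokenSecretManagerStatePBImpl(
        AMRMTokenSecretManagerStateProto.parseFrom(data));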

View File

@ -1250,11 +1250,10 @@ public class TestRMRestart {
.getEncoded());
// assert AMRMTokenSecretManager also knows about the AMRMToken password
// TODO: fix this on YARN-2211
// Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
// Assert.assertArrayEquals(amrmToken.getPassword(),
// rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
// amrmToken.decodeIdentifier()));
Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
Assert.assertArrayEquals(amrmToken.getPassword(),
rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
amrmToken.decodeIdentifier()));
rm1.stop();
rm2.stop();
}

View File

@ -55,10 +55,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -176,8 +178,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
AMRMTokenSecretManager appTokenMgr = spy(
new AMRMTokenSecretManager(conf));
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
AMRMTokenSecretManager appTokenMgr =
spy(new AMRMTokenSecretManager(conf, rmContext));
MasterKeyData masterKeyData = appTokenMgr.createNewMasterKey();
when(appTokenMgr.getMasterKey()).thenReturn(masterKeyData);
@ -576,4 +582,65 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
}
public void testAMRMTokenSecretManagerStateStore(
RMStateStoreHelper stateStoreHelper) throws Exception {
RMStateStore store = stateStoreHelper.getRMStateStore();
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
RMContext rmContext = mock(RMContext.class);
when(rmContext.getStateStore()).thenReturn(store);
Configuration conf = new YarnConfiguration();
AMRMTokenSecretManager appTokenMgr =
new AMRMTokenSecretManager(conf, rmContext);
//create and save the first masterkey
MasterKeyData firstMasterKeyData = appTokenMgr.createNewMasterKey();
AMRMTokenSecretManagerState state1 =
AMRMTokenSecretManagerState.newInstance(
firstMasterKeyData.getMasterKey(), null);
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state1,
false);
// load state
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
Assert.assertNotNull(state.getAMRMTokenSecretManagerState());
Assert.assertEquals(firstMasterKeyData.getMasterKey(), state
.getAMRMTokenSecretManagerState().getCurrentMasterKey());
Assert.assertNull(state
.getAMRMTokenSecretManagerState().getNextMasterKey());
//create and save the second masterkey
MasterKeyData secondMasterKeyData = appTokenMgr.createNewMasterKey();
AMRMTokenSecretManagerState state2 =
AMRMTokenSecretManagerState
.newInstance(firstMasterKeyData.getMasterKey(),
secondMasterKeyData.getMasterKey());
rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state2,
true);
// load state
store = stateStoreHelper.getRMStateStore();
store.setRMDispatcher(dispatcher);
RMState state_2 = store.loadState();
Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState());
Assert.assertEquals(firstMasterKeyData.getMasterKey(), state_2
.getAMRMTokenSecretManagerState().getCurrentMasterKey());
Assert.assertEquals(secondMasterKeyData.getMasterKey(), state_2
.getAMRMTokenSecretManagerState().getNextMasterKey());
// re-create the masterKeyData based on the recovered masterkey
// should have the same secretKey
appTokenMgr.recover(state_2);
Assert.assertEquals(appTokenMgr.getCurrnetMasterKeyData().getSecretKey(),
firstMasterKeyData.getSecretKey());
Assert.assertEquals(appTokenMgr.getNextMasterKeyData().getSecretKey(),
secondMasterKeyData.getSecretKey());
store.close();
}
}

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@ -161,6 +160,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
testEpoch(fsTester);
testAppDeletion(fsTester);
testDeleteStore(fsTester);
testAMRMTokenSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}

View File

@ -123,6 +123,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
testEpoch(zkTester);
testAppDeletion(zkTester);
testDeleteStore(zkTester);
testAMRMTokenSecretManagerStateStore(zkTester);
}
private Configuration createHARMConf(

View File

@ -193,7 +193,7 @@ public class TestRMAppTransitions {
this.rmContext =
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
null, new AMRMTokenSecretManager(conf, this.rmContext),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(),

View File

@ -134,7 +134,8 @@ public class TestRMAppAttemptTransitions {
private RMAppAttempt applicationAttempt;
private Configuration conf = new Configuration();
private AMRMTokenSecretManager amRMTokenManager = spy(new AMRMTokenSecretManager(conf));
private AMRMTokenSecretManager amRMTokenManager =
spy(new AMRMTokenSecretManager(conf, rmContext));
private ClientToAMTokenSecretManagerInRM clientToAMTokenManager =
spy(new ClientToAMTokenSecretManagerInRM());
private NMTokenSecretManagerInRM nmTokenManager =

View File

@ -86,13 +86,12 @@ public class TestUtils {
Configuration conf = new Configuration();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext =
RMContextImpl rmContext =
new RMContextImpl(nullDispatcher, cae, null, null, null,
new AMRMTokenSecretManager(conf),
new AMRMTokenSecretManager(conf, null),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), writer);
return rmContext;
}

View File

@ -1221,6 +1221,79 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
}
@Test
public void testPreemptionIsNotDelayedToNextRound() throws Exception {
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>8</weight>");
out.println("<queue name=\"queueA1\" />");
out.println("<queue name=\"queueA2\" />");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("</queue>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node of 8G
RMNode node1 = MockNodes.newNodeInfo(1,
Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Run apps in queueA.A1 and queueB
ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
"queueA.queueA1", "user1", 7, 1);
ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
"user2", 1, 1);
scheduler.update();
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 8; i++) {
scheduler.handle(nodeUpdate1);
}
// verify if the apps got the containers they requested
assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
// Now submit an app in queueA.queueA2
ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
"queueA.queueA2", "user3", 7, 1);
scheduler.update();
// Let 11 sec pass
clock.tick(11);
scheduler.update();
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
.getLeafQueue("queueA.queueA2", false), clock.getTime());
assertEquals(2980, toPreempt.getMemory());
// verify if the 3 containers required by queueA2 are preempted in the same
// round
scheduler.preemptResources(toPreempt);
assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
.size());
}
@Test (timeout = 5000)
/**
* Tests the timing of decision to preempt tasks.

View File

@ -184,8 +184,8 @@ public class TestAMRMTokens {
// The exception will still have the earlier appAttemptId as it picks it
// up from the token.
Assert.assertTrue(t.getCause().getMessage().contains(
"Password not found for ApplicationAttempt " +
applicationAttemptId.toString()));
applicationAttemptId.toString()
+ " not found in AMRMTokenSecretManager."));
}
} finally {