From 707b96fa5840873e6158ab49b7d66b72e5c0739d Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 5 Aug 2015 12:57:12 -0700 Subject: [PATCH] YARN-3736. Add RMStateStore apis to store and load accepted reservations for failover (adhoot via asuresh) (cherry picked from commit f271d377357ad680924d19f07e6c8315e7c89bae) --- hadoop-yarn-project/CHANGES.txt | 2 + .../recovery/FileSystemRMStateStore.java | 207 ++++++++++++++---- .../recovery/LeveldbRMStateStore.java | 114 +++++++++- .../recovery/MemoryRMStateStore.java | 57 +++++ .../recovery/NullRMStateStore.java | 23 ++ .../recovery/RMStateStore.java | 153 ++++++++++++- .../recovery/RMStateStoreEventType.java | 5 +- .../RMStateStoreStoreReservationEvent.java | 56 +++++ .../recovery/ZKRMStateStore.java | 124 ++++++++++- .../reservation/ReservationSystemUtil.java | 102 ++++++++- ...yarn_server_resourcemanager_recovery.proto | 18 +- .../recovery/RMStateStoreTestBase.java | 186 ++++++++++++++++ .../recovery/TestFSRMStateStore.java | 1 + .../recovery/TestLeveldbRMStateStore.java | 6 + .../recovery/TestZKRMStateStore.java | 1 + .../ReservationSystemTestUtil.java | 16 ++ .../TestInMemoryReservationAllocation.java | 36 +-- 17 files changed, 1026 insertions(+), 81 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3b82fd2827a..52bdf24607b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -101,6 +101,8 @@ Release 2.8.0 - UNRELEASED YARN-3853. Add docker container runtime support to LinuxContainterExecutor. (Sidharta Seethana via vvasudev) + YARN-3736. Add RMStateStore apis to store and load accepted reservations for + failover (adhoot via asuresh) IMPROVEMENTS diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java index b9727915280..5ee4709e65b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import org.apache.commons.logging.Log; @@ -44,12 +45,14 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; @@ -76,6 +79,8 @@ import com.google.common.annotations.VisibleForTesting; * Changes from 1.1 to 1.2, AMRMTokenSecretManager state has been saved * separately. The currentMasterkey and nextMasterkey have been stored. * Also, AMRMToken has been removed from ApplicationAttemptState. + * + * Changes from 1.2 to 1.3, Addition of ReservationSystem state. */ public class FileSystemRMStateStore extends RMStateStore { @@ -83,7 +88,7 @@ public class FileSystemRMStateStore extends RMStateStore { protected static final String ROOT_DIR_NAME = "FSRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 2); + .newInstance(1, 3); protected static final String AMRMTOKEN_SECRET_MANAGER_NODE = "AMRMTokenSecretManagerNode"; @@ -108,6 +113,8 @@ public class FileSystemRMStateStore extends RMStateStore { Path fsWorkingPath; Path amrmTokenSecretManagerRoot; + private Path reservationRoot; + @Override public synchronized void initInternal(Configuration conf) throws Exception{ @@ -117,6 +124,7 @@ public class FileSystemRMStateStore extends RMStateStore { rmAppRoot = new Path(rootDirPath, RM_APP_ROOT); amrmTokenSecretManagerRoot = new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT); + reservationRoot = new Path(rootDirPath, RESERVATION_SYSTEM_ROOT); fsNumRetries = conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES); @@ -153,6 +161,7 @@ public class FileSystemRMStateStore extends RMStateStore { mkdirsWithRetries(rmDTSecretManagerRoot); mkdirsWithRetries(rmAppRoot); mkdirsWithRetries(amrmTokenSecretManagerRoot); + mkdirsWithRetries(reservationRoot); } @Override @@ -222,9 +231,24 @@ public class FileSystemRMStateStore extends RMStateStore { loadRMAppState(rmState); // recover AMRMTokenSecretManager loadAMRMTokenSecretManagerState(rmState); + // recover reservation state + loadReservationSystemState(rmState); return rmState; } + private void loadReservationSystemState(RMState rmState) throws Exception { + try { + final ReservationStateFileProcessor fileProcessor = new + ReservationStateFileProcessor(rmState); + final Path rootDirectory = this.reservationRoot; + + processDirectoriesOfFiles(fileProcessor, rootDirectory); + } catch (Exception e) { + LOG.error("Failed to load state.", e); + throw e; + } + } + private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot); @@ -248,50 +272,12 @@ public class FileSystemRMStateStore extends RMStateStore { private void loadRMAppState(RMState rmState) throws Exception { try { - List attempts = - new ArrayList(); + List attempts = new ArrayList<>(); + final RMAppStateFileProcessor rmAppStateFileProcessor = + new RMAppStateFileProcessor(rmState, attempts); + final Path rootDirectory = this.rmAppRoot; - for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) { - checkAndResumeUpdateOperation(appDir.getPath()); - for (FileStatus childNodeStatus : - listStatusWithRetries(appDir.getPath())) { - assert childNodeStatus.isFile(); - String childNodeName = childNodeStatus.getPath().getName(); - if (checkAndRemovePartialRecordWithRetries( - childNodeStatus.getPath())) { - continue; - } - byte[] childData = readFileWithRetries(childNodeStatus.getPath(), - childNodeStatus.getLen()); - // Set attribute if not already set - setUnreadableBySuperuserXattrib(childNodeStatus.getPath()); - if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { - // application - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application from node: " + childNodeName); - } - ApplicationStateDataPBImpl appState = - new ApplicationStateDataPBImpl( - ApplicationStateDataProto.parseFrom(childData)); - ApplicationId appId = - appState.getApplicationSubmissionContext().getApplicationId(); - rmState.appState.put(appId, appState); - } else if (childNodeName - .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) { - // attempt - if (LOG.isDebugEnabled()) { - LOG.debug("Loading application attempt from node: " - + childNodeName); - } - ApplicationAttemptStateDataPBImpl attemptState = - new ApplicationAttemptStateDataPBImpl( - ApplicationAttemptStateDataProto.parseFrom(childData)); - attempts.add(attemptState); - } else { - LOG.info("Unknown child node with name: " + childNodeName); - } - } - } + processDirectoriesOfFiles(rmAppStateFileProcessor, rootDirectory); // go through all attempts and add them to their apps, Ideally, each // attempt node must have a corresponding app node, because remove @@ -309,6 +295,29 @@ public class FileSystemRMStateStore extends RMStateStore { } } + private void processDirectoriesOfFiles( + RMStateFileProcessor rmAppStateFileProcessor, Path rootDirectory) + throws Exception { + for (FileStatus dir : listStatusWithRetries(rootDirectory)) { + checkAndResumeUpdateOperation(dir.getPath()); + String dirName = dir.getPath().getName(); + for (FileStatus fileNodeStatus : listStatusWithRetries(dir.getPath())) { + assert fileNodeStatus.isFile(); + String fileName = fileNodeStatus.getPath().getName(); + if (checkAndRemovePartialRecordWithRetries(fileNodeStatus.getPath())) { + continue; + } + byte[] fileData = readFileWithRetries(fileNodeStatus.getPath(), + fileNodeStatus.getLen()); + // Set attribute if not already set + setUnreadableBySuperuserXattrib(fileNodeStatus.getPath()); + + rmAppStateFileProcessor.processChildNode(dirName, fileName, + fileData); + } + } + } + private boolean checkAndRemovePartialRecord(Path record) throws IOException { // If the file ends with .tmp then it shows that it failed // during saving state into state store. The file will be deleted as a @@ -843,6 +852,41 @@ public class FileSystemRMStateStore extends RMStateStore { } } + @Override + protected void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + Path planCreatePath = getNodePath(reservationRoot, planName); + mkdirsWithRetries(planCreatePath); + Path reservationPath = getNodePath(planCreatePath, reservationIdName); + LOG.info("Storing state for reservation " + reservationIdName + " from " + + "plan " + planName + " at path " + reservationPath); + byte[] reservationData = reservationAllocation.toByteArray(); + writeFileWithRetries(reservationPath, reservationData, true); + } + + @Override + protected void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + Path planCreatePath = getNodePath(reservationRoot, planName); + Path reservationPath = getNodePath(planCreatePath, reservationIdName); + LOG.info("Updating state for reservation " + reservationIdName + " from " + + "plan " + planName + " at path " + reservationPath); + byte[] reservationData = reservationAllocation.toByteArray(); + updateFile(reservationPath, reservationData, true); + } + + @Override + protected void removeReservationState( + String planName, String reservationIdName) throws Exception { + Path planCreatePath = getNodePath(reservationRoot, planName); + Path reservationPath = getNodePath(planCreatePath, reservationIdName); + LOG.info("Removing state for reservation " + reservationIdName + " from " + + "plan " + planName + " at path " + reservationPath); + deleteFileWithRetries(reservationPath); + } + @VisibleForTesting public int getNumRetries() { return fsNumRetries; @@ -853,8 +897,7 @@ public class FileSystemRMStateStore extends RMStateStore { return fsRetryInterval; } - private void setUnreadableBySuperuserXattrib(Path p) - throws IOException { + private void setUnreadableBySuperuserXattrib(Path p) throws IOException { if (fs.getScheme().toLowerCase().contains("hdfs") && intermediateEncryptionEnabled && !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) { @@ -862,4 +905,76 @@ public class FileSystemRMStateStore extends RMStateStore { EnumSet.of(XAttrSetFlag.CREATE)); } } + + private static class ReservationStateFileProcessor implements + RMStateFileProcessor { + private RMState rmState; + public ReservationStateFileProcessor(RMState state) { + this.rmState = state; + } + + @Override + public void processChildNode(String planName, String childNodeName, + byte[] childData) throws IOException { + ReservationAllocationStateProto allocationState = + ReservationAllocationStateProto.parseFrom(childData); + if (!rmState.getReservationState().containsKey(planName)) { + rmState.getReservationState().put(planName, + new HashMap()); + } + ReservationId reservationId = + ReservationId.parseReservationId(childNodeName); + rmState.getReservationState().get(planName).put(reservationId, + allocationState); + } + } + + private static class RMAppStateFileProcessor implements RMStateFileProcessor { + private RMState rmState; + private List attempts; + + public RMAppStateFileProcessor(RMState rmState, + List attempts) { + this.rmState = rmState; + this.attempts = attempts; + } + + @Override + public void processChildNode(String appDirName, String childNodeName, + byte[] childData) + throws com.google.protobuf.InvalidProtocolBufferException { + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + // application + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application from node: " + childNodeName); + } + ApplicationStateDataPBImpl appState = + new ApplicationStateDataPBImpl( + ApplicationStateDataProto.parseFrom(childData)); + ApplicationId appId = + appState.getApplicationSubmissionContext().getApplicationId(); + rmState.appState.put(appId, appState); + } else if (childNodeName.startsWith( + ApplicationAttemptId.appAttemptIdStrPrefix)) { + // attempt + if (LOG.isDebugEnabled()) { + LOG.debug("Loading application attempt from node: " + + childNodeName); + } + ApplicationAttemptStateDataPBImpl attemptState = + new ApplicationAttemptStateDataPBImpl( + ApplicationAttemptStateDataProto.parseFrom(childData)); + attempts.add(attemptState); + } else { + LOG.info("Unknown child node with name: " + childNodeName); + } + } + } + + // Interface for common state processing of directory of file layout + private interface RMStateFileProcessor { + void processChildNode(String appDirName, String childNodeName, + byte[] childData) + throws IOException; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java index 46a3459f99b..faaadb8018d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java @@ -27,6 +27,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.Map.Entry; import org.apache.commons.logging.Log; @@ -39,6 +40,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; @@ -69,6 +72,9 @@ import org.iq80.leveldb.WriteBatch; import com.google.common.annotations.VisibleForTesting; +/** + * Changes from 1.0 to 1.1, Addition of ReservationSystem state. + */ public class LeveldbRMStateStore extends RMStateStore { public static final Log LOG = @@ -84,9 +90,11 @@ public class LeveldbRMStateStore extends RMStateStore { RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber"; private static final String RM_APP_KEY_PREFIX = RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix; + private static final String RM_RESERVATION_KEY_PREFIX = + RESERVATION_SYSTEM_ROOT + SEPARATOR; private static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 0); + .newInstance(1, 1); private DB db; @@ -112,6 +120,12 @@ public class LeveldbRMStateStore extends RMStateStore { return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber(); } + private String getReservationNodeKey(String planName, + String reservationId) { + return RESERVATION_SYSTEM_ROOT + SEPARATOR + planName + SEPARATOR + + reservationId; + } + @Override protected void initInternal(Configuration conf) throws Exception { } @@ -230,9 +244,51 @@ public class LeveldbRMStateStore extends RMStateStore { loadRMDTSecretManagerState(rmState); loadRMApps(rmState); loadAMRMTokenSecretManagerState(rmState); + loadReservationState(rmState); return rmState; } + private void loadReservationState(RMState rmState) throws IOException { + int numReservations = 0; + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(RM_RESERVATION_KEY_PREFIX)); + while (iter.hasNext()) { + Entry entry = iter.next(); + String key = asString(entry.getKey()); + + String planReservationString = + key.substring(RM_RESERVATION_KEY_PREFIX.length()); + String[] parts = planReservationString.split(SEPARATOR); + if (parts.length != 2) { + LOG.warn("Incorrect reservation state key " + key); + continue; + } + String planName = parts[0]; + String reservationName = parts[1]; + ReservationAllocationStateProto allocationState = + ReservationAllocationStateProto.parseFrom(entry.getValue()); + if (!rmState.getReservationState().containsKey(planName)) { + rmState.getReservationState().put(planName, + new HashMap()); + } + ReservationId reservationId = + ReservationId.parseReservationId(reservationName); + rmState.getReservationState().get(planName).put(reservationId, + allocationState); + numReservations++; + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + LOG.info("Recovered " + numReservations + " reservations"); + } + private void loadRMDTSecretManagerState(RMState state) throws IOException { int numKeys = loadRMDTSecretManagerKeys(state); LOG.info("Recovered " + numKeys + " RM delegation token master keys"); @@ -544,7 +600,59 @@ public class LeveldbRMStateStore extends RMStateStore { throw new IOException(e); } } - + + @Override + protected void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String key = getReservationNodeKey(planName, reservationIdName); + if (LOG.isDebugEnabled()) { + LOG.debug("Storing state for reservation " + reservationIdName + + " plan " + planName + " at " + key); + } + batch.put(bytes(key), reservationAllocation.toByteArray()); + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + protected void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + storeReservationState(reservationAllocation, planName, + reservationIdName); + } + + @Override + protected void removeReservationState(String planName, + String reservationIdName) throws Exception { + try { + WriteBatch batch = db.createWriteBatch(); + try { + String reservationKey = + getReservationNodeKey(planName, reservationIdName); + batch.delete(bytes(reservationKey)); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing state for reservation " + reservationIdName + + " plan " + planName + " at " + reservationKey); + } + db.write(batch); + } finally { + batch.close(); + } + } catch (DBException e) { + throw new IOException(e); + } + } + private void storeOrUpdateRMDT(RMDelegationTokenIdentifier tokenId, Long renewDate, boolean isUpdate) throws IOException { String tokenKey = getRMDTTokenNodeKey(tokenId); @@ -679,7 +787,7 @@ public class LeveldbRMStateStore extends RMStateStore { iter = new LeveldbIterator(db); iter.seekToFirst(); while (iter.hasNext()) { - Entry entry = iter.next(); + Entry entry = iter.next(); LOG.info("entry: " + asString(entry.getKey())); ++numEntries; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java index 609f4031d27..b4f88054487 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; @@ -223,6 +226,60 @@ public class MemoryRMStateStore extends RMStateStore { rmDTMasterKeyState.remove(delegationKey); } + @Override + protected synchronized void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + LOG.info("Storing reservationallocation for " + reservationIdName + " " + + "for plan " + planName); + Map planState = + state.getReservationState().get(planName); + if (planState == null) { + planState = new HashMap<>(); + state.getReservationState().put(planName, planState); + } + ReservationId reservationId = + ReservationId.parseReservationId(reservationIdName); + planState.put(reservationId, reservationAllocation); + } + + @Override + protected synchronized void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + LOG.info("Updating reservationallocation for " + reservationIdName + " " + + "for plan " + planName); + Map planState = + state.getReservationState().get(planName); + if (planState == null) { + throw new YarnRuntimeException("State for plan " + planName + " does " + + "not exist"); + } + ReservationId reservationId = + ReservationId.parseReservationId(reservationIdName); + planState.put(reservationId, reservationAllocation); + } + + @Override + protected synchronized void removeReservationState( + String planName, String reservationIdName) throws Exception { + LOG.info("Removing reservationallocation " + reservationIdName + + " for plan " + planName); + + Map planState = + state.getReservationState().get(planName); + if (planState == null) { + throw new YarnRuntimeException("State for plan " + planName + " does " + + "not exist"); + } + ReservationId reservationId = + ReservationId.parseReservationId(reservationIdName); + planState.remove(reservationId); + if (planState.isEmpty()) { + state.getReservationState().remove(planName); + } + } + @Override protected Version loadVersion() throws Exception { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java index 92c07cd3ba0..ffb61536bfe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; @@ -101,6 +102,26 @@ public class NullRMStateStore extends RMStateStore { // Do nothing } + @Override + protected void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + // Do nothing + } + + @Override + protected void removeReservationState(String planName, + String reservationIdName) throws Exception { + // Do nothing + } + + @Override + protected void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception { + // Do nothing + } + @Override public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception { // Do nothing @@ -155,4 +176,6 @@ public class NullRMStateStore extends RMStateStore { // Do nothing } + + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 9b17bf78d57..50364501236 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -43,11 +43,13 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent; @@ -87,6 +89,8 @@ public abstract class RMStateStore extends AbstractService { "RMDTSequenceNumber_"; protected static final String AMRMTOKEN_SECRET_MANAGER_ROOT = "AMRMTokenSecretManagerRoot"; + protected static final String RESERVATION_SYSTEM_ROOT = + "ReservationSystemRoot"; protected static final String VERSION_NODE = "RMVersionNode"; protected static final String EPOCH_NODE = "EpochNode"; private ResourceManager resourceManager; @@ -136,7 +140,16 @@ public abstract class RMStateStore extends AbstractService { new UpdateRMDTTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.UPDATE_AMRM_TOKEN, - new StoreOrUpdateAMRMTokenTransition()) + new StoreOrUpdateAMRMTokenTransition()) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + RMStateStoreEventType.STORE_RESERVATION, + new StoreReservationAllocationTransition()) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + RMStateStoreEventType.UPDATE_RESERVATION, + new UpdateReservationAllocationTransition()) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + RMStateStoreEventType.REMOVE_RESERVATION, + new RemoveReservationAllocationTransition()) .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, RMStateStoreEventType.FENCED) .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED, @@ -152,7 +165,10 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEventType.STORE_DELEGATION_TOKEN, RMStateStoreEventType.REMOVE_DELEGATION_TOKEN, RMStateStoreEventType.UPDATE_DELEGATION_TOKEN, - RMStateStoreEventType.UPDATE_AMRM_TOKEN)); + RMStateStoreEventType.UPDATE_AMRM_TOKEN, + RMStateStoreEventType.STORE_RESERVATION, + RMStateStoreEventType.UPDATE_RESERVATION, + RMStateStoreEventType.REMOVE_RESERVATION)); private final StateMachine { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreStoreReservationEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + RMStateStoreStoreReservationEvent reservationEvent = + (RMStateStoreStoreReservationEvent) event; + try { + LOG.info("Storing reservation allocation." + reservationEvent + .getReservationIdName()); + store.storeReservationState( + reservationEvent.getReservationAllocation(), + reservationEvent.getPlanName(), + reservationEvent.getReservationIdName()); + } catch (Exception e) { + LOG.error("Error while storing reservation allocation.", e); + store.notifyStoreOperationFailed(e); + } + } + } + + private static class UpdateReservationAllocationTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreStoreReservationEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + RMStateStoreStoreReservationEvent reservationEvent = + (RMStateStoreStoreReservationEvent) event; + try { + LOG.info("Updating reservation allocation." + reservationEvent + .getReservationIdName()); + store.updateReservationState( + reservationEvent.getReservationAllocation(), + reservationEvent.getPlanName(), + reservationEvent.getReservationIdName()); + } catch (Exception e) { + LOG.error("Error while updating reservation allocation.", e); + store.notifyStoreOperationFailed(e); + } + } + } + + private static class RemoveReservationAllocationTransition implements + SingleArcTransition { + @Override + public void transition(RMStateStore store, RMStateStoreEvent event) { + if (!(event instanceof RMStateStoreStoreReservationEvent)) { + // should never happen + LOG.error("Illegal event type: " + event.getClass()); + return; + } + RMStateStoreStoreReservationEvent reservationEvent = + (RMStateStoreStoreReservationEvent) event; + try { + LOG.info("Removing reservation allocation." + reservationEvent + .getReservationIdName()); + store.removeReservationState( + reservationEvent.getPlanName(), + reservationEvent.getReservationIdName()); + } catch (Exception e) { + LOG.error("Error while removing reservation allocation.", e); + store.notifyStoreOperationFailed(e); + } + } + } + public RMStateStore() { super(RMStateStore.class.getName()); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -457,6 +547,9 @@ public abstract class RMStateStore extends AbstractService { AMRMTokenSecretManagerState amrmTokenSecretManagerState = null; + private Map> + reservationState = new TreeMap<>(); + public Map getApplicationState() { return appState; } @@ -468,6 +561,11 @@ public abstract class RMStateStore extends AbstractService { public AMRMTokenSecretManagerState getAMRMTokenSecretManagerState() { return amrmTokenSecretManagerState; } + + public Map> + getReservationState() { + return reservationState; + } } private Dispatcher rmDispatcher; @@ -745,6 +843,57 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreEventType.REMOVE_MASTERKEY)); } + /** + * Blocking Apis to maintain reservation state. + */ + public void storeNewReservation( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) { + handleStoreEvent(new RMStateStoreStoreReservationEvent( + reservationAllocation, RMStateStoreEventType.STORE_RESERVATION, + planName, reservationIdName)); + } + + public void updateReservation( + ReservationAllocationStateProto reservationAllocation, + String planName, String reservationIdName) { + handleStoreEvent(new RMStateStoreStoreReservationEvent( + reservationAllocation, RMStateStoreEventType.UPDATE_RESERVATION, + planName, reservationIdName)); + } + + public void removeReservation(String planName, String reservationIdName) { + handleStoreEvent(new RMStateStoreStoreReservationEvent( + null, RMStateStoreEventType.REMOVE_RESERVATION, + planName, reservationIdName)); + } + + /** + * Blocking API + * Derived classes must implement this method to store the state of + * a reservation allocation. + */ + protected abstract void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception; + + /** + * Blocking API + * Derived classes must implement this method to remove the state of + * a reservation allocation. + */ + protected abstract void removeReservationState(String planName, + String reservationIdName) throws Exception; + + /** + * Blocking API + * Derived classes must implement this method to update the state of + * a reservation allocation. + */ + protected abstract void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) throws Exception; + /** * Blocking API * Derived classes must implement this method to remove the state of diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index beba5eb8f5a..6f8dca75d0f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -32,5 +32,8 @@ public enum RMStateStoreEventType { STORE_DELEGATION_TOKEN, REMOVE_DELEGATION_TOKEN, UPDATE_DELEGATION_TOKEN, - UPDATE_AMRM_TOKEN + UPDATE_AMRM_TOKEN, + STORE_RESERVATION, + UPDATE_RESERVATION, + REMOVE_RESERVATION, } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java new file mode 100644 index 00000000000..ac309103d97 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreStoreReservationEvent.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; + +/** + * Event representing maintaining ReservationSystem state. + */ +public class RMStateStoreStoreReservationEvent extends RMStateStoreEvent { + + private ReservationAllocationStateProto reservationAllocation; + private String planName; + private String reservationIdName; + + public RMStateStoreStoreReservationEvent(RMStateStoreEventType type) { + super(type); + } + + public RMStateStoreStoreReservationEvent( + ReservationAllocationStateProto reservationAllocationState, + RMStateStoreEventType type, String planName, String reservationIdName) { + this(type); + this.reservationAllocation = reservationAllocationState; + this.planName = planName; + this.reservationIdName = reservationIdName; + } + + public ReservationAllocationStateProto getReservationAllocation() { + return reservationAllocation; + } + + public String getPlanName() { + return planName; + } + + public String getReservationIdName() { + return reservationIdName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index 8f096d882aa..055008723cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -26,6 +26,7 @@ import java.nio.charset.Charset; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import com.google.common.base.Preconditions; @@ -45,6 +46,7 @@ import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.util.ZKUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -53,6 +55,7 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRM import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto; import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; @@ -107,9 +110,18 @@ import com.google.common.annotations.VisibleForTesting; * |----- currentMasterKey * |----- nextMasterKey * + * |-- RESERVATION_SYSTEM_ROOT + * |------PLAN_1 + * | |------ RESERVATION_1 + * | |------ RESERVATION_2 + * | .... + * |------PLAN_2 + * .... * Note: Changes from 1.1 to 1.2 - AMRMTokenSecretManager state has been saved * separately. The currentMasterkey and nextMasterkey have been stored. * Also, AMRMToken has been removed from ApplicationAttemptState. + * + * Changes from 1.2 to 1.3, Addition of ReservationSystem state. */ @Private @Unstable @@ -120,7 +132,7 @@ public class ZKRMStateStore extends RMStateStore { protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot"; protected static final Version CURRENT_VERSION_INFO = Version - .newInstance(1, 2); + .newInstance(1, 3); private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME = "RMDelegationTokensRoot"; private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME = @@ -142,6 +154,7 @@ public class ZKRMStateStore extends RMStateStore { private String delegationTokensRootPath; private String dtSequenceNumberPath; private String amrmTokenSecretManagerRoot; + private String reservationRoot; @VisibleForTesting protected String znodeWorkingPath; @@ -258,6 +271,7 @@ public class ZKRMStateStore extends RMStateStore { RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME); amrmTokenSecretManagerRoot = getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT); + reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT); } @Override @@ -279,6 +293,7 @@ public class ZKRMStateStore extends RMStateStore { create(delegationTokensRootPath); create(dtSequenceNumberPath); create(amrmTokenSecretManagerRoot); + create(reservationRoot); } private void logRootNodeAcls(String prefix) throws Exception { @@ -375,9 +390,41 @@ public class ZKRMStateStore extends RMStateStore { loadRMAppState(rmState); // recover AMRMTokenSecretManager loadAMRMTokenSecretManagerState(rmState); + // recover reservation state + loadReservationSystemState(rmState); return rmState; } + private void loadReservationSystemState(RMState rmState) throws Exception { + List planNodes = getChildren(reservationRoot); + for (String planName : planNodes) { + if (LOG.isDebugEnabled()) { + LOG.debug("Loading plan from znode: " + planName); + } + String planNodePath = getNodePath(reservationRoot, planName); + + List reservationNodes = getChildren(planNodePath); + for (String reservationNodeName : reservationNodes) { + String reservationNodePath = getNodePath(planNodePath, + reservationNodeName); + if (LOG.isDebugEnabled()) { + LOG.debug("Loading reservation from znode: " + reservationNodePath); + } + byte[] reservationData = getData(reservationNodePath); + ReservationAllocationStateProto allocationState = + ReservationAllocationStateProto.parseFrom(reservationData); + if (!rmState.getReservationState().containsKey(planName)) { + rmState.getReservationState().put(planName, + new HashMap()); + } + ReservationId reservationId = + ReservationId.parseReservationId(reservationNodeName); + rmState.getReservationState().get(planName).put(reservationId, + allocationState); + } + } + } + private void loadAMRMTokenSecretManagerState(RMState rmState) throws Exception { byte[] data = getData(amrmTokenSecretManagerRoot); @@ -763,6 +810,81 @@ public class ZKRMStateStore extends RMStateStore { safeSetData(amrmTokenSecretManagerRoot, stateData, -1); } + @Override + protected synchronized void removeReservationState(String planName, + String reservationIdName) + throws Exception { + String planNodePath = + getNodePath(reservationRoot, planName); + String reservationPath = getNodePath(planNodePath, + reservationIdName); + if (LOG.isDebugEnabled()) { + LOG.debug("Removing reservationallocation " + reservationIdName + " for" + + " plan " + planName); + } + safeDelete(reservationPath); + + List reservationNodes = getChildren(planNodePath); + if (reservationNodes.isEmpty()) { + safeDelete(planNodePath); + } + } + + @Override + protected synchronized void storeReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) + throws Exception { + SafeTransaction trx = new SafeTransaction(); + addOrUpdateReservationState( + reservationAllocation, planName, reservationIdName, trx, false); + trx.commit(); + } + + @Override + protected synchronized void updateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName) + throws Exception { + SafeTransaction trx = new SafeTransaction(); + addOrUpdateReservationState( + reservationAllocation, planName, reservationIdName, trx, true); + trx.commit(); + } + + private void addOrUpdateReservationState( + ReservationAllocationStateProto reservationAllocation, String planName, + String reservationIdName, SafeTransaction trx, boolean isUpdate) + throws Exception { + String planCreatePath = + getNodePath(reservationRoot, planName); + String reservationPath = getNodePath(planCreatePath, + reservationIdName); + byte[] reservationData = reservationAllocation.toByteArray(); + + if (!exists(planCreatePath)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating plan node: " + planName + " at: " + planCreatePath); + } + trx.create(planCreatePath, null, zkAcl, CreateMode.PERSISTENT); + } + + if (isUpdate) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating reservation: " + reservationIdName + " in plan:" + + planName + " at: " + reservationPath); + } + trx.setData(reservationPath, reservationData, -1); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing reservation: " + reservationIdName + " in plan:" + + planName + " at: " + reservationPath); + } + trx.create(reservationPath, reservationData, zkAcl, + CreateMode.PERSISTENT); + } + } + /** * Utility function to ensure that the configured base znode exists. * This recursively creates the znode as well as all of its parents. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java index 5562adcf95e..98466d537e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java @@ -18,11 +18,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationDefinitionProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ResourceAllocationRequestProto; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; - import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -52,4 +64,92 @@ public final class ReservationSystemUtil { } return resources; } + + public static ReservationAllocationStateProto buildStateProto( + ReservationAllocation allocation) { + ReservationAllocationStateProto.Builder builder = + ReservationAllocationStateProto.newBuilder(); + + builder.setAcceptanceTimestamp(allocation.getAcceptanceTime()); + builder.setContainsGangs(allocation.containsGangs()); + builder.setStartTime(allocation.getStartTime()); + builder.setEndTime(allocation.getEndTime()); + builder.setUser(allocation.getUser()); + ReservationDefinitionProto definitionProto = convertToProtoFormat( + allocation.getReservationDefinition()); + builder.setReservationDefinition(definitionProto); + + for (Map.Entry entry : + allocation.getAllocationRequests().entrySet()) { + ResourceAllocationRequestProto p = + ResourceAllocationRequestProto.newBuilder() + .setStartTime(entry.getKey().getStartTime()) + .setEndTime(entry.getKey().getEndTime()) + .setResource(convertToProtoFormat(entry.getValue())) + .build(); + builder.addAllocationRequests(p); + } + + ReservationAllocationStateProto allocationProto = builder.build(); + return allocationProto; + } + + private static ReservationDefinitionProto convertToProtoFormat( + ReservationDefinition reservationDefinition) { + return ((ReservationDefinitionPBImpl)reservationDefinition).getProto(); + } + + public static ResourceProto convertToProtoFormat(Resource e) { + return YarnProtos.ResourceProto.newBuilder() + .setMemory(e.getMemory()) + .setVirtualCores(e.getVirtualCores()) + .build(); + } + + public static Map toAllocations( + List allocationRequestsList) { + Map allocations = new HashMap<>(); + for (ResourceAllocationRequestProto proto : allocationRequestsList) { + allocations.put( + new ReservationInterval(proto.getStartTime(), proto.getEndTime()), + convertFromProtoFormat(proto.getResource())); + } + return allocations; + } + + private static ResourcePBImpl convertFromProtoFormat(ResourceProto resource) { + return new ResourcePBImpl(resource); + } + + public static ReservationDefinitionPBImpl convertFromProtoFormat( + ReservationDefinitionProto r) { + return new ReservationDefinitionPBImpl(r); + } + + public static ReservationIdPBImpl convertFromProtoFormat( + ReservationIdProto r) { + return new ReservationIdPBImpl(r); + } + + public static ReservationId toReservationId( + ReservationIdProto reservationId) { + return new ReservationIdPBImpl(reservationId); + } + + public static InMemoryReservationAllocation toInMemoryAllocation( + String planName, ReservationId reservationId, + ReservationAllocationStateProto allocationState, Resource minAlloc, + ResourceCalculator planResourceCalculator) { + ReservationDefinition definition = + convertFromProtoFormat( + allocationState.getReservationDefinition()); + Map allocations = toAllocations( + allocationState.getAllocationRequestsList()); + InMemoryReservationAllocation allocation = + new InMemoryReservationAllocation(reservationId, definition, + allocationState.getUser(), planName, allocationState.getStartTime(), + allocationState.getEndTime(), allocations, planResourceCalculator, + minAlloc, allocationState.getContainsGangs()); + return allocation; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto index 3c8ac340d77..a0bd99b1c37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_recovery.proto @@ -96,4 +96,20 @@ message AMRMTokenSecretManagerStateProto { message RMDelegationTokenIdentifierDataProto { optional YARNDelegationTokenIdentifierProto token_identifier = 1; optional int64 renewDate = 2; -} \ No newline at end of file +} + +message ResourceAllocationRequestProto { + optional int64 start_time = 1; + optional int64 end_time = 2; + optional ResourceProto resource = 3; +} + +message ReservationAllocationStateProto { + optional ReservationDefinitionProto reservation_definition = 1; + repeated ResourceAllocationRequestProto allocation_requests = 2; + optional int64 start_time = 3; + optional int64 end_time = 4; + optional string user = 5; + optional bool contains_gangs = 6; + optional int64 acceptance_timestamp = 7; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index 9e0d22bb244..7b05af3bcc4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -18,6 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -36,6 +39,11 @@ import java.util.Map; import javax.crypto.SecretKey; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,6 +71,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Applicatio import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.InMemoryReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; @@ -691,4 +701,180 @@ public class RMStateStoreTestBase { store.close(); } + + public void testReservationStateStore( + RMStateStoreHelper stateStoreHelper) throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + RMContext rmContext = mock(RMContext.class); + when(rmContext.getStateStore()).thenReturn(store); + + long ts = System.currentTimeMillis(); + ReservationId r1 = ReservationId.newInstance(ts, 1); + int start = 1; + int[] alloc = { 10, 10, 10, 10, 10 }; + ResourceCalculator res = new DefaultResourceCalculator(); + Resource minAlloc = Resource.newInstance(1024, 1); + boolean hasGang = true; + String planName = "dedicated"; + ReservationDefinition rDef = + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); + ReservationAllocation allocation = new InMemoryReservationAllocation( + r1, rDef, "u3", planName, 0, 0 + alloc.length, + ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res, + minAlloc, hasGang); + ReservationAllocationStateProto allocationStateProto = + ReservationSystemUtil.buildStateProto(allocation); + assertAllocationStateEqual(allocation, allocationStateProto); + + // 1. Load empty store and verify no errors + store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + Map> + reservationState = state.getReservationState(); + Assert.assertNotNull(reservationState); + + // 2. Store single reservation and verify + String reservationIdName = r1.toString(); + rmContext.getStateStore().storeNewReservation( + allocationStateProto, + planName, reservationIdName); + + + // load state and verify new state + validateStoredReservation( + stateStoreHelper, dispatcher, rmContext, r1, planName, allocation, + allocationStateProto); + + // 3. update state test + alloc = new int[]{6, 6, 6}; + hasGang = false; + allocation = new InMemoryReservationAllocation( + r1, rDef, "u3", planName, 2, 2 + alloc.length, + ReservationSystemTestUtil.generateAllocation(1L, 2L, alloc), res, + minAlloc, hasGang); + allocationStateProto = + ReservationSystemUtil.buildStateProto(allocation); + rmContext.getStateStore().updateReservation( + allocationStateProto, + planName, reservationIdName); + + // load state and verify updated reservation + validateStoredReservation( + stateStoreHelper, dispatcher, rmContext, r1, planName, allocation, + allocationStateProto); + + // 4. add a second one and remove the first one + ReservationId r2 = ReservationId.newInstance(ts, 2); + ReservationAllocation allocation2 = new InMemoryReservationAllocation( + r2, rDef, "u3", planName, 0, 0 + alloc.length, + ReservationSystemTestUtil.generateAllocation(0L, 1L, alloc), res, + minAlloc, hasGang); + ReservationAllocationStateProto allocationStateProto2 = + ReservationSystemUtil.buildStateProto(allocation2); + String reservationIdName2 = r2.toString(); + + rmContext.getStateStore().storeNewReservation( + allocationStateProto2, + planName, reservationIdName2); + rmContext.getStateStore().removeReservation(planName, reservationIdName); + + // load state and verify r1 is removed and r2 is still there + Map reservations; + + store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + reservationState = state.getReservationState(); + Assert.assertNotNull(reservationState); + reservations = reservationState.get(planName); + Assert.assertNotNull(reservations); + ReservationAllocationStateProto storedReservationAllocation = + reservations.get(r1); + Assert.assertNull("Removed reservation should not be available in store", + storedReservationAllocation); + + storedReservationAllocation = reservations.get(r2); + assertAllocationStateEqual( + allocationStateProto2, storedReservationAllocation); + assertAllocationStateEqual(allocation2, storedReservationAllocation); + + + // 5. remove last reservation removes the plan state + rmContext.getStateStore().removeReservation(planName, reservationIdName2); + + store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); + store.setRMDispatcher(dispatcher); + state = store.loadState(); + reservationState = state.getReservationState(); + Assert.assertNotNull(reservationState); + reservations = reservationState.get(planName); + Assert.assertNull(reservations); + } + + private void validateStoredReservation( + RMStateStoreHelper stateStoreHelper, TestDispatcher dispatcher, + RMContext rmContext, ReservationId r1, String planName, + ReservationAllocation allocation, + ReservationAllocationStateProto allocationStateProto) throws Exception { + RMStateStore store = stateStoreHelper.getRMStateStore(); + when(rmContext.getStateStore()).thenReturn(store); + store.setRMDispatcher(dispatcher); + RMState state = store.loadState(); + Map> + reservationState = state.getReservationState(); + Assert.assertNotNull(reservationState); + Map reservations = + reservationState.get(planName); + Assert.assertNotNull(reservations); + ReservationAllocationStateProto storedReservationAllocation = + reservations.get(r1); + Assert.assertNotNull(storedReservationAllocation); + + assertAllocationStateEqual( + allocationStateProto, storedReservationAllocation); + assertAllocationStateEqual(allocation, storedReservationAllocation); + } + + void assertAllocationStateEqual( + ReservationAllocationStateProto expected, + ReservationAllocationStateProto actual) { + + Assert.assertEquals( + expected.getAcceptanceTimestamp(), actual.getAcceptanceTimestamp()); + Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); + Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); + Assert.assertEquals(expected.getContainsGangs(), actual.getContainsGangs()); + Assert.assertEquals(expected.getUser(), actual.getUser()); + assertEquals( + expected.getReservationDefinition(), actual.getReservationDefinition()); + assertEquals(expected.getAllocationRequestsList(), + actual.getAllocationRequestsList()); + } + + void assertAllocationStateEqual( + ReservationAllocation expected, + ReservationAllocationStateProto actual) { + Assert.assertEquals( + expected.getAcceptanceTime(), actual.getAcceptanceTimestamp()); + Assert.assertEquals(expected.getStartTime(), actual.getStartTime()); + Assert.assertEquals(expected.getEndTime(), actual.getEndTime()); + Assert.assertEquals(expected.containsGangs(), actual.getContainsGangs()); + Assert.assertEquals(expected.getUser(), actual.getUser()); + assertEquals( + expected.getReservationDefinition(), + ReservationSystemUtil.convertFromProtoFormat( + actual.getReservationDefinition())); + assertEquals( + expected.getAllocationRequests(), + ReservationSystemUtil.toAllocations( + actual.getAllocationRequestsList())); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java index c8422940afd..bd3b62e8392 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java @@ -186,6 +186,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase { testDeleteStore(fsTester); testRemoveApplication(fsTester); testAMRMTokenSecretManagerStateStore(fsTester); + testReservationStateStore(fsTester); } finally { cluster.shutdown(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java index 17cffa02d8a..46661426c8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java @@ -102,6 +102,12 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase { testAMRMTokenSecretManagerStateStore(tester); } + @Test(timeout = 60000) + public void testReservation() throws Exception { + LeveldbStateStoreTester tester = new LeveldbStateStoreTester(); + testReservationStateStore(tester); + } + class LeveldbStateStoreTester implements RMStateStoreHelper { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 34a4492d729..df9665377ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -174,6 +174,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { testDeleteStore(zkTester); testRemoveApplication(zkTester); testAMRMTokenSecretManagerStateStore(zkTester); + testReservationStateStore(zkTester); ((TestZKRMStateStoreTester.TestZKRMStateStoreInternal) zkTester.getRMStateStore()).testRetryingCreateRootDir(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index adb9dcf8786..e07b33ea613 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -185,6 +185,22 @@ public class ReservationSystemTestUtil { return scheduler; } + public static ReservationDefinition createSimpleReservationDefinition( + long arrival, long deadline, long duration) { + // create a request with a single atomic ask + ReservationRequest r = + ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1, + duration); + ReservationDefinition rDef = new ReservationDefinitionPBImpl(); + ReservationRequests reqs = new ReservationRequestsPBImpl(); + reqs.setReservationResources(Collections.singletonList(r)); + reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); + rDef.setReservationRequests(reqs); + rDef.setArrival(arrival); + rDef.setDeadline(deadline); + return rDef; + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java index 55224a9d1a7..9fd51134ca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryReservationAllocation.java @@ -17,7 +17,6 @@ *******************************************************************************/ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -25,11 +24,7 @@ import java.util.Random; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; -import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.After; @@ -67,7 +62,8 @@ public class TestInMemoryReservationAllocation { int[] alloc = { 10, 10, 10, 10, 10, 10 }; int start = 100; ReservationDefinition rDef = - createSimpleReservationDefinition(start, start + alloc.length + 1, + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); Map allocations = generateAllocation(start, alloc, false, false); @@ -89,7 +85,8 @@ public class TestInMemoryReservationAllocation { int[] alloc = { 10, 10, 10, 10, 10, 10 }; int start = 100; ReservationDefinition rDef = - createSimpleReservationDefinition(start, start + alloc.length + 1, + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); Map allocations = generateAllocation(start, alloc, true, false); @@ -112,7 +109,8 @@ public class TestInMemoryReservationAllocation { int[] alloc = { 0, 5, 10, 10, 5, 0 }; int start = 100; ReservationDefinition rDef = - createSimpleReservationDefinition(start, start + alloc.length + 1, + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); Map allocations = generateAllocation(start, alloc, true, false); @@ -135,7 +133,8 @@ public class TestInMemoryReservationAllocation { int[] alloc = {}; long start = 0; ReservationDefinition rDef = - createSimpleReservationDefinition(start, start + alloc.length + 1, + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); Map allocations = new HashMap(); @@ -154,7 +153,8 @@ public class TestInMemoryReservationAllocation { int[] alloc = { 10, 10, 10, 10, 10, 10 }; int start = 100; ReservationDefinition rDef = - createSimpleReservationDefinition(start, start + alloc.length + 1, + ReservationSystemTestUtil.createSimpleReservationDefinition( + start, start + alloc.length + 1, alloc.length); boolean isGang = true; Map allocations = @@ -184,22 +184,6 @@ public class TestInMemoryReservationAllocation { Assert.assertEquals(start + alloc.length + 1, rAllocation.getEndTime()); } - private ReservationDefinition createSimpleReservationDefinition(long arrival, - long deadline, long duration) { - // create a request with a single atomic ask - ReservationRequest r = - ReservationRequest.newInstance(Resource.newInstance(1024, 1), 1, 1, - duration); - ReservationDefinition rDef = new ReservationDefinitionPBImpl(); - ReservationRequests reqs = new ReservationRequestsPBImpl(); - reqs.setReservationResources(Collections.singletonList(r)); - reqs.setInterpreter(ReservationRequestInterpreter.R_ALL); - rDef.setReservationRequests(reqs); - rDef.setArrival(arrival); - rDef.setDeadline(deadline); - return rDef; - } - private Map generateAllocation( int startTime, int[] alloc, boolean isStep, boolean isGang) { Map req =