From 380cc4dbedbd0d5add6579f4bc69d4e89056053f Mon Sep 17 00:00:00 2001 From: Junping Du Date: Thu, 12 Feb 2015 11:46:47 -0800 Subject: [PATCH] YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager restart. (Contributed by Jason Lowe) (cherry picked from commit 04f5ef18f7877ce30b12b1a3c1e851c420531b72) --- hadoop-yarn-project/CHANGES.txt | 3 + .../ContainerManagerImpl.java | 4 +- .../loghandler/NonAggregatingLogHandler.java | 63 +++++++++++++-- .../recovery/NMLeveldbStateStoreService.java | 67 +++++++++++++++- .../recovery/NMNullStateStoreService.java | 16 ++++ .../recovery/NMStateStoreService.java | 35 ++++++++ .../yarn_server_nodemanager_recovery.proto | 4 + .../TestNonAggregatingLogHandler.java | 79 +++++++++++++++++-- .../recovery/NMMemoryStateStoreService.java | 79 +++++++++++++------ .../TestNMLeveldbStateStoreService.java | 51 ++++++++++++ 10 files changed, 362 insertions(+), 39 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fc38eb98bed..f7828d664e4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -240,6 +240,9 @@ Release 2.7.0 - UNRELEASED YARN-3147. Clean up RM web proxy code. (Steve Loughran via xgong) + YARN-2079. Recover NonAggregatingLogHandler state upon nodemanager + restart. (Jason Lowe via junping_du) + OPTIMIZATIONS YARN-2990. FairScheduler's delay-scheduling always waits for node-local and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index bb277d94b8b..acac600ac0d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -135,7 +135,6 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import com.google.common.annotations.VisibleForTesting; @@ -362,7 +361,8 @@ public class ContainerManagerImpl extends CompositeService implements deletionService, dirsHandler); } else { return new NonAggregatingLogHandler(this.dispatcher, deletionService, - dirsHandler); + dirsHandler, + context.getNMStateStore()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 0422ef9eea4..471e994ae52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -62,15 +65,18 @@ public class NonAggregatingLogHandler extends AbstractService implements private final Map appOwners; private final LocalDirsHandlerService dirsHandler; + private final NMStateStoreService stateStore; private long deleteDelaySeconds; private ScheduledThreadPoolExecutor sched; public NonAggregatingLogHandler(Dispatcher dispatcher, - DeletionService delService, LocalDirsHandlerService dirsHandler) { + DeletionService delService, LocalDirsHandlerService dirsHandler, + NMStateStoreService stateStore) { super(NonAggregatingLogHandler.class.getName()); this.dispatcher = dispatcher; this.delService = delService; this.dirsHandler = dirsHandler; + this.stateStore = stateStore; this.appOwners = new ConcurrentHashMap(); } @@ -82,6 +88,7 @@ public class NonAggregatingLogHandler extends AbstractService implements YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); sched = createScheduledThreadPoolExecutor(conf); super.serviceInit(conf); + recover(); } @Override @@ -110,6 +117,31 @@ public class NonAggregatingLogHandler extends AbstractService implements } } + private void recover() throws IOException { + if (stateStore.canRecover()) { + RecoveredLogDeleterState state = stateStore.loadLogDeleterState(); + long now = System.currentTimeMillis(); + for (Map.Entry entry : + state.getLogDeleterMap().entrySet()) { + ApplicationId appId = entry.getKey(); + LogDeleterProto proto = entry.getValue(); + long deleteDelayMsec = proto.getDeletionTime() - now; + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling deletion of " + appId + " logs in " + + deleteDelayMsec + " msec"); + } + LogDeleterRunnable logDeleter = + new LogDeleterRunnable(proto.getUser(), appId); + try { + sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // Handling this event in local thread before starting threads + // or after calling sched.shutdownNow(). + logDeleter.run(); + } + } + } + } @SuppressWarnings("unchecked") @Override @@ -130,13 +162,28 @@ public class NonAggregatingLogHandler extends AbstractService implements case APPLICATION_FINISHED: LogHandlerAppFinishedEvent appFinishedEvent = (LogHandlerAppFinishedEvent) event; + ApplicationId appId = appFinishedEvent.getApplicationId(); // Schedule - so that logs are available on the UI till they're deleted. LOG.info("Scheduling Log Deletion for application: " - + appFinishedEvent.getApplicationId() + ", with delay of " + + appId + ", with delay of " + this.deleteDelaySeconds + " seconds"); - LogDeleterRunnable logDeleter = - new LogDeleterRunnable(appOwners.remove(appFinishedEvent - .getApplicationId()), appFinishedEvent.getApplicationId()); + String user = appOwners.remove(appId); + if (user == null) { + LOG.error("Unable to locate user for " + appId); + break; + } + LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId); + long deletionTimestamp = System.currentTimeMillis() + + this.deleteDelaySeconds * 1000; + LogDeleterProto deleterProto = LogDeleterProto.newBuilder() + .setUser(user) + .setDeletionTime(deletionTimestamp) + .build(); + try { + stateStore.storeLogDeleter(appId, deleterProto); + } catch (IOException e) { + LOG.error("Unable to record log deleter state", e); + } try { sched.schedule(logDeleter, this.deleteDelaySeconds, TimeUnit.SECONDS); @@ -198,6 +245,12 @@ public class NonAggregatingLogHandler extends AbstractService implements NonAggregatingLogHandler.this.delService.delete(user, null, (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); } + try { + NonAggregatingLogHandler.this.stateStore.removeLogDeleter( + this.applicationId); + } catch (IOException e) { + LOG.error("Error removing log deletion state", e); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5f349dbe359..df5818222fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -115,6 +116,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_TOKENS_PREV_MASTER_KEY = CONTAINER_TOKENS_KEY_PREFIX + PREV_MASTER_KEY_SUFFIX; + private static final String LOG_DELETER_KEY_PREFIX = "LogDeleters/"; + private static final byte[] EMPTY_VALUE = new byte[0]; private DB db; @@ -851,6 +854,69 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } + @Override + public RecoveredLogDeleterState loadLogDeleterState() throws IOException { + RecoveredLogDeleterState state = new RecoveredLogDeleterState(); + state.logDeleterMap = new HashMap(); + LeveldbIterator iter = null; + try { + iter = new LeveldbIterator(db); + iter.seek(bytes(LOG_DELETER_KEY_PREFIX)); + final int logDeleterKeyPrefixLength = LOG_DELETER_KEY_PREFIX.length(); + while (iter.hasNext()) { + Entry entry = iter.next(); + String fullKey = asString(entry.getKey()); + if (!fullKey.startsWith(LOG_DELETER_KEY_PREFIX)) { + break; + } + + String appIdStr = fullKey.substring(logDeleterKeyPrefixLength); + ApplicationId appId = null; + try { + appId = ConverterUtils.toApplicationId(appIdStr); + } catch (IllegalArgumentException e) { + LOG.warn("Skipping unknown log deleter key " + fullKey); + continue; + } + + LogDeleterProto proto = LogDeleterProto.parseFrom(entry.getValue()); + state.logDeleterMap.put(appId, proto); + } + } catch (DBException e) { + throw new IOException(e); + } finally { + if (iter != null) { + iter.close(); + } + } + return state; + } + + @Override + public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) + throws IOException { + String key = getLogDeleterKey(appId); + try { + db.put(bytes(key), proto.toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void removeLogDeleter(ApplicationId appId) throws IOException { + String key = getLogDeleterKey(appId); + try { + db.delete(bytes(key)); + } catch (DBException e) { + throw new IOException(e); + } + } + + private String getLogDeleterKey(ApplicationId appId) { + return LOG_DELETER_KEY_PREFIX + appId; + } + @Override protected void initStorage(Configuration conf) throws IOException { @@ -966,5 +1032,4 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { + getCurrentVersion() + ", but loading version " + loadedVersion); } } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 66469697987..ab49543c403 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; // The state store to use when state isn't being stored @@ -191,6 +192,21 @@ public class NMNullStateStoreService extends NMStateStoreService { throws IOException { } + @Override + public RecoveredLogDeleterState loadLogDeleterState() throws IOException { + throw new UnsupportedOperationException( + "Recovery not supported by this state store"); + } + + @Override + public void storeLogDeleter(ApplicationId appId, LogDeleterProto proto) + throws IOException { + } + + @Override + public void removeLogDeleter(ApplicationId appId) throws IOException { + } + @Override protected void initStorage(Configuration conf) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index b6ca336d18a..fa663495bc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; @Private @@ -189,6 +190,14 @@ public abstract class NMStateStoreService extends AbstractService { } } + public static class RecoveredLogDeleterState { + Map logDeleterMap; + + public Map getLogDeleterMap() { + return logDeleterMap; + } + } + /** Initialize the state storage */ @Override public void serviceInit(Configuration conf) throws IOException { @@ -459,6 +468,32 @@ public abstract class NMStateStoreService extends AbstractService { throws IOException; + /** + * Load the state of log deleters + * @return recovered log deleter state + * @throws IOException + */ + public abstract RecoveredLogDeleterState loadLogDeleterState() + throws IOException; + + /** + * Store the state of a log deleter + * @param appId the application ID for the log deleter + * @param proto the serialized state of the log deleter + * @throws IOException + */ + public abstract void storeLogDeleter(ApplicationId appId, + LogDeleterProto proto) throws IOException; + + /** + * Remove the state of a log deleter + * @param appId the application ID for the log deleter + * @throws IOException + */ + public abstract void removeLogDeleter(ApplicationId appId) + throws IOException; + + protected abstract void initStorage(Configuration conf) throws IOException; protected abstract void startStorage() throws IOException; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto index d8fdd8b7212..ade8c1ac867 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto @@ -47,3 +47,7 @@ message LocalizedResourceProto { optional int64 size = 3; } +message LogDeleterProto { + optional string user = 1; + optional int64 deletionTime = 2; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java index d0f647235db..0bab5eabbd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java @@ -18,10 +18,12 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -65,10 +67,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.mockito.exceptions.verification.WantedButNotInvoked; @@ -123,7 +129,8 @@ public class TestNonAggregatingLogHandler { dirsHandler.init(conf); NonAggregatingLogHandler rawLogHandler = - new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler); + new NonAggregatingLogHandler(dispatcher, mockDelService, dirsHandler, + new NMNullStateStoreService()); NonAggregatingLogHandler logHandler = spy(rawLogHandler); AbstractFileSystem spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); @@ -209,7 +216,8 @@ public class TestNonAggregatingLogHandler { @Test public void testStop() throws Exception { NonAggregatingLogHandler aggregatingLogHandler = - new NonAggregatingLogHandler(null, null, null); + new NonAggregatingLogHandler(null, null, null, + new NMNullStateStoreService()); // It should not throw NullPointerException aggregatingLogHandler.stop(); @@ -232,7 +240,8 @@ public class TestNonAggregatingLogHandler { NonAggregatingLogHandler aggregatingLogHandler = new NonAggregatingLogHandler(new InlineDispatcher(), delService, - dirsHandler); + dirsHandler, + new NMNullStateStoreService()); dirsHandler.init(conf); dirsHandler.start(); @@ -258,7 +267,13 @@ public class TestNonAggregatingLogHandler { public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher, DeletionService delService, LocalDirsHandlerService dirsHandler) { - super(dispatcher, delService, dirsHandler); + this(dispatcher, delService, dirsHandler, new NMNullStateStoreService()); + } + + public NonAggregatingLogHandlerWithMockExecutor(Dispatcher dispatcher, + DeletionService delService, LocalDirsHandlerService dirsHandler, + NMStateStoreService stateStore) { + super(dispatcher, delService, dirsHandler, stateStore); } @Override @@ -303,7 +318,8 @@ public class TestNonAggregatingLogHandler { LocalDirsHandlerService mockDirsHandler = mock(LocalDirsHandlerService.class); NonAggregatingLogHandler rawLogHandler = - new NonAggregatingLogHandler(dispatcher, mockDelService, mockDirsHandler); + new NonAggregatingLogHandler(dispatcher, mockDelService, + mockDirsHandler, new NMNullStateStoreService()); NonAggregatingLogHandler logHandler = spy(rawLogHandler); AbstractFileSystem spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); @@ -316,7 +332,58 @@ public class TestNonAggregatingLogHandler { mockDirsHandler, conf, spylfs, lfs, localLogDirs); logHandler.close(); } - + + @Test + public void testRecovery() throws Exception { + File[] localLogDirs = getLocalLogDirFiles(this.getClass().getName(), 2); + String localLogDirsString = + localLogDirs[0].getAbsolutePath() + "," + + localLogDirs[1].getAbsolutePath(); + + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, + YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); + + dirsHandler.init(conf); + + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + NonAggregatingLogHandlerWithMockExecutor logHandler = + new NonAggregatingLogHandlerWithMockExecutor(dispatcher, mockDelService, + dirsHandler, stateStore); + logHandler.init(conf); + logHandler.start(); + + logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, + ContainerLogsRetentionPolicy.ALL_CONTAINERS, null)); + logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0)); + logHandler.handle(new LogHandlerAppFinishedEvent(appId)); + + // simulate a restart and verify deletion is rescheduled + logHandler.close(); + logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher, + mockDelService, dirsHandler, stateStore); + logHandler.init(conf); + logHandler.start(); + ArgumentCaptor schedArg = ArgumentCaptor.forClass(Runnable.class); + verify(logHandler.mockSched).schedule(schedArg.capture(), + anyLong(), eq(TimeUnit.MILLISECONDS)); + + // execute the runnable and verify another restart has nothing scheduled + schedArg.getValue().run(); + logHandler.close(); + logHandler = new NonAggregatingLogHandlerWithMockExecutor(dispatcher, + mockDelService, dirsHandler, stateStore); + logHandler.init(conf); + logHandler.start(); + verify(logHandler.mockSched, never()).schedule(any(Runnable.class), + anyLong(), any(TimeUnit.class)); + logHandler.close(); + } + /** * Function to run a log handler with directories failing the getFileStatus * call. The function accepts the log handler, setup the mocks to fail with diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index d4040915ef7..e0487e7f033 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -48,6 +49,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { private Map deleteTasks; private RecoveredNMTokensState nmTokenState; private RecoveredContainerTokensState containerTokenState; + private Map logDeleterState; public NMMemoryStateStoreService() { super(NMMemoryStateStoreService.class.getName()); @@ -65,6 +67,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { containerTokenState.activeTokens = new HashMap(); trackerStates = new HashMap(); deleteTasks = new HashMap(); + logDeleterState = new HashMap(); } @Override @@ -77,7 +80,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override - public RecoveredApplicationsState loadApplicationsState() + public synchronized RecoveredApplicationsState loadApplicationsState() throws IOException { RecoveredApplicationsState state = new RecoveredApplicationsState(); state.applications = new ArrayList( @@ -87,7 +90,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeApplication(ApplicationId appId, + public synchronized void storeApplication(ApplicationId appId, ContainerManagerApplicationProto proto) throws IOException { ContainerManagerApplicationProto protoCopy = ContainerManagerApplicationProto.parseFrom(proto.toByteString()); @@ -95,18 +98,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeFinishedApplication(ApplicationId appId) { + public synchronized void storeFinishedApplication(ApplicationId appId) { finishedApps.add(appId); } @Override - public void removeApplication(ApplicationId appId) throws IOException { + public synchronized void removeApplication(ApplicationId appId) + throws IOException { apps.remove(appId); finishedApps.remove(appId); } @Override - public List loadContainersState() + public synchronized List loadContainersState() throws IOException { // return a copy so caller can't modify our state List result = @@ -124,7 +128,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainer(ContainerId containerId, + public synchronized void storeContainer(ContainerId containerId, StartContainerRequest startRequest) throws IOException { RecoveredContainerState rcs = new RecoveredContainerState(); rcs.startRequest = startRequest; @@ -132,14 +136,14 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainerDiagnostics(ContainerId containerId, + public synchronized void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); rcs.diagnostics = diagnostics.toString(); } @Override - public void storeContainerLaunched(ContainerId containerId) + public synchronized void storeContainerLaunched(ContainerId containerId) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); if (rcs.exitCode != ContainerExitStatus.INVALID) { @@ -149,22 +153,23 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainerKilled(ContainerId containerId) + public synchronized void storeContainerKilled(ContainerId containerId) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); rcs.killed = true; } @Override - public void storeContainerCompleted(ContainerId containerId, int exitCode) - throws IOException { + public synchronized void storeContainerCompleted(ContainerId containerId, + int exitCode) throws IOException { RecoveredContainerState rcs = getRecoveredContainerState(containerId); rcs.status = RecoveredContainerStatus.COMPLETED; rcs.exitCode = exitCode; } @Override - public void removeContainer(ContainerId containerId) throws IOException { + public synchronized void removeContainer(ContainerId containerId) + throws IOException { containerStates.remove(containerId); } @@ -252,7 +257,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override - public RecoveredDeletionServiceState loadDeletionServiceState() + public synchronized RecoveredDeletionServiceState loadDeletionServiceState() throws IOException { RecoveredDeletionServiceState result = new RecoveredDeletionServiceState(); @@ -274,7 +279,8 @@ public class NMMemoryStateStoreService extends NMStateStoreService { @Override - public RecoveredNMTokensState loadNMTokensState() throws IOException { + public synchronized RecoveredNMTokensState loadNMTokensState() + throws IOException { // return a copy so caller can't modify our state RecoveredNMTokensState result = new RecoveredNMTokensState(); result.currentMasterKey = nmTokenState.currentMasterKey; @@ -286,36 +292,36 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeNMTokenCurrentMasterKey(MasterKey key) + public synchronized void storeNMTokenCurrentMasterKey(MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto()); } @Override - public void storeNMTokenPreviousMasterKey(MasterKey key) + public synchronized void storeNMTokenPreviousMasterKey(MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto()); } @Override - public void storeNMTokenApplicationMasterKey(ApplicationAttemptId attempt, - MasterKey key) throws IOException { + public synchronized void storeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt, MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; nmTokenState.applicationMasterKeys.put(attempt, new MasterKeyPBImpl(keypb.getProto())); } @Override - public void removeNMTokenApplicationMasterKey(ApplicationAttemptId attempt) - throws IOException { + public synchronized void removeNMTokenApplicationMasterKey( + ApplicationAttemptId attempt) throws IOException { nmTokenState.applicationMasterKeys.remove(attempt); } @Override - public RecoveredContainerTokensState loadContainerTokensState() + public synchronized RecoveredContainerTokensState loadContainerTokensState() throws IOException { // return a copy so caller can't modify our state RecoveredContainerTokensState result = @@ -328,7 +334,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainerTokenCurrentMasterKey(MasterKey key) + public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; containerTokenState.currentMasterKey = @@ -336,7 +342,7 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainerTokenPreviousMasterKey(MasterKey key) + public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key) throws IOException { MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; containerTokenState.previousMasterKey = @@ -344,18 +350,41 @@ public class NMMemoryStateStoreService extends NMStateStoreService { } @Override - public void storeContainerToken(ContainerId containerId, + public synchronized void storeContainerToken(ContainerId containerId, Long expirationTime) throws IOException { containerTokenState.activeTokens.put(containerId, expirationTime); } @Override - public void removeContainerToken(ContainerId containerId) + public synchronized void removeContainerToken(ContainerId containerId) throws IOException { containerTokenState.activeTokens.remove(containerId); } + @Override + public synchronized RecoveredLogDeleterState loadLogDeleterState() + throws IOException { + RecoveredLogDeleterState state = new RecoveredLogDeleterState(); + state.logDeleterMap = new HashMap( + logDeleterState); + return state; + } + + @Override + public synchronized void storeLogDeleter(ApplicationId appId, + LogDeleterProto proto) + throws IOException { + logDeleterState.put(appId, proto); + } + + @Override + public synchronized void removeLogDeleter(ApplicationId appId) + throws IOException { + logDeleterState.remove(appId); + } + + private static class TrackerState { Map inProgressMap = new HashMap(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index f7f43cc5aa4..180442499c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; @@ -66,6 +67,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.records.Version; @@ -831,6 +833,55 @@ public class TestNMLeveldbStateStoreService { assertEquals(expTime3, loadedActiveTokens.get(cid3)); } + @Test + public void testLogDeleterStorage() throws IOException { + // test empty when no state + RecoveredLogDeleterState state = stateStore.loadLogDeleterState(); + assertTrue(state.getLogDeleterMap().isEmpty()); + + // store log deleter state + final ApplicationId appId1 = ApplicationId.newInstance(1, 1); + LogDeleterProto proto1 = LogDeleterProto.newBuilder() + .setUser("user1") + .setDeletionTime(1234) + .build(); + stateStore.storeLogDeleter(appId1, proto1); + + // restart state store and verify recovered + restartStateStore(); + state = stateStore.loadLogDeleterState(); + assertEquals(1, state.getLogDeleterMap().size()); + assertEquals(proto1, state.getLogDeleterMap().get(appId1)); + + // store another log deleter + final ApplicationId appId2 = ApplicationId.newInstance(2, 2); + LogDeleterProto proto2 = LogDeleterProto.newBuilder() + .setUser("user2") + .setDeletionTime(5678) + .build(); + stateStore.storeLogDeleter(appId2, proto2); + + // restart state store and verify recovered + restartStateStore(); + state = stateStore.loadLogDeleterState(); + assertEquals(2, state.getLogDeleterMap().size()); + assertEquals(proto1, state.getLogDeleterMap().get(appId1)); + assertEquals(proto2, state.getLogDeleterMap().get(appId2)); + + // remove a deleter and verify removed after restart and recovery + stateStore.removeLogDeleter(appId1); + restartStateStore(); + state = stateStore.loadLogDeleterState(); + assertEquals(1, state.getLogDeleterMap().size()); + assertEquals(proto2, state.getLogDeleterMap().get(appId2)); + + // remove last deleter and verify empty after restart and recovery + stateStore.removeLogDeleter(appId2); + restartStateStore(); + state = stateStore.loadLogDeleterState(); + assertTrue(state.getLogDeleterMap().isEmpty()); + } + private static class NMTokenSecretManagerForTest extends BaseNMTokenSecretManager { public MasterKey generateKey() {