diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 8c27474ecd3..06ad727b2be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -329,8 +329,248 @@ public class TestNMLeveldbStateStoreService { assertEquals(appProto1, apps.get(0)); } + + @Test + public void testContainerStorageWhenContainerIsRequested() + throws IOException { + final ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + final RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(0, rcs.getVersion()); + assertEquals(containerParams.getContainerStartTime().longValue(), + rcs.getStartTime()); + assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerParams.getContainerRequest(), rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerParams.getContainerResource(), rcs.getCapability()); + } + + + + @Test + public void testContainerStorageWhenContainerIsQueued() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + Resource containerResource = containerParams.getContainerResource(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + + stateStore.storeContainerQueued(containerId); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertTrue(rcs.getDiagnostics().isEmpty()); + assertEquals(containerResource, rcs.getCapability()); + } + + @Test + public void testContainerStorageWhenContainerIsLaunched() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + Resource containerResource = containerParams.getContainerResource(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + + StringBuilder diags = launchContainerWithDiagnostics(containerId); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + assertEquals(containerResource, rcs.getCapability()); + } + + @Test + public void testContainerStorageWhenContainerIsPaused() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + StartContainerRequest containerReq = containerParams.getContainerRequest(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + + stateStore.storeContainerPaused(containerId); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(containerReq, rcs.getStartRequest()); + + // Resume the container + stateStore.removeContainerPaused(containerId); + restartStateStore(); + recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + } + + @Test + public void testContainerStorageWhenContainerSizeIncreased() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + launchContainerWithDiagnostics(containerId); + + increaseContainerSize(containerId); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(0, rcs.getVersion()); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(Resource.newInstance(2468, 4), rcs.getCapability()); + } + + @Test + public void testContainerStorageWhenContainerMarkedAsKilled() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + StringBuilder diags = launchContainerWithDiagnostics(containerId); + ContainerTokenIdentifier updateTokenIdentifier = + increaseContainerSize(containerId); + + markContainerAsKilled(containerId, diags); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils + .newContainerTokenIdentifier(rcs.getStartRequest() + .getContainerToken()); + assertEquals(updateTokenIdentifier, tokenReadFromRequest); + assertEquals(diags.toString(), rcs.getDiagnostics()); + } + + @Test + public void testContainerStorageWhenContainerCompleted() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + ApplicationAttemptId appAttemptId = containerParams.getAppAttemptId(); + + storeNewContainerRecordWithoutStartContainerRequest(appAttemptId); + stateStore.storeContainerQueued(containerId); + StringBuilder diags = launchContainerWithDiagnostics(containerId); + markContainerAsKilled(containerId, diags); + + // add yet more diags, mark container completed + diags.append("some final diags"); + stateStore.storeContainerDiagnostics(containerId, diags); + stateStore.storeContainerCompleted(containerId, 21); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); + assertEquals(21, rcs.getExitCode()); + assertTrue(rcs.getKilled()); + assertEquals(diags.toString(), rcs.getDiagnostics()); + } + @Test public void testContainerStorage() throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + + // remaining retry attempts, work dir and log dir are stored + stateStore.storeContainerRemainingRetryAttempts(containerId, 6); + stateStore.storeContainerWorkDir(containerId, "/test/workdir"); + stateStore.storeContainerLogDir(containerId, "/test/logdir"); + restartStateStore(); + + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertEquals(1, recoveredContainers.size()); + RecoveredContainerState rcs = recoveredContainers.get(0); + assertEquals(6, rcs.getRemainingRetryAttempts()); + assertEquals("/test/workdir", rcs.getWorkDir()); + assertEquals("/test/logdir", rcs.getLogDir()); + validateRetryAttempts(containerId); + } + + @Test + public void testContainerStorageWhenContainerRemoved() + throws IOException { + ContainerStateConstructParams containerParams = + storeContainerInStateStore(); + ContainerId containerId = containerParams.getContainerId(); + + // remove the container and verify not recovered + stateStore.removeContainer(containerId); + restartStateStore(); + List recoveredContainers = + loadContainersState(stateStore.getContainerStateIterator()); + assertTrue(recoveredContainers.isEmpty()); + // recover again to check remove clears all containers + restartStateStore(); + NMStateStoreService nmStoreSpy = spy(stateStore); + loadContainersState(nmStoreSpy.getContainerStateIterator()); + verify(nmStoreSpy, times(0)).removeContainer(any(ContainerId.class)); + } + + private ContainerStateConstructParams storeContainerInStateStore() + throws IOException { // test empty when no state List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); @@ -345,162 +585,115 @@ public class TestNMLeveldbStateStoreService { StartContainerRequest containerReq = createContainerRequest(containerId, containerResource); - // store a container and verify recovered - long containerStartTime = System.currentTimeMillis(); - stateStore.storeContainer(containerId, 0, containerStartTime, containerReq); + long anyContainerStartTime = 1573155078494L; + stateStore.storeContainer(containerId, 0, anyContainerStartTime, + containerReq); // verify the container version key is not stored for new containers DB db = stateStore.getDB(); assertNull("version key present for new container", db.get(bytes( stateStore.getContainerVersionKey(containerId.toString())))); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - RecoveredContainerState rcs = recoveredContainers.get(0); - assertEquals(0, rcs.getVersion()); - assertEquals(containerStartTime, rcs.getStartTime()); - assertEquals(RecoveredContainerStatus.REQUESTED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertTrue(rcs.getDiagnostics().isEmpty()); - assertEquals(containerResource, rcs.getCapability()); + return new ContainerStateConstructParams() + .setContainerRequest(containerReq) + .setContainerResource(containerResource) + .setContainerStartTime(anyContainerStartTime) + .setAppAttemptId(appAttemptId) + .setContainerId(containerId); + } - // store a new container record without StartContainerRequest - ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); - stateStore.storeContainerLaunched(containerId1); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - // check whether the new container record is discarded - assertEquals(1, recoveredContainers.size()); + private static class ContainerStateConstructParams { + private StartContainerRequest containerRequest; + private Resource containerResource; + private Long containerStartTime; + private ApplicationAttemptId appAttemptId; + private ContainerId containerId; - // queue the container, and verify recovered - stateStore.storeContainerQueued(containerId); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertTrue(rcs.getDiagnostics().isEmpty()); - assertEquals(containerResource, rcs.getCapability()); + public ApplicationAttemptId getAppAttemptId() { + return appAttemptId; + } + public ContainerStateConstructParams setAppAttemptId(ApplicationAttemptId + theAppAttemptId) { + this.appAttemptId = theAppAttemptId; + return this; + } + public ContainerId getContainerId() { + return containerId; + } + public ContainerStateConstructParams setContainerId(ContainerId + theContainerId) { + this.containerId = theContainerId; + return this; + } - // launch the container, add some diagnostics, and verify recovered - StringBuilder diags = new StringBuilder(); - stateStore.storeContainerLaunched(containerId); - diags.append("some diags for container"); + public StartContainerRequest getContainerRequest() { + return containerRequest; + } + public ContainerStateConstructParams setContainerRequest( + StartContainerRequest theContainerRequest) { + this.containerRequest = theContainerRequest; + return this; + } + + public Resource getContainerResource() { + return containerResource; + } + + public ContainerStateConstructParams setContainerResource( + Resource theContainerResource) { + this.containerResource = theContainerResource; + return this; + } + + public Long getContainerStartTime() { + return containerStartTime; + } + + public ContainerStateConstructParams setContainerStartTime( + Long theContainerStartTime) { + this.containerStartTime = theContainerStartTime; + return this; + } + } + + private void markContainerAsKilled(ContainerId containerId, + StringBuilder diags) throws IOException { + // mark the container killed, add some more diags + diags.append("some more diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertEquals(diags.toString(), rcs.getDiagnostics()); - assertEquals(containerResource, rcs.getCapability()); + stateStore.storeContainerKilled(containerId); + } - // pause the container, and verify recovered - stateStore.storeContainerPaused(containerId); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - - // Resume the container - stateStore.removeContainerPaused(containerId); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - - // increase the container size, and verify recovered + private ContainerTokenIdentifier increaseContainerSize( + ContainerId containerId) throws IOException { ContainerTokenIdentifier updateTokenIdentifier = new ContainerTokenIdentifier(containerId, "host", "user", Resource.newInstance(2468, 4), 9876543210L, 42, 2468, Priority.newInstance(7), 13579); - stateStore .storeContainerUpdateToken(containerId, updateTokenIdentifier); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(0, rcs.getVersion()); - assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(Resource.newInstance(2468, 4), rcs.getCapability()); + return updateTokenIdentifier; + } - // mark the container killed, add some more diags, and verify recovered - diags.append("some more diags for container"); + private StringBuilder launchContainerWithDiagnostics(ContainerId containerId) + throws IOException { + StringBuilder diags = new StringBuilder(); + stateStore.storeContainerLaunched(containerId); + diags.append("some diags for container"); stateStore.storeContainerDiagnostics(containerId, diags); - stateStore.storeContainerKilled(containerId); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertTrue(rcs.getKilled()); - ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils - .newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken()); - assertEquals(updateTokenIdentifier, tokenReadFromRequest); - assertEquals(diags.toString(), rcs.getDiagnostics()); + return diags; + } - // add yet more diags, mark container completed, and verify recovered - diags.append("some final diags"); - stateStore.storeContainerDiagnostics(containerId, diags); - stateStore.storeContainerCompleted(containerId, 21); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); - assertEquals(21, rcs.getExitCode()); - assertTrue(rcs.getKilled()); - assertEquals(diags.toString(), rcs.getDiagnostics()); + private void storeNewContainerRecordWithoutStartContainerRequest( + ApplicationAttemptId appAttemptId) throws IOException { + // store a new container record without StartContainerRequest + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 6); + stateStore.storeContainerLaunched(containerId1); - // store remainingRetryAttempts, workDir and logDir - stateStore.storeContainerRemainingRetryAttempts(containerId, 6); - stateStore.storeContainerWorkDir(containerId, "/test/workdir"); - stateStore.storeContainerLogDir(containerId, "/test/logdir"); - restartStateStore(); - recoveredContainers = + List recoveredContainers = loadContainersState(stateStore.getContainerStateIterator()); + // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(6, rcs.getRemainingRetryAttempts()); - assertEquals("/test/workdir", rcs.getWorkDir()); - assertEquals("/test/logdir", rcs.getLogDir()); - - validateRetryAttempts(containerId); - // remove the container and verify not recovered - stateStore.removeContainer(containerId); - restartStateStore(); - recoveredContainers = - loadContainersState(stateStore.getContainerStateIterator()); - assertTrue(recoveredContainers.isEmpty()); - // recover again to check remove clears all containers - restartStateStore(); - NMStateStoreService nmStoreSpy = spy(stateStore); - loadContainersState(nmStoreSpy.getContainerStateIterator()); - verify(nmStoreSpy,times(0)).removeContainer(any(ContainerId.class)); } private void validateRetryAttempts(ContainerId containerId) @@ -524,11 +717,6 @@ public class TestNMLeveldbStateStoreService { return createContainerRequestInternal(containerId, res); } - private StartContainerRequest createContainerRequest( - ContainerId containerId) { - return createContainerRequestInternal(containerId, null); - } - private StartContainerRequest createContainerRequestInternal(ContainerId containerId, Resource res) { LocalResource lrsrc = LocalResource.newInstance( @@ -545,9 +733,9 @@ public class TestNMLeveldbStateStoreService { containerCmds.add("somearg"); Map serviceData = new HashMap(); serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3})); ByteBuffer containerTokens = - ByteBuffer.wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + ByteBuffer.wrap(new byte[] {0x7, 0x8, 0x9, 0xa}); Map acls = new HashMap(); acls.put(ApplicationAccessType.VIEW_APP, "viewuser"); @@ -676,7 +864,8 @@ public class TestNMLeveldbStateStoreService { } @Test - public void testStartResourceLocalization() throws IOException { + public void testStartResourceLocalizationForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -730,10 +919,14 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, startedResources.size()); assertEquals(appRsrcLocalPath, startedResources.get(appRsrcProto)); + } - // start some public and private resources + @Test + public void testStartResourceLocalizationForPublicResources() + throws IOException { Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -750,23 +943,14 @@ public class TestNMLeveldbStateStoreService { Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); stateStore.startResourceLocalization(null, null, pubRsrcProto2, pubRsrcLocalPath2); - Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( - URL.fromPath(privRsrcPath), - LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, - 789L, 680L, "*pattern*"); - LocalResourceProto privRsrcProto = rsrcPb.getProto(); - Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); - stateStore.startResourceLocalization(user, null, privRsrcProto, - privRsrcLocalPath); // restart and verify resources are marked in-progress restartStateStore(); - state = stateStore.loadLocalizationState(); - pubts = state.getPublicTrackerState(); - completedResources = loadCompletedResources( + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( pubts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( pubts.getStartedResourcesIterator()); assertTrue(completedResources.isEmpty()); assertEquals(2, startedResources.size()); @@ -774,34 +958,49 @@ public class TestNMLeveldbStateStoreService { startedResources.get(pubRsrcProto1)); assertEquals(pubRsrcLocalPath2, startedResources.get(pubRsrcProto2)); - userResources = loadUserResources(state.getIterator()); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(0, userResources.size()); + } + + @Test + public void testStartResourceLocalizationForPrivateResource() + throws IOException { + Path privRsrcPath = new Path("hdfs://some/private/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( + URL.fromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + String user = "somebody"; + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + + // restart and verify resources are marked in-progress + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + Map userResources = + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); - rur = userResources.get(user); - privts = rur.getPrivateTrackerState(); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); - completedResources = loadCompletedResources( + List completedResources = loadCompletedResources( privts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( privts.getStartedResourcesIterator()); assertTrue(completedResources.isEmpty()); assertEquals(1, startedResources.size()); assertEquals(privRsrcLocalPath, startedResources.get(privRsrcProto)); - assertEquals(1, rur.getAppTrackerStates().size()); - appts = rur.getAppTrackerStates().get(appId); - assertNotNull(appts); - completedResources = loadCompletedResources( - appts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( - appts.getStartedResourcesIterator()); - assertTrue(completedResources.isEmpty()); - assertEquals(1, startedResources.size()); - assertEquals(appRsrcLocalPath, - startedResources.get(appRsrcProto)); + assertEquals(0, rur.getAppTrackerStates().size()); } @Test - public void testFinishResourceLocalization() throws IOException { + public void testFinishResourceLocalizationForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -862,10 +1061,14 @@ public class TestNMLeveldbStateStoreService { assertEquals(1, completedResources.size()); assertEquals(appLocalizedProto, completedResources.iterator().next()); + } - // start some public and private resources + @Test + public void testFinishResourceLocalizationForPublicResources() + throws IOException { Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -882,15 +1085,6 @@ public class TestNMLeveldbStateStoreService { Path pubRsrcLocalPath2 = new Path("/some/local/dir/for/pubrsrc2"); stateStore.startResourceLocalization(null, null, pubRsrcProto2, pubRsrcLocalPath2); - Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( - URL.fromPath(privRsrcPath), - LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, - 789L, 680L, "*pattern*"); - LocalResourceProto privRsrcProto = rsrcPb.getProto(); - Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); - stateStore.startResourceLocalization(user, null, privRsrcProto, - privRsrcLocalPath); // finish some of the resources LocalizedResourceProto pubLocalizedProto1 = @@ -900,6 +1094,43 @@ public class TestNMLeveldbStateStoreService { .setSize(pubRsrcProto1.getSize()) .build(); stateStore.finishResourceLocalization(null, null, pubLocalizedProto1); + + // restart and verify state + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( + pubts.getCompletedResourcesIterator()); + Map startedResources = loadStartedResources( + pubts.getStartedResourcesIterator()); + assertEquals(1, completedResources.size()); + assertEquals(pubLocalizedProto1, + completedResources.iterator().next()); + assertEquals(1, startedResources.size()); + assertEquals(pubRsrcLocalPath2, + startedResources.get(pubRsrcProto2)); + Map userResources = + loadUserResources(state.getIterator()); + assertEquals(0, userResources.size()); + } + + @Test + public void testFinishResourceLocalizationForPrivateResource() + throws IOException { + String user = "somebody"; + ApplicationId appId = ApplicationId.newInstance(1, 1); + + Path privRsrcPath = new Path("hdfs://some/private/resource"); + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( + URL.fromPath(privRsrcPath), + LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, + 789L, 680L, "*pattern*"); + LocalResourceProto privRsrcProto = rsrcPb.getProto(); + Path privRsrcLocalPath = new Path("/some/local/dir/for/privrsrc"); + stateStore.startResourceLocalization(user, null, privRsrcProto, + privRsrcLocalPath); + LocalizedResourceProto privLocalizedProto = LocalizedResourceProto.newBuilder() .setResource(privRsrcProto) @@ -910,22 +1141,19 @@ public class TestNMLeveldbStateStoreService { // restart and verify state restartStateStore(); - state = stateStore.loadLocalizationState(); - pubts = state.getPublicTrackerState(); - completedResources = loadCompletedResources( + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = loadCompletedResources( pubts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( + Map startedResources = loadStartedResources( pubts.getStartedResourcesIterator()); - assertEquals(1, completedResources.size()); - assertEquals(pubLocalizedProto1, - completedResources.iterator().next()); - assertEquals(1, startedResources.size()); - assertEquals(pubRsrcLocalPath2, - startedResources.get(pubRsrcProto2)); - userResources = loadUserResources(state.getIterator()); + assertEquals(0, completedResources.size()); + assertEquals(0, startedResources.size()); + Map userResources = + loadUserResources(state.getIterator()); assertEquals(1, userResources.size()); - rur = userResources.get(user); - privts = rur.getPrivateTrackerState(); + RecoveredUserResources rur = userResources.get(user); + LocalResourceTrackerState privts = rur.getPrivateTrackerState(); assertNotNull(privts); completedResources = loadCompletedResources( privts.getCompletedResourcesIterator()); @@ -935,21 +1163,16 @@ public class TestNMLeveldbStateStoreService { assertEquals(privLocalizedProto, completedResources.iterator().next()); assertTrue(startedResources.isEmpty()); - assertEquals(1, rur.getAppTrackerStates().size()); - appts = rur.getAppTrackerStates().get(appId); - assertNotNull(appts); - completedResources = loadCompletedResources( - appts.getCompletedResourcesIterator()); - startedResources = loadStartedResources( - appts.getStartedResourcesIterator()); + assertEquals(0, rur.getAppTrackerStates().size()); + LocalResourceTrackerState appts = rur.getAppTrackerStates().get(appId); + assertNull(appts); assertTrue(startedResources.isEmpty()); assertEquals(1, completedResources.size()); - assertEquals(appLocalizedProto, - completedResources.iterator().next()); } @Test - public void testRemoveLocalizedResource() throws IOException { + public void testRemoveLocalizedResourceForApplicationResource() + throws IOException { String user = "somebody"; ApplicationId appId = ApplicationId.newInstance(1, 1); @@ -983,10 +1206,15 @@ public class TestNMLeveldbStateStoreService { restartStateStore(); verifyEmptyState(); + } - // add public and private resources and remove some + @Test + public void testRemoveLocalizedResourceForPublicResources() + throws IOException { + // add public resources and remove some Path pubRsrcPath1 = new Path("hdfs://some/public/resource1"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(pubRsrcPath1), LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 789L, 135L); @@ -1018,8 +1246,32 @@ public class TestNMLeveldbStateStoreService { .build(); stateStore.finishResourceLocalization(null, null, pubLocalizedProto2); stateStore.removeLocalizedResource(null, null, pubRsrcLocalPath2); + + // restart and verify state + restartStateStore(); + RecoveredLocalizationState state = stateStore.loadLocalizationState(); + LocalResourceTrackerState pubts = state.getPublicTrackerState(); + List completedResources = + loadCompletedResources(pubts.getCompletedResourcesIterator()); + Map startedResources = + loadStartedResources(pubts.getStartedResourcesIterator()); + assertTrue(startedResources.isEmpty()); + assertEquals(1, completedResources.size()); + assertEquals(pubLocalizedProto1, + completedResources.iterator().next()); + Map userResources = + loadUserResources(state.getIterator()); + assertTrue(userResources.isEmpty()); + } + + @Test + public void testRemoveLocalizedResourceForPrivateResource() + throws IOException { + String user = "somebody"; + Path privRsrcPath = new Path("hdfs://some/private/resource"); - rsrcPb = (LocalResourcePBImpl) LocalResource.newInstance( + LocalResourcePBImpl rsrcPb = (LocalResourcePBImpl) LocalResource + .newInstance( URL.fromPath(privRsrcPath), LocalResourceType.PATTERN, LocalResourceVisibility.PRIVATE, 789L, 680L, "*pattern*"); @@ -1038,9 +1290,7 @@ public class TestNMLeveldbStateStoreService { Map startedResources = loadStartedResources(pubts.getStartedResourcesIterator()); assertTrue(startedResources.isEmpty()); - assertEquals(1, completedResources.size()); - assertEquals(pubLocalizedProto1, - completedResources.iterator().next()); + assertEquals(0, completedResources.size()); Map userResources = loadUserResources(state.getIterator()); assertTrue(userResources.isEmpty()); @@ -1574,9 +1824,9 @@ public class TestNMLeveldbStateStoreService { containerCmds.add("somearg"); Map serviceData = new HashMap(); serviceData.put("someservice", - ByteBuffer.wrap(new byte[] { 0x1, 0x2, 0x3 })); + ByteBuffer.wrap(new byte[] {0x1, 0x2, 0x3})); ByteBuffer containerTokens = ByteBuffer - .wrap(new byte[] { 0x7, 0x8, 0x9, 0xa }); + .wrap(new byte[] {0x7, 0x8, 0x9, 0xa}); Map acls = new HashMap(); acls.put(ApplicationAccessType.VIEW_APP, "viewuser");