diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 78b389a0ff3..64c36bcbffc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -177,21 +177,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -205,7 +203,7 @@ public class TestRMContainerAllocator { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -220,7 +218,7 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); @@ -232,7 +230,7 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size()); @@ -240,18 +238,18 @@ public class TestRMContainerAllocator { nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size()); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); // check that the assigned container requests are cancelled - assigned = allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); + allocator.schedule(); + rm.drainEvents(); + Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size()); } @Test @@ -267,21 +265,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -295,7 +291,7 @@ public class TestRMContainerAllocator { MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map - dispatcher.await(); + rm.drainEvents(); // create the container requests for maps ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -311,7 +307,7 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler @@ -321,10 +317,10 @@ public class TestRMContainerAllocator { // Node heartbeat from node-local next. This allocates 2 node local // containers for task1 and task2. These should be matched with those tasks. nodeManager1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 }, assigned, false); // remove the rack-local assignment that should have happened for task3 @@ -348,21 +344,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -376,7 +370,7 @@ public class TestRMContainerAllocator { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -391,17 +385,17 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event2 }, assigned, false); } @@ -414,19 +408,17 @@ public class TestRMContainerAllocator { conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - final DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); final RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); final String host = "host1"; final MockNM nm = rm.registerNode(String.format("%s:1234", host), 2048); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); final JobId jobId = MRBuilderUtils .newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -436,20 +428,20 @@ public class TestRMContainerAllocator { final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, appAttemptId, mockJob, SystemClock.getInstance()); // add resources to scheduler - dispatcher.await(); + rm.drainEvents(); // create the container request final String[] locations = new String[] { host }; allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); for (int i = 0; i < 1;) { - dispatcher.await(); + rm.drainEvents(); i += allocator.schedule().size(); nm.nodeHeartbeat(true); } allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); while (allocator.getTaskAttemptKillEvents().size() == 0) { - dispatcher.await(); + rm.drainEvents(); allocator.schedule().size(); nm.nodeHeartbeat(true); } @@ -466,21 +458,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -519,21 +509,19 @@ public class TestRMContainerAllocator { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -582,21 +570,19 @@ public class TestRMContainerAllocator { MyResourceManager rm = new MyResourceManager(conf); rm.start(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(8192, 8)); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -637,18 +623,16 @@ public class TestRMContainerAllocator { conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.0f); final MyResourceManager2 rm = new MyResourceManager2(conf); rm.start(); - final DrainDispatcher dispatcher = (DrainDispatcher)rm.getRMContext() - .getDispatcher(); final RMApp app = rm.submitApp(2048); - dispatcher.await(); + rm.drainEvents(); final String host = "host1"; final MockNM nm = rm.registerNode(String.format("%s:1234", host), 4096); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); final JobId jobId = MRBuilderUtils .newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -664,14 +648,14 @@ public class TestRMContainerAllocator { allocator.scheduleAllReduces(); allocator.makeRemoteRequest(); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); int assignedContainer; for (assignedContainer = 0; assignedContainer < 1;) { assignedContainer += allocator.schedule().size(); nm.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); } // only 1 allocated container should be assigned Assert.assertEquals(assignedContainer, 1); @@ -771,21 +755,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -799,7 +781,7 @@ public class TestRMContainerAllocator { MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request // send MAP request @@ -820,17 +802,17 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); checkAssignments(new ContainerRequestEvent[] { event1, event3 }, assigned, false); @@ -861,11 +843,6 @@ public class TestRMContainerAllocator { MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected EventHandler createSchedulerEventDispatcher() { // Dispatch inline for test sanity @@ -910,16 +887,16 @@ public class TestRMContainerAllocator { // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 21504); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { @@ -957,11 +934,11 @@ public class TestRMContainerAllocator { amDispatcher.await(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all map-tasks to be running for (Task t : job.getTasks().values()) { @@ -971,7 +948,7 @@ public class TestRMContainerAllocator { } allocator.schedule(); // Send heartbeat - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.05f, job.getProgress(), 0.001f); Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); @@ -979,14 +956,14 @@ public class TestRMContainerAllocator { Iterator it = job.getTasks().values().iterator(); finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.095f, job.getProgress(), 0.001f); Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); // Finish off 7 more so that map-progress is 80% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 7); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.41f, job.getProgress(), 0.001f); Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); @@ -994,11 +971,11 @@ public class TestRMContainerAllocator { finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all reduce-tasks to be running for (Task t : job.getTasks().values()) { @@ -1011,14 +988,14 @@ public class TestRMContainerAllocator { finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 2); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.59f, job.getProgress(), 0.001f); Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off the remaining 8 reduces. finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 8); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Remaining is JobCleanup Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); @@ -1062,16 +1039,16 @@ public class TestRMContainerAllocator { // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 11264); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId( appAttemptId, 0), 10, 0, false, this.getClass().getName(), true, 1) { @@ -1107,11 +1084,11 @@ public class TestRMContainerAllocator { amDispatcher.await(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); // Wait for all map-tasks to be running for (Task t : job.getTasks().values()) { @@ -1119,7 +1096,7 @@ public class TestRMContainerAllocator { } allocator.schedule(); // Send heartbeat - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.05f, job.getProgress(), 0.001f); Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); @@ -1128,21 +1105,21 @@ public class TestRMContainerAllocator { // Finish off 1 map so that map-progress is 10% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 1); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.14f, job.getProgress(), 0.001f); Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); // Finish off 5 more map so that map-progress is 60% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 5); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.59f, job.getProgress(), 0.001f); Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); // Finish off remaining map so that map-progress is 100% finishNextNTasks(rmDispatcher, amNodeManager, mrApp, it, 4); allocator.schedule(); - rmDispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0.95f, job.getProgress(), 0.001f); Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } @@ -1152,21 +1129,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); - + rm.drainEvents(); + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, @@ -1175,7 +1150,7 @@ public class TestRMContainerAllocator { // add resources to scheduler MockNM nm1 = rm.registerNode("h1:1234", 10240); MockNM nm2 = rm.registerNode("h2:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the map container request ContainerRequestEvent event = createReq(jobId, 1, 1024, @@ -1191,16 +1166,16 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(3, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); allocator.getJobUpdatedNodeEvents().clear(); // get the assignment assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(1, assigned.size()); Assert.assertEquals(nm1.getNodeId(), assigned.get(0).getContainer().getNodeId()); // no updated nodes reported @@ -1210,11 +1185,11 @@ public class TestRMContainerAllocator { // mark nodes bad nm1.nodeHeartbeat(false); nm2.nodeHeartbeat(false); - dispatcher.await(); - + rm.drainEvents(); + // schedule response returns updated nodes assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, assigned.size()); // updated nodes are reported Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); @@ -1225,7 +1200,7 @@ public class TestRMContainerAllocator { allocator.getTaskAttemptKillEvents().clear(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals(0, assigned.size()); // no updated nodes reported Assert.assertTrue(allocator.getJobUpdatedNodeEvents().isEmpty()); @@ -1245,21 +1220,19 @@ public class TestRMContainerAllocator { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1273,7 +1246,7 @@ public class TestRMContainerAllocator { MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager2 = rm.registerNode("h2:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); // create the container request ContainerRequestEvent event1 = createReq(jobId, 1, 1024, @@ -1293,7 +1266,7 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Send events to blacklist nodes h1 and h2 @@ -1305,28 +1278,28 @@ public class TestRMContainerAllocator { // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat nodeManager2.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); assertBlacklistAdditionsAndRemovals(2, 0, rm); // mark h1/h2 as bad nodes nodeManager1.nodeHeartbeat(false); nodeManager2.nodeHeartbeat(false); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertTrue("No of assignments must be 3", assigned.size() == 3); @@ -1350,24 +1323,22 @@ public class TestRMContainerAllocator { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM[] nodeManagers = new MockNM[10]; int nmNum = 0; List assigned = null; - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); nodeManagers[0].nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1380,7 +1351,7 @@ public class TestRMContainerAllocator { // Known=1, blacklisted=0, ignore should be false - assign first container assigned = getContainerOnHost(jobId, 1, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); LOG.info("Failing container _1 on H1 (Node should be blacklisted and" @@ -1395,47 +1366,47 @@ public class TestRMContainerAllocator { // The current call will send blacklisted node "h1" to RM assigned = getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 1, 0, 0, 1, rm); + nodeManagers[0], allocator, 1, 0, 0, 1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=1, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 2, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=2, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 3, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=3, blacklisted=1, ignore should be true - assign 1 anyway. assigned = getContainerOnHost(jobId, 4, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Known=3, blacklisted=1, ignore should be true - assign 1 assigned = getContainerOnHost(jobId, 5, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=4, blacklisted=1, ignore should be false - assign 1 anyway assigned = getContainerOnHost(jobId, 6, 1024, new String[] { "h4" }, - nodeManagers[3], dispatcher, allocator, 0, 0, 1, 0, rm); + nodeManagers[3], allocator, 0, 0, 1, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklisting re-enabled. // Known=4, blacklisted=1, ignore should be false - no assignment on h1 assigned = getContainerOnHost(jobId, 7, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // RMContainerRequestor would have created a replacement request. @@ -1448,61 +1419,61 @@ public class TestRMContainerAllocator { // container for the same reason above. assigned = getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 1, 0, 0, 2, rm); + nodeManagers[0], allocator, 1, 0, 0, 2, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Known=4, blacklisted=2, ignore should be true. Should assign 2 // containers. assigned = getContainerOnHost(jobId, 8, 1024, new String[] { "h1" }, - nodeManagers[0], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[0], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 2", 2, assigned.size()); // Known=4, blacklisted=2, ignore should be true. assigned = getContainerOnHost(jobId, 9, 1024, new String[] { "h2" }, - nodeManagers[1], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[1], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Test blacklist while ignore blacklisting enabled ContainerFailedEvent f3 = createFailEvent(jobId, 4, "h3", false); allocator.sendFailure(f3); - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); // Known=5, blacklisted=3, ignore should be true. assigned = getContainerOnHost(jobId, 10, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); // Assign on 5 more nodes - to re-enable blacklisting for (int i = 0; i < 5; i++) { - nodeManagers[nmNum] = registerNodeManager(nmNum++, rm, dispatcher); + nodeManagers[nmNum] = registerNodeManager(nmNum++, rm); assigned = getContainerOnHost(jobId, 11 + i, 1024, new String[] { String.valueOf(5 + i) }, nodeManagers[4 + i], - dispatcher, allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); + allocator, 0, 0, (i == 4 ? 3 : 0), 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); } // Test h3 (blacklisted while ignoring blacklisting) is blacklisted. assigned = getContainerOnHost(jobId, 20, 1024, new String[] { "h3" }, - nodeManagers[2], dispatcher, allocator, 0, 0, 0, 0, rm); + nodeManagers[2], allocator, 0, 0, 0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); } - private MockNM registerNodeManager(int i, MyResourceManager rm, - DrainDispatcher dispatcher) throws Exception { + private MockNM registerNodeManager(int i, MyResourceManager rm) + throws Exception { MockNM nm = rm.registerNode("h" + (i + 1) + ":1234", 10240); - dispatcher.await(); + rm.drainEvents(); return nm; } private List getContainerOnHost(JobId jobId, int taskAttemptId, int memory, String[] hosts, MockNM mockNM, - DrainDispatcher dispatcher, MyContainerAllocator allocator, + MyContainerAllocator allocator, int expectedAdditions1, int expectedRemovals1, int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) throws Exception { @@ -1512,17 +1483,17 @@ public class TestRMContainerAllocator { // Send the request to the RM List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals( expectedAdditions1, expectedRemovals1, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); // Heartbeat from the required nodeManager mockNM.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals( expectedAdditions2, expectedRemovals2, rm); return assigned; @@ -1540,21 +1511,19 @@ public class TestRMContainerAllocator { MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -1567,7 +1536,7 @@ public class TestRMContainerAllocator { // add resources to scheduler MockNM nodeManager1 = rm.registerNode("h1:1234", 10240); MockNM nodeManager3 = rm.registerNode("h3:1234", 10240); - dispatcher.await(); + rm.drainEvents(); LOG.info("Requesting 1 Containers _1 on H1"); // create the container request @@ -1579,17 +1548,17 @@ public class TestRMContainerAllocator { // this tells the scheduler about the requests // as nodes are not added, no allocations List assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 1", 1, assigned.size()); @@ -1606,7 +1575,7 @@ public class TestRMContainerAllocator { //Update the Scheduler with the new requests. assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(1, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); @@ -1621,11 +1590,11 @@ public class TestRMContainerAllocator { LOG.info("h1 Heartbeat (To actually schedule the containers)"); // update resources in scheduler nodeManager1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); @@ -1634,19 +1603,19 @@ public class TestRMContainerAllocator { //Send a release for the p:5 container + another request. LOG.info("RM Heartbeat (To process the re-scheduled containers)"); assigned = allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); assertBlacklistAdditionsAndRemovals(0, 0, rm); Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); //Hearbeat from H3 to schedule on this host. LOG.info("h3 Heartbeat (To re-schedule the containers)"); nodeManager3.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm.drainEvents(); LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)"); assigned = allocator.schedule(); assertBlacklistAdditionsAndRemovals(0, 0, rm); - dispatcher.await(); + rm.drainEvents(); // For debugging for (TaskAttemptContainerAssignedEvent assig : assigned) { @@ -2225,22 +2194,20 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); // Make a node to register so as to launch the AM. MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job job = mock(Job.class); @@ -2375,21 +2342,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher rmDispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp rmApp = rm.submitApp(1024); - rmDispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264); amNodeManager.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - rmDispatcher.await(); + rm.drainEvents(); MRApp mrApp = new MRApp(appAttemptId, ContainerId.newContainerId(appAttemptId, 0), 10, @@ -2448,22 +2413,20 @@ public class TestRMContainerAllocator { MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2492,7 +2455,7 @@ public class TestRMContainerAllocator { // send allocate request and 1 blacklisted nodes List assignedContainers = allocator.schedule(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, assignedContainers.size()); // Why ask is 3, not 4? --> ask from blacklisted node h2 is removed @@ -2500,11 +2463,11 @@ public class TestRMContainerAllocator { assertBlacklistAdditionsAndRemovals(1, 0, rm1); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); // Step-2 : 2 containers are allocated by RM. assignedContainers = allocator.schedule(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 2", 2, assignedContainers.size()); assertAsksAndReleases(0, 0, rm1); @@ -2539,7 +2502,6 @@ public class TestRMContainerAllocator { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); allocator.updateSchedulerProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -2549,7 +2511,7 @@ public class TestRMContainerAllocator { nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); // Step-4 : On RM restart, AM(does not know RM is restarted) sends // additional containerRequest(event4) and blacklisted nodes. @@ -2570,7 +2532,7 @@ public class TestRMContainerAllocator { // send allocate request to 2nd RM and get resync command allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); // Step-5 : On Resync,AM sends all outstanding // asks,release,blacklistAaddition @@ -2581,16 +2543,16 @@ public class TestRMContainerAllocator { // send all outstanding request again. assignedContainers = allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); assertAsksAndReleases(3, 2, rm2); assertBlacklistAdditionsAndRemovals(2, 0, rm2); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); // Step-6 : RM allocates containers i.e event3,event4 and cRequest5 assignedContainers = allocator.schedule(); - dispatcher.await(); + rm2.drainEvents(); Assert.assertEquals("Number of container should be 3", 3, assignedContainers.size()); @@ -2693,20 +2655,19 @@ public class TestRMContainerAllocator { MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0); MyResourceManager rm1 = new MyResourceManager(conf); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); + RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2722,7 +2683,7 @@ public class TestRMContainerAllocator { } catch (RMContainerAllocationException e) { Assert.assertTrue(e.getMessage().contains("Could not contact RM after")); } - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("Should Have 1 Job Event", 1, allocator.jobEvents.size()); JobEvent event = allocator.jobEvents.get(0); @@ -2744,22 +2705,20 @@ public class TestRMContainerAllocator { rm.start(); AMRMTokenSecretManager secretMgr = rm.getRMContext().getAMRMTokenSecretManager(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); final ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); final ApplicationId appId = app.getApplicationId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); final Job mockJob = mock(Job.class); @@ -2947,21 +2906,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -2983,21 +2940,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3012,7 +2967,7 @@ public class TestRMContainerAllocator { // Register nodes to RM. MockNM nodeManager = rm.registerNode("h1:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3028,7 +2983,7 @@ public class TestRMContainerAllocator { // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Advance clock so that maps can be considered as hanging. clock.setTime(System.currentTimeMillis() + 500000L); @@ -3039,15 +2994,15 @@ public class TestRMContainerAllocator { allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One map is assigned. Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); @@ -3081,7 +3036,7 @@ public class TestRMContainerAllocator { // On next allocate request to scheduler, headroom reported will be 0. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(0, 0)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response from scheduler, all scheduled reduces are ramped // down and move to pending. 3 asks are also updated with 0 containers to // indicate ramping down of reduces to scheduler. @@ -3148,21 +3103,19 @@ public class TestRMContainerAllocator { Configuration conf = new Configuration(); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3177,7 +3130,7 @@ public class TestRMContainerAllocator { // Register nodes to RM. MockNM nodeManager = rm.registerNode("h1:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3193,7 +3146,7 @@ public class TestRMContainerAllocator { // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Advance clock so that maps can be considered as hanging. clock.setTime(System.currentTimeMillis() + 500000L); @@ -3204,15 +3157,15 @@ public class TestRMContainerAllocator { allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One map is assigned. Assert.assertEquals(1, allocator.getAssignedRequests().maps.size()); @@ -3249,7 +3202,7 @@ public class TestRMContainerAllocator { // On next allocate request to scheduler, headroom reported will be 2048. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 0)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response from scheduler, all scheduled reduces are ramped // down and move to pending. 3 asks are also updated with 0 containers to // indicate ramping down of reduces to scheduler. @@ -3278,21 +3231,19 @@ public class TestRMContainerAllocator { conf.setInt(MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC, -1); MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm.getRMContext().getDispatcher(); // Submit the application RMApp app = rm.submitApp(1024); - dispatcher.await(); + rm.drainEvents(); MockNM amNodeManager = rm.registerNode("amNM:1234", 1260); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm.drainEvents(); JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); Job mockJob = mock(Job.class); @@ -3308,10 +3259,10 @@ public class TestRMContainerAllocator { appAttemptId, mockJob); MockNM nodeManager = rm.registerNode("h1:1234", 4096); - dispatcher.await(); + rm.drainEvents(); // Register nodes to RM. MockNM nodeManager2 = rm.registerNode("h2:1234", 1024); - dispatcher.await(); + rm.drainEvents(); // Request 2 maps and 1 reducer(sone on nodes which are not registered). ContainerRequestEvent event1 = @@ -3327,7 +3278,7 @@ public class TestRMContainerAllocator { // This will tell the scheduler about the requests but there will be no // allocations as nodes are not added. allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Request for another reducer on h3 which has not registered. ContainerRequestEvent event4 = @@ -3335,15 +3286,15 @@ public class TestRMContainerAllocator { allocator.sendRequest(event4); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Update resources in scheduler through node heartbeat from h1. nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(3072, 3)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // Two maps are assigned. Assert.assertEquals(2, allocator.getAssignedRequests().maps.size()); @@ -3356,15 +3307,15 @@ public class TestRMContainerAllocator { Assert.assertEquals(0, allocator.getAssignedRequests().maps.size()); nodeManager.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1024, 1)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // h2 heartbeats. nodeManager2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Send request for one more mapper. ContainerRequestEvent event5 = @@ -3373,7 +3324,7 @@ public class TestRMContainerAllocator { rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // One reducer is assigned and one map is scheduled Assert.assertEquals(1, allocator.getScheduledRequests().maps.size()); Assert.assertEquals(1, allocator.getAssignedRequests().reduces.size()); @@ -3381,7 +3332,7 @@ public class TestRMContainerAllocator { // enough if scheduled reducers resources are deducted. rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(1260, 2)); allocator.schedule(); - dispatcher.await(); + rm.drainEvents(); // After allocate response, the one assigned reducer is preempted and killed Assert.assertEquals(1, MyContainerAllocator.getTaskAttemptKillEvents().size()); Assert.assertEquals(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index ac774469416..073b93176d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -125,22 +123,20 @@ public class TestAMRMClientOnRMRestart { // Phase-1 Start 1st RM MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); org.apache.hadoop.security.token.Token token = rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) @@ -175,7 +171,7 @@ public class TestAMRMClientOnRMRestart { blacklistAdditions.remove("h2");// remove from local list AllocateResponse allocateResponse = amClient.allocate(0.1f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); @@ -188,10 +184,10 @@ public class TestAMRMClientOnRMRestart { // Step-2 : NM heart beat is sent. // On 2nd AM allocate request, RM allocates 3 containers to AM nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); allocateResponse = amClient.allocate(0.2f); - dispatcher.await(); + rm1.drainEvents(); // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3. Assert.assertEquals("No of assignments must be 0", 3, allocateResponse .getAllocatedContainers().size()); @@ -206,7 +202,7 @@ public class TestAMRMClientOnRMRestart { amClient.removeContainerRequest(cRequest3); allocateResponse = amClient.allocate(0.2f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(4, 0, rm1); @@ -232,13 +228,13 @@ public class TestAMRMClientOnRMRestart { // request nm1.nodeHeartbeat(containerId.getApplicationAttemptId(), containerId.getContainerId(), ContainerState.RUNNING); - dispatcher.await(); + rm1.drainEvents(); amClient.requestContainerResourceChange( container, Resource.newInstance(2048, 1)); it.remove(); allocateResponse = amClient.allocate(0.3f); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); assertAsksAndReleases(3, pendingRelease, rm1); @@ -254,7 +250,6 @@ public class TestAMRMClientOnRMRestart { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -270,7 +265,7 @@ public class TestAMRMClientOnRMRestart { Collections.singletonList( containerId.getApplicationAttemptId().getApplicationId())); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); blacklistAdditions.add("h3"); amClient.updateBlacklist(blacklistAdditions, null); @@ -292,7 +287,7 @@ public class TestAMRMClientOnRMRestart { // containerRequest and blacklisted nodes. // Intern RM send resync command,AMRMClient resend allocate request allocateResponse = amClient.allocate(0.3f); - dispatcher.await(); + rm2.drainEvents(); completedContainer = allocateResponse.getCompletedContainersStatuses().size(); @@ -309,7 +304,7 @@ public class TestAMRMClientOnRMRestart { // Step-5 : Allocater after resync command allocateResponse = amClient.allocate(0.5f); - dispatcher.await(); + rm2.drainEvents(); Assert.assertEquals("No of assignments must be 0", 0, allocateResponse .getAllocatedContainers().size()); @@ -322,10 +317,10 @@ public class TestAMRMClientOnRMRestart { int count = 5; while (count-- > 0) { nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); allocateResponse = amClient.allocate(0.5f); - dispatcher.await(); + rm2.drainEvents(); noAssignedContainer += allocateResponse.getAllocatedContainers().size(); if (noAssignedContainer == 3) { break; @@ -354,22 +349,20 @@ public class TestAMRMClientOnRMRestart { // Phase-1 Start 1st RM MyResourceManager rm1 = new MyResourceManager(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); org.apache.hadoop.security.token.Token token = rm1.getRMContext().getRMApps().get(appAttemptId.getApplicationId()) @@ -389,7 +382,6 @@ public class TestAMRMClientOnRMRestart { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -405,7 +397,7 @@ public class TestAMRMClientOnRMRestart { Priority.newInstance(0), 0); nm1.registerNode(Arrays.asList(containerReport), null); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm2.drainEvents(); amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null, null); @@ -417,7 +409,6 @@ public class TestAMRMClientOnRMRestart { amClient.stop(); rm1.stop(); rm2.stop(); - } @@ -435,22 +426,20 @@ public class TestAMRMClientOnRMRestart { // start first RM MyResourceManager2 rm1 = new MyResourceManager2(conf, memStore); rm1.start(); - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); Long startTime = System.currentTimeMillis(); // Submit the application RMApp app = rm1.submitApp(1024); - dispatcher.await(); + rm1.drainEvents(); MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); nm1.nodeHeartbeat(true); // Node heartbeat - dispatcher.await(); + rm1.drainEvents(); ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt().getAppAttemptId(); rm1.sendAMLaunched(appAttemptId); - dispatcher.await(); + rm1.drainEvents(); AMRMTokenSecretManager amrmTokenSecretManagerForRM1 = rm1.getRMContext().getAMRMTokenSecretManager(); @@ -509,7 +498,6 @@ public class TestAMRMClientOnRMRestart { rm2.start(); nm1.setResourceTrackerService(rm2.getResourceTrackerService()); ((MyAMRMClientImpl) amClient).updateRMProxy(rm2); - dispatcher = (DrainDispatcher) rm2.getRMContext().getDispatcher(); AMRMTokenSecretManager amrmTokenSecretManagerForRM2 = rm2.getRMContext().getAMRMTokenSecretManager(); @@ -611,11 +599,6 @@ public class TestAMRMClientOnRMRestart { MyResourceManager.setClusterTimeStamp(fakeClusterTimeStamp); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected EventHandler createSchedulerEventDispatcher() { // Dispatch inline for test sanity diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java index fbd5ac38dc0..100eb7f21a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ACLsTestBase.java @@ -30,14 +30,9 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.junit.Before; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public abstract class ACLsTestBase { protected static final String COMMON_USER = "common_user"; @@ -80,11 +75,6 @@ public abstract class ACLsTestBase { .getRMDelegationTokenSecretManager()); } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - @Override protected void doSecureLogin() throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index c9ce7d7a061..c95bcdfca97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -26,22 +26,17 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -108,20 +103,9 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{ } protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1, null, false, false){ - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - }; - rm2 = new MockRM(confForRM2, null, false, false){ - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - }; + rm1 = new MockRM(confForRM1, null, false, false); + rm2 = new MockRM(confForRM2, null, false, false); startRMs(rm1, confForRM1, rm2, confForRM2); - } protected void startRMsWithCustomizedRMAppManager() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java index 03bc8897ab4..c8ee00e60bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/ReservationACLsTestBase.java @@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.ReservationRequest; import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; import org.apache.hadoop.yarn.api.records.ReservationRequests; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -463,9 +462,7 @@ public class ReservationACLsTestBase extends ACLsTestBase { int attempts = 10; Collection plans; do { - DrainDispatcher dispatcher = - (DrainDispatcher) resourceManager.getRMContext().getDispatcher(); - dispatcher.await(); + resourceManager.drainEvents(); LOG.info("Waiting for node capacity to be added to plan"); plans = resourceManager.getRMContext().getReservationSystem() .getAllPlans().values(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index c4197a1c1f8..422b7eb88a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -40,8 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -161,13 +159,7 @@ public class TestApplicationCleanup { Logger rootLogger = LogManager.getRootLogger(); rootLogger.setLevel(Level.DEBUG); - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = new MockRM() { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(); rm.start(); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000); @@ -185,8 +177,8 @@ public class TestApplicationCleanup { int request = 2; am.allocate("127.0.0.1" , 1000, request, new ArrayList()); - dispatcher.await(); - + rm.drainEvents(); + //kick the scheduler nm1.nodeHeartbeat(true); List conts = am.allocate(new ArrayList(), @@ -199,7 +191,7 @@ public class TestApplicationCleanup { Thread.sleep(100); conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers(); - dispatcher.await(); + rm.drainEvents(); contReceived += conts.size(); nm1.nodeHeartbeat(true); } @@ -209,7 +201,7 @@ public class TestApplicationCleanup { ArrayList release = new ArrayList(); release.add(conts.get(0).getId()); am.allocate(new ArrayList(), release); - dispatcher.await(); + rm.drainEvents(); // Send one more heartbeat with a fake running container. This is to // simulate the situation that can happen if the NM reports that container @@ -224,7 +216,7 @@ public class TestApplicationCleanup { containerStatuses.put(app.getApplicationId(), containerStatusList); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); - waitForContainerCleanup(dispatcher, nm1, resp); + waitForContainerCleanup(rm, nm1, resp); // Now to test the case when RM already gave cleanup, and NM suddenly // realizes that the container is running. @@ -240,17 +232,17 @@ public class TestApplicationCleanup { resp = nm1.nodeHeartbeat(containerStatuses, true); // The cleanup list won't be instantaneous as it is given out by scheduler // and not RMNodeImpl. - waitForContainerCleanup(dispatcher, nm1, resp); + waitForContainerCleanup(rm, nm1, resp); rm.stop(); } - protected void waitForContainerCleanup(DrainDispatcher dispatcher, MockNM nm, + protected void waitForContainerCleanup(MockRM rm, MockNM nm, NodeHeartbeatResponse resp) throws Exception { int waitCount = 0, cleanedConts = 0; List contsToClean; do { - dispatcher.await(); + rm.drainEvents(); contsToClean = resp.getContainersToCleanup(); cleanedConts += contsToClean.size(); if (cleanedConts >= 1) { @@ -400,13 +392,7 @@ public class TestApplicationCleanup { memStore.init(conf); // start RM - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); @@ -419,13 +405,7 @@ public class TestApplicationCleanup { rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING); // start new RM - final DrainDispatcher dispatcher2 = new DrainDispatcher(); - MockRM rm2 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher2; - } - }; + MockRM rm2 = new MockRM(conf, memStore); rm2.start(); // nm1 register to rm2, and do a heartbeat @@ -437,7 +417,7 @@ public class TestApplicationCleanup { NodeHeartbeatResponse response = nm1.nodeHeartbeat(am0 .getApplicationAttemptId(), 2, ContainerState.RUNNING); - waitForContainerCleanup(dispatcher2, nm1, response); + waitForContainerCleanup(rm2, nm1, response); rm1.stop(); rm2.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index ad247080cfe..92d6d2cb549 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -59,8 +59,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -261,7 +259,6 @@ public class TestApplicationMasterLauncher { Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.CLIENT_NM_CONNECT_RETRY_INTERVAL_MS, 1); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, null) { @Override protected ApplicationMasterLauncher createAMLauncher() { @@ -285,12 +282,8 @@ public class TestApplicationMasterLauncher { } }; } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; + rm.start(); MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5120); @@ -298,7 +291,7 @@ public class TestApplicationMasterLauncher { // kick the scheduling nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(app.getCurrentAppAttempt(), RMAppAttemptState.LAUNCHED, 500); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index 23bed228e19..18c49bdddcd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -42,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -327,10 +325,8 @@ public class TestApplicationMasterService { @Test(timeout=1200000) public void testAllocateAfterUnregister() throws Exception { - MyResourceManager rm = new MyResourceManager(conf); + MockRM rm = new MockRM(conf); rm.start(); - DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() - .getDispatcher(); // Register node1 MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); @@ -351,7 +347,7 @@ public class TestApplicationMasterService { AllocateResponse alloc1Response = am1.schedule(); nm1.nodeHeartbeat(true); - rmDispatcher.await(); + rm.drainEvents(); alloc1Response = am1.schedule(); Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size()); } @@ -474,17 +470,6 @@ public class TestApplicationMasterService { rm.stop(); } - private static class MyResourceManager extends MockRM { - - public MyResourceManager(YarnConfiguration conf) { - super(conf); - } - @Override - protected Dispatcher createDispatcher() { - return new DrainDispatcher(); - } - } - private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); RMContainer rmContainer = cs.getRMContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java index b4adf480b35..75ef5c775be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestNodeBlacklistingOnAMFailures.java @@ -33,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -65,8 +63,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -122,7 +119,7 @@ public class TestNodeBlacklistingOnAMFailures { // Try the current node a few times for (int i = 0; i <= 2; i++) { currentNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals( "AppAttemptState should still be SCHEDULED if currentNode is " @@ -132,7 +129,7 @@ public class TestNodeBlacklistingOnAMFailures { // Now try the other node otherNode.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -169,8 +166,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -227,7 +223,7 @@ public class TestNodeBlacklistingOnAMFailures { System.out.println("New AppAttempt launched " + attempt.getAppAttemptId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -257,8 +253,7 @@ public class TestNodeBlacklistingOnAMFailures { conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, true); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); // Register 5 nodes, so that we can blacklist atleast one if AM container @@ -319,7 +314,7 @@ public class TestNodeBlacklistingOnAMFailures { nm3.nodeHeartbeat(true); nm4.nodeHeartbeat(true); nm5.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); // Now the AM container should be allocated MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); @@ -352,8 +347,7 @@ public class TestNodeBlacklistingOnAMFailures { 1.5f); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100); - DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = startRM(conf, dispatcher); + MockRM rm = startRM(conf); MockNM node = new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService()); @@ -367,7 +361,7 @@ public class TestNodeBlacklistingOnAMFailures { // Now the AM container should be allocated RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); @@ -394,7 +388,7 @@ public class TestNodeBlacklistingOnAMFailures { .println("New AppAttempt launched " + attempt.getAppAttemptId()); node.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000); rm.sendAMLaunched(attempt.getAppAttemptId()); @@ -418,20 +412,13 @@ public class TestNodeBlacklistingOnAMFailures { rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED); } - private MockRM startRM(YarnConfiguration conf, - final DrainDispatcher dispatcher) { - + private MockRM startRM(YarnConfiguration conf) { MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(conf, memStore); - rm1.start(); - return rm1; + rm.start(); + return rm; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java index 5a6fe67e2b1..f746dc2f188 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; @@ -186,9 +185,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase { rm.registerNode("127.0.0.1:1", memory, vCores); int attempts = 10; do { - DrainDispatcher dispatcher = - (DrainDispatcher) rm1.getRMContext().getDispatcher(); - dispatcher.await(); + rm1.drainEvents(); rm.getRMContext().getReservationSystem() .synchronizePlan(ReservationSystemTestUtil.reservationQ, false); if (rm.getRMContext().getReservationSystem() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index c8baa607bf0..f9f0b746233 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -47,12 +45,10 @@ import org.junit.Test; public class TestAMRMRPCNodeUpdates { private MockRM rm; - ApplicationMasterService amService = null; - DrainDispatcher dispatcher = null; + private ApplicationMasterService amService; @Before public void setUp() { - dispatcher = new DrainDispatcher(); this.rm = new MockRM() { @Override public void init(Configuration conf) { @@ -61,12 +57,8 @@ public class TestAMRMRPCNodeUpdates { "1.0"); super.init(conf); } - - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } }; + rm.start(); amService = rm.getApplicationMasterService(); } @@ -80,14 +72,14 @@ public class TestAMRMRPCNodeUpdates { private void syncNodeHeartbeat(MockNM nm, boolean health) throws Exception { nm.nodeHeartbeat(health); - dispatcher.await(); + rm.drainEvents(); } private void syncNodeLost(MockNM nm) throws Exception { rm.sendNodeStarted(nm); rm.waitForState(nm.getNodeId(), NodeState.RUNNING); rm.sendNodeLost(nm); - dispatcher.await(); + rm.drainEvents(); } private AllocateResponse allocate(final ApplicationAttemptId attemptId, @@ -113,7 +105,7 @@ public class TestAMRMRPCNodeUpdates { MockNM nm2 = rm.registerNode("127.0.0.2:1234", 10000); MockNM nm3 = rm.registerNode("127.0.0.3:1234", 10000); MockNM nm4 = rm.registerNode("127.0.0.4:1234", 10000); - dispatcher.await(); + rm.drainEvents(); RMApp app1 = rm.submitApp(2000); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java index e7c7e51bf2c..6a7325c25c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java @@ -28,7 +28,6 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider; import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -228,21 +227,16 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { // The node(127.0.0.1:1234) reconnected with RM. When it registered with // RM, RM set its lastNodeHeartbeatResponse's id to 0 asynchronously. But // the node's heartbeat come before RM succeeded setting the id to 0. - final DrainDispatcher dispatcher = new DrainDispatcher(); - MockRM rm = new MockRM(){ - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm = new MockRM(); rm.start(); + MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); nm1.registerNode(); int i = 0; while(i < 3) { nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); i++; } @@ -251,7 +245,7 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase { nm2.registerNode(); RMNode rmNode = rm.getRMContext().getRMNodes().get(nm2.getNodeId()); nm2.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); Assert.assertEquals("Node is Not in Running state.", NodeState.RUNNING, rmNode.getState()); rm.stop(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java index 893f802ade3..db3144898fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index ff52efd89be..fd17bd91a3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -36,8 +36,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; @@ -612,24 +610,17 @@ public class TestApplicationPriority { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); - final DrainDispatcher dispatcher = new DrainDispatcher(); MemoryRMStateStore memStore = new MemoryRMStateStore(); memStore.init(conf); - MockRM rm1 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - }; + MockRM rm1 = new MockRM(conf, memStore); rm1.start(); MockNM nm1 = new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService()); nm1.registerNode(); - - dispatcher.await(); + rm1.drainEvents(); ResourceScheduler scheduler = rm1.getRMContext().getScheduler(); LeafQueue defaultQueue = @@ -648,7 +639,7 @@ public class TestApplicationPriority { MockAM am2 = MockRM.launchAM(app2, rm1, nm1); am2.registerAppAttempt(); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(0, defaultQueue.getNumPendingApplications()); @@ -657,7 +648,7 @@ public class TestApplicationPriority { Priority appPriority3 = Priority.newInstance(7); RMApp app3 = rm1.submitApp(memory, appPriority3); - dispatcher.await(); + rm1.drainEvents(); Assert.assertEquals(2, defaultQueue.getNumActiveApplications()); Assert.assertEquals(1, defaultQueue.getNumPendingApplications()); @@ -676,14 +667,8 @@ public class TestApplicationPriority { Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(), fcApp3.getApplicationAttemptId()); - final DrainDispatcher dispatcher1 = new DrainDispatcher(); // create new RM to represent restart and recover state - MockRM rm2 = new MockRM(conf, memStore) { - @Override - protected Dispatcher createDispatcher() { - return dispatcher1; - } - }; + MockRM rm2 = new MockRM(conf, memStore); // start new RM rm2.start(); @@ -693,7 +678,7 @@ public class TestApplicationPriority { // Verify RM Apps after this restart Assert.assertEquals(3, rm2.getRMContext().getRMApps().size()); - dispatcher1.await(); + rm2.drainEvents(); scheduler = rm2.getRMContext().getScheduler(); defaultQueue = (LeafQueue) ((CapacityScheduler) scheduler).getQueue("default"); @@ -714,7 +699,7 @@ public class TestApplicationPriority { // NM resync to new RM nm1.registerNode(); - dispatcher1.await(); + rm2.drainEvents(); // wait for activating applications count = 50; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java index d36fb9f6827..e6e19c6f63e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestClientToAMTokens.java @@ -63,8 +63,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -183,7 +181,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -192,11 +189,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { getRMContext().getRMDelegationTokenSecretManager()); }; - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - @Override protected void doSecureLogin() throws IOException { } @@ -209,11 +201,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); - nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM = @@ -424,7 +415,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { StartContainersResponse mockResponse = mock(StartContainersResponse.class); when(containerManager.startContainers((StartContainersRequest) any())) .thenReturn(mockResponse); - final DrainDispatcher dispatcher = new DrainDispatcher(); MockRM rm = new MockRMWithCustomAMLauncher(conf, containerManager) { protected ClientRMService createClientRMService() { @@ -433,11 +423,6 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { getRMContext().getRMDelegationTokenSecretManager()); }; - @Override - protected Dispatcher createDispatcher() { - return dispatcher; - } - @Override protected void doSecureLogin() throws IOException { } @@ -450,10 +435,10 @@ public class TestClientToAMTokens extends ParameterizedSchedulerTestBase { // Set up a node. MockNM nm1 = rm.registerNode("localhost:1234", 3072); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); nm1.nodeHeartbeat(true); - dispatcher.await(); + rm.drainEvents(); ApplicationAttemptId appAttempt = app.getCurrentAppAttempt().getAppAttemptId(); final MockAM mockAM =