MAPREDUCE-7138. ThrottledContainerAllocator in MRAppBenchmark should implement RMHeartbeatHandler. Contributed by Oleksandr Shevchenko

(cherry picked from commit 8382b860d4ef4f20d000537ded42a88e98bd2190)
This commit is contained in:
Jason Lowe 2018-09-18 17:06:32 -05:00
parent 85ae097f3c
commit 3a13fa1b7b

View File

@ -33,10 +33,12 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -116,7 +118,7 @@ protected ContainerAllocator createContainerAllocator(
} }
class ThrottledContainerAllocator extends AbstractService class ThrottledContainerAllocator extends AbstractService
implements ContainerAllocator { implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount; private int containerCount;
private Thread thread; private Thread thread;
private BlockingQueue<ContainerAllocatorEvent> eventQueue = private BlockingQueue<ContainerAllocatorEvent> eventQueue =
@ -182,6 +184,15 @@ protected void serviceStop() throws Exception {
} }
super.serviceStop(); super.serviceStop();
} }
@Override
public long getLastHeartbeatTime() {
return Time.now();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
}
} }
} }
@ -264,7 +275,7 @@ public AllocateResponse allocate(AllocateRequest request)
}); });
} }
@Test @Test(timeout = 60000)
public void benchmark2() throws Exception { public void benchmark2() throws Exception {
int maps = 100; // Adjust for benchmarking, start with a couple of thousands int maps = 100; // Adjust for benchmarking, start with a couple of thousands
int reduces = 50; int reduces = 50;