MAPREDUCE-7138. ThrottledContainerAllocator in MRAppBenchmark should implement RMHeartbeatHandler. Contributed by Oleksandr Shevchenko

(cherry picked from commit 8382b860d4ef4f20d000537ded42a88e98bd2190)

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
This commit is contained in:
Jason Lowe 2018-09-18 17:06:32 -05:00
parent f5cc9f71e6
commit 726aee360a

View File

@ -33,7 +33,9 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -114,7 +116,7 @@ protected ContainerAllocator createContainerAllocator(
} }
class ThrottledContainerAllocator extends AbstractService class ThrottledContainerAllocator extends AbstractService
implements ContainerAllocator { implements ContainerAllocator, RMHeartbeatHandler {
private int containerCount; private int containerCount;
private Thread thread; private Thread thread;
private BlockingQueue<ContainerAllocatorEvent> eventQueue = private BlockingQueue<ContainerAllocatorEvent> eventQueue =
@ -179,6 +181,15 @@ protected void serviceStop() throws Exception {
} }
super.serviceStop(); super.serviceStop();
} }
@Override
public long getLastHeartbeatTime() {
return Time.now();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
}
} }
} }
@ -259,7 +270,7 @@ public AllocateResponse allocate(AllocateRequest request)
}); });
} }
@Test @Test(timeout = 60000)
public void benchmark2() throws Exception { public void benchmark2() throws Exception {
int maps = 100; // Adjust for benchmarking, start with a couple of thousands int maps = 100; // Adjust for benchmarking, start with a couple of thousands
int reduces = 50; int reduces = 50;