YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it. (kasha)

(cherry picked from commit d40859fab1ad977636457a6cc96b6a4f9b903afc)

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
This commit is contained in:
Karthik Kambatla 2016-01-18 10:58:14 +01:00
parent c56d46307a
commit ef59521094
41 changed files with 89 additions and 73 deletions

View File

@ -242,7 +242,7 @@ public MRAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) { long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime); SystemClock.getInstance(), appSubmitTime);
} }
public MRAppMaster(ApplicationAttemptId applicationAttemptId, public MRAppMaster(ApplicationAttemptId applicationAttemptId,

View File

@ -38,7 +38,7 @@ public class TaskAttemptFinishingMonitor extends
private EventHandler eventHandler; private EventHandler eventHandler;
public TaskAttemptFinishingMonitor(EventHandler eventHandler) { public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
super("TaskAttemptFinishingMonitor", new SystemClock()); super("TaskAttemptFinishingMonitor", SystemClock.getInstance());
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
} }

View File

@ -46,7 +46,7 @@ public class TestTaskAttemptFinishingMonitor {
@Test @Test
public void testFinshingAttemptTimeout() public void testFinshingAttemptTimeout()
throws IOException, InterruptedException { throws IOException, InterruptedException {
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);

View File

@ -236,7 +236,7 @@ private static TaskAttemptCompletionEvent createTce(int eventId,
@Test (timeout=10000) @Test (timeout=10000)
public void testCommitWindow() throws IOException { public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);

View File

@ -152,7 +152,7 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, String assignedQueue) { boolean cleanOnStart, String assignedQueue) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, this(maps, reduces, autoComplete, testName, cleanOnStart, 1,
new SystemClock(), assignedQueue); SystemClock.getInstance(), assignedQueue);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -186,13 +186,13 @@ private static ContainerId getContainerId(ApplicationId applicationId,
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), null); SystemClock.getInstance(), null);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) { boolean cleanOnStart, int startCount, boolean unregistered) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), unregistered); SystemClock.getInstance(), unregistered);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -213,14 +213,14 @@ public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) { boolean cleanOnStart, int startCount, boolean unregistered) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), unregistered, null); cleanOnStart, startCount, SystemClock.getInstance(), unregistered, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), true, null); cleanOnStart, startCount, SystemClock.getInstance(), true, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,

View File

@ -1785,7 +1785,7 @@ private MapTaskImpl getMockMapTask(long clusterTimestamp, EventHandler eh) {
Token<JobTokenIdentifier> jobToken = Token<JobTokenIdentifier> jobToken =
(Token<JobTokenIdentifier>) mock(Token.class); (Token<JobTokenIdentifier>) mock(Token.class);
Credentials credentials = null; Credentials credentials = null;
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
int appAttemptId = 3; int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class); MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class); Resource minContainerRequirements = mock(Resource.class);

View File

@ -788,7 +788,7 @@ class MyAppMaster extends CompositeService {
public MyAppMaster(Clock clock) { public MyAppMaster(Clock clock) {
super(MyAppMaster.class.getName()); super(MyAppMaster.class.getName());
if (clock == null) { if (clock == null) {
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
this.clock = clock; this.clock = clock;
LOG.info("Created MyAppMaster"); LOG.info("Created MyAppMaster");

View File

@ -44,7 +44,7 @@ public class TestTaskHeartbeatHandler {
@Test @Test
public void testTimeout() throws InterruptedException { public void testTimeout() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class); EventHandler mockHandler = mock(EventHandler.class);
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);

View File

@ -127,7 +127,7 @@ public void testCommitWindow() throws Exception {
TestingJobEventHandler jeh = new TestingJobEventHandler(); TestingJobEventHandler jeh = new TestingJobEventHandler();
dispatcher.register(JobEventType.class, jeh); dispatcher.register(JobEventType.class, jeh);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
ApplicationAttemptId attemptid = ApplicationAttemptId attemptid =
ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0"); ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");

View File

@ -723,7 +723,7 @@ public void testReportDiagnostics() throws Exception {
.newRecord(ApplicationAttemptId.class), new Configuration(), .newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, SystemClock.getInstance(), null,
mrAppMetrics, null, true, null, 0, null, mockContext, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics(); String diagnostics = job.getReport().getDiagnostics();
@ -734,7 +734,7 @@ null, mock(JobTokenSecretManager.class), null,
.newRecord(ApplicationAttemptId.class), new Configuration(), .newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, SystemClock.getInstance(), null,
mrAppMetrics, null, true, null, 0, null, mockContext, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
@ -926,7 +926,7 @@ public void testJobPriorityUpdate() throws Exception {
private static CommitterEventHandler createCommitterEventHandler( private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) { Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock(); final SystemClock clock = SystemClock.getInstance();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn( when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler()); dispatcher.getEventHandler());
@ -1105,7 +1105,7 @@ public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
String user, int numSplits, AppContext appContext) { String user, int numSplits, AppContext appContext) {
super(jobId, applicationAttemptId, conf, eventHandler, super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(), null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(), SystemClock.getInstance(), Collections.<TaskId, TaskInfo> emptyMap(),
MRAppMetrics.create(), null, newApiCommitter, user, MRAppMetrics.create(), null, newApiCommitter, user,
System.currentTimeMillis(), null, appContext, null, null); System.currentTimeMillis(), null, appContext, null, null);

View File

@ -99,7 +99,7 @@ public void testShuffleProviders() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener, mock(TaskSplitMetaInfo.class), jobConf, taListener,
jobToken, credentials, jobToken, credentials,
new SystemClock(), null); SystemClock.getInstance(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

View File

@ -257,7 +257,7 @@ public void testMillisCountersUpdate() throws Exception {
public void verifyMillisCounters(int mapMemMb, int reduceMemMb, public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
int minContainerSize) throws Exception { int minContainerSize) throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
ControlledClock clock = new ControlledClock(actualClock); ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(10); clock.setTime(10);
MRApp app = MRApp app =
@ -320,7 +320,7 @@ public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
private TaskAttemptImpl createMapTaskAttemptImplForTest( private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
} }
@ -512,7 +512,7 @@ public void testLaunchFailedWhileKilling() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), null); SystemClock.getInstance(), null);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -569,7 +569,7 @@ public void testContainerCleanedWhileRunning() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -627,7 +627,7 @@ public void testContainerCleanedWhileCommitting() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -691,7 +691,7 @@ public void testDoubleTooManyFetchFailure() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -759,7 +759,7 @@ public void testAppDiognosticEventOnUnassignedTask() throws Exception {
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
new Token(), new Credentials(), new SystemClock(), appCtx); new Token(), new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -818,7 +818,7 @@ public void testTooManyFetchFailureAfterKill() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -884,7 +884,7 @@ public void testAppDiognosticEventOnNewTask() throws Exception {
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
new Token(), new Credentials(), new SystemClock(), appCtx); new Token(), new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -934,7 +934,7 @@ public void testFetchFailureAttemptFinishTime() throws Exception{
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,mock(Token.class), new Credentials(), splits, jobConf, taListener,mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1004,7 +1004,7 @@ public void testContainerKillAfterAssigned() throws Exception {
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1058,7 +1058,7 @@ public void testContainerKillWhileRunning() throws Exception {
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1115,7 +1115,7 @@ public void testContainerKillWhileCommitPending() throws Exception {
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1376,7 +1376,7 @@ private TaskAttemptImpl createTaskAttemptImpl(
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);

View File

@ -104,7 +104,7 @@ public void testAttemptContainerRequest() throws Exception {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener, mock(TaskSplitMetaInfo.class), jobConf, taListener,
jobToken, credentials, jobToken, credentials,
new SystemClock(), null); SystemClock.getInstance(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

View File

@ -243,7 +243,7 @@ public void setup() {
jobToken = (Token<JobTokenIdentifier>) mock(Token.class); jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
remoteJobConfFile = mock(Path.class); remoteJobConfFile = mock(Path.class);
credentials = null; credentials = null;
clock = new SystemClock(); clock = SystemClock.getInstance();
metrics = mock(MRAppMetrics.class); metrics = mock(MRAppMetrics.class);
dataLocations = new String[1]; dataLocations = new String[1];

View File

@ -430,7 +430,7 @@ public void testReducerRampdownDiagnostics() throws Exception {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// add resources to scheduler // add resources to scheduler
dispatcher.await(); dispatcher.await();
@ -484,7 +484,7 @@ public void testPreemptReducers() throws Exception {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
RMContainerAllocator.AssignedRequests assignedRequests = RMContainerAllocator.AssignedRequests assignedRequests =
@ -652,7 +652,7 @@ public void testExcessReduceContainerAssign() throws Exception {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// request to allocate two reduce priority containers // request to allocate two reduce priority containers
final String[] locations = new String[] { host }; final String[] locations = new String[] { host };
@ -697,7 +697,7 @@ public void testMapReduceAllocationWithNodeLabelExpression() throws Exception {
final MockScheduler mockScheduler = new MockScheduler(appAttemptId); final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = MyContainerAllocator allocator =
new MyContainerAllocator(null, conf, appAttemptId, mockJob, new MyContainerAllocator(null, conf, appAttemptId, mockJob,
new SystemClock()) { SystemClock.getInstance()) {
@Override @Override
protected void register() { protected void register() {
} }
@ -789,7 +789,7 @@ public void testMapReduceScheduling() throws Exception {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// add resources to scheduler // add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@ -2262,7 +2262,7 @@ public void testHeartbeatHandler() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
when(appContext.getClock()).thenReturn(clock); when(appContext.getClock()).thenReturn(clock);
when(appContext.getApplicationID()).thenReturn( when(appContext.getApplicationID()).thenReturn(
@ -2746,7 +2746,7 @@ public void testConcurrentTaskLimits() throws Exception {
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MockScheduler mockScheduler = new MockScheduler(appAttemptId); final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = new MyContainerAllocator(null, conf, MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
appAttemptId, mockJob, new SystemClock()) { appAttemptId, mockJob, SystemClock.getInstance()) {
@Override @Override
protected void register() { protected void register() {
} }

View File

@ -536,7 +536,7 @@ protected void serviceInit(Configuration conf) throws Exception {
long maxFSWaitTime = conf.getLong( long maxFSWaitTime = conf.getLong(
JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
this.aclsMgr = new JobACLsManager(conf); this.aclsMgr = new JobACLsManager(conf);

View File

@ -185,7 +185,8 @@ public void run() {
} }
} }
}.start(); }.start();
testCreateHistoryDirs(dfsCluster.getConfiguration(0), new SystemClock()); testCreateHistoryDirs(dfsCluster.getConfiguration(0),
SystemClock.getInstance());
} }
@Test(expected = YarnRuntimeException.class) @Test(expected = YarnRuntimeException.class)
@ -194,7 +195,7 @@ public void testCreateDirsWithFileSystemNotBecomingAvailBeforeTimeout()
dfsCluster.getFileSystem().setSafeMode( dfsCluster.getFileSystem().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_ENTER); HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode()); Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
final ControlledClock clock = new ControlledClock(new SystemClock()); final ControlledClock clock = new ControlledClock();
clock.setTime(1); clock.setTime(1);
new Thread() { new Thread() {
@Override @Override

View File

@ -57,7 +57,7 @@ public class TestSpeculativeExecutionWithMRApp {
@Test @Test
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
final ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());
@ -128,7 +128,7 @@ public Boolean get() {
@Test @Test
public void testSepculateSuccessfulWithUpdateEvents() throws Exception { public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
final ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());

View File

@ -37,6 +37,9 @@ Release 2.9.0 - UNRELEASED
YARN-4553. Add cgroups support for docker containers. YARN-4553. Add cgroups support for docker containers.
(Sidharta Seethana via vvasudev) (Sidharta Seethana via vvasudev)
YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it.
(kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -122,7 +122,7 @@ public static MemInfo getMemInfoByName(String name) {
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) { public ProcfsBasedProcessTree(String pid) {
this(pid, PROCFS, new SystemClock()); this(pid, PROCFS, SystemClock.getInstance());
} }
@Override @Override
@ -136,7 +136,7 @@ public void setConf(Configuration conf) {
} }
public ProcfsBasedProcessTree(String pid, String procfsDir) { public ProcfsBasedProcessTree(String pid, String procfsDir) {
this(pid, procfsDir, new SystemClock()); this(pid, procfsDir, SystemClock.getInstance());
} }
/** /**

View File

@ -30,7 +30,17 @@
*/ */
@Public @Public
@Stable @Stable
public class SystemClock implements Clock { public final class SystemClock implements Clock {
private static final SystemClock INSTANCE = new SystemClock();
public static SystemClock getInstance() {
return INSTANCE;
}
private SystemClock() {
// do nothing
}
public long getTime() { public long getTime() {
return System.currentTimeMillis(); return System.currentTimeMillis();

View File

@ -83,7 +83,7 @@ public static boolean isAvailable() {
* @param pid Identifier of the job object. * @param pid Identifier of the job object.
*/ */
public WindowsBasedProcessTree(final String pid) { public WindowsBasedProcessTree(final String pid) {
this(pid, new SystemClock()); this(pid, SystemClock.getInstance());
} }
/** /**

View File

@ -23,7 +23,7 @@ public class ControlledClock implements Clock {
private final Clock actualClock; private final Clock actualClock;
// Convenience for getting a controlled clock with overridden time // Convenience for getting a controlled clock with overridden time
public ControlledClock() { public ControlledClock() {
this(new SystemClock()); this(SystemClock.getInstance());
setTime(0); setTime(0);
} }
public ControlledClock(Clock actualClock) { public ControlledClock(Clock actualClock) {

View File

@ -395,7 +395,7 @@ public void testCpuAndMemoryForProcessTree() throws IOException {
// test processes // test processes
String[] pids = { "100", "200", "300", "400" }; String[] pids = { "100", "200", "300", "400" };
ControlledClock testClock = new ControlledClock(new SystemClock()); ControlledClock testClock = new ControlledClock();
testClock.setTime(0); testClock.setTime(0);
// create the fake procfs root directory. // create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
@ -575,7 +575,7 @@ private void testMemForOlderProcesses(boolean smapEnabled) throws IOException {
// crank up the process tree class. // crank up the process tree class.
ProcfsBasedProcessTree processTree = ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath(), createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock()); SystemClock.getInstance());
setSmapsInProceTree(processTree, smapEnabled); setSmapsInProceTree(processTree, smapEnabled);
// verify virtual memory // verify virtual memory
@ -774,7 +774,8 @@ public void testDestroyProcessTree() throws IOException {
setupProcfsRootDir(procfsRootDir); setupProcfsRootDir(procfsRootDir);
// crank up the process tree class. // crank up the process tree class.
createProcessTree(pid, procfsRootDir.getAbsolutePath(), new SystemClock()); createProcessTree(pid, procfsRootDir.getAbsolutePath(),
SystemClock.getInstance());
// Let us not create stat file for pid 100. // Let us not create stat file for pid 100.
Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(pid, Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(pid,
@ -844,7 +845,7 @@ public void testProcessTreeDump() throws IOException {
ProcfsBasedProcessTree processTree = ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath(), createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock()); SystemClock.getInstance());
// build the process tree. // build the process tree.
processTree.updateProcessTree(); processTree.updateProcessTree();

View File

@ -51,7 +51,7 @@ public void tree() {
} }
assertTrue("WindowsBasedProcessTree should be available on Windows", assertTrue("WindowsBasedProcessTree should be available on Windows",
WindowsBasedProcessTree.isAvailable()); WindowsBasedProcessTree.isAvailable());
ControlledClock testClock = new ControlledClock(new SystemClock()); ControlledClock testClock = new ControlledClock();
long elapsedTimeBetweenUpdatesMsec = 0; long elapsedTimeBetweenUpdatesMsec = 0;
testClock.setTime(elapsedTimeBetweenUpdatesMsec); testClock.setTime(elapsedTimeBetweenUpdatesMsec);

View File

@ -99,7 +99,7 @@ public class ContainerImpl implements Container {
private boolean wasLaunched; private boolean wasLaunched;
private long containerLocalizationStartTime; private long containerLocalizationStartTime;
private long containerLaunchStartTime; private long containerLaunchStartTime;
private static Clock clock = new SystemClock(); private static Clock clock = SystemClock.getInstance();
/** The NM-wide configuration - not specific to this container */ /** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf; private final Configuration daemonConf;

View File

@ -88,7 +88,7 @@ public CGroupsHandlerImpl(Configuration conf, PrivilegedOperationExecutor
this.controllerPaths = new HashMap<>(); this.controllerPaths = new HashMap<>();
this.rwLock = new ReentrantReadWriteLock(); this.rwLock = new ReentrantReadWriteLock();
this.privilegedOperationExecutor = privilegedOperationExecutor; this.privilegedOperationExecutor = privilegedOperationExecutor;
this.clock = new SystemClock(); this.clock = SystemClock.getInstance();
init(); init();
} }

View File

@ -87,7 +87,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
public CgroupsLCEResourcesHandler() { public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>(); this.controllerPaths = new HashMap<String, String>();
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
@Override @Override

View File

@ -83,7 +83,8 @@ protected void serviceInit(Configuration conf) throws Exception {
if (nodeIpCacheTimeout <= 0) { if (nodeIpCacheTimeout <= 0) {
resolver = new DirectResolver(); resolver = new DirectResolver();
} else { } else {
resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout); resolver =
new CachedResolver(SystemClock.getInstance(), nodeIpCacheTimeout);
addIfService(resolver); addIfService(resolver);
} }

View File

@ -97,7 +97,7 @@ public class RMActiveServiceContext {
private RMNodeLabelsManager nodeLabelManager; private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch; private long epoch;
private Clock systemClock = new SystemClock(); private Clock systemClock = SystemClock.getInstance();
private long schedulerRecoveryStartTime = 0; private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0; private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true; private boolean printLog = true;

View File

@ -136,12 +136,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private RMNodeLabelsManager nlm; private RMNodeLabelsManager nlm;
public ProportionalCapacityPreemptionPolicy() { public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
public ProportionalCapacityPreemptionPolicy(Configuration config, public ProportionalCapacityPreemptionPolicy(Configuration config,
RMContext context, CapacityScheduler scheduler) { RMContext context, CapacityScheduler scheduler) {
this(config, context, scheduler, new SystemClock()); this(config, context, scheduler, SystemClock.getInstance());
} }
public ProportionalCapacityPreemptionPolicy(Configuration config, public ProportionalCapacityPreemptionPolicy(Configuration config,

View File

@ -399,7 +399,7 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
String applicationType, Set<String> applicationTags, String applicationType, Set<String> applicationTags,
ResourceRequest amReq) { ResourceRequest amReq) {
this.systemClock = new SystemClock(); this.systemClock = SystemClock.getInstance();
this.applicationId = applicationId; this.applicationId = applicationId;
this.name = name; this.name = name;

View File

@ -88,7 +88,7 @@ public class AllocationFileLoaderService extends AbstractService {
private volatile boolean running = true; private volatile boolean running = true;
public AllocationFileLoaderService() { public AllocationFileLoaderService() {
this(new SystemClock()); this(SystemClock.getInstance());
} }
public AllocationFileLoaderService(Clock clock) { public AllocationFileLoaderService(Clock clock) {

View File

@ -215,7 +215,7 @@ public class FairScheduler extends
public FairScheduler() { public FairScheduler() {
super(FairScheduler.class.getName()); super(FairScheduler.class.getName());
clock = new SystemClock(); clock = SystemClock.getInstance();
allocsLoader = new AllocationFileLoaderService(); allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this); queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this);

View File

@ -1163,7 +1163,7 @@ public void testNewContainersNotAllocatedDuringSchedulerRecovery()
rm2.start(); rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
((RMContextImpl)rm2.getRMContext()).setSystemClock(clock); ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock);
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());

View File

@ -820,7 +820,7 @@ public void testRMAppAttemptFailuresValidityInterval() throws Exception {
// current app should be failed. // current app should be failed.
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED); rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
// set window size to 10s // set window size to 10s
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);; RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);;
app1.setSystemClock(clock); app1.setSystemClock(clock);

View File

@ -136,7 +136,7 @@ protected Dispatcher createDispatcher() {
public void testCachedResolver() throws Exception { public void testCachedResolver() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
clock.setTime(0); clock.setTime(0);
final int CACHE_EXPIRY_INTERVAL_SECS = 30; final int CACHE_EXPIRY_INTERVAL_SECS = 30;
NodesListManager.CachedResolver resolver = NodesListManager.CachedResolver resolver =

View File

@ -41,7 +41,7 @@ public void testResetTimer() throws Exception {
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000); conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);
final ControlledClock clock = new ControlledClock(new SystemClock()); final ControlledClock clock = new ControlledClock();
clock.setTime(0); clock.setTime(0);
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override @Override

View File

@ -42,7 +42,7 @@ public void setUp() throws Exception {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>(); notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) { queueManager = new QueueManager(scheduler) {

View File

@ -41,7 +41,7 @@ public void setUp() throws Exception {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>(); notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) { queueManager = new QueueManager(scheduler) {

View File

@ -43,7 +43,7 @@ public void testEmptyChildQueues() throws Exception {
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1)); when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
QueueManager queueManager = new QueueManager(scheduler); QueueManager queueManager = new QueueManager(scheduler);
queueManager.initialize(conf); queueManager.initialize(conf);