YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it. (kasha)

This commit is contained in:
Karthik Kambatla 2016-01-18 10:58:14 +01:00
parent 3fe5728563
commit d40859fab1
41 changed files with 90 additions and 74 deletions

View File

@ -245,7 +245,7 @@ public class MRAppMaster extends CompositeService {
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
long appSubmitTime) { long appSubmitTime) {
this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort, this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
new SystemClock(), appSubmitTime); SystemClock.getInstance(), appSubmitTime);
} }
public MRAppMaster(ApplicationAttemptId applicationAttemptId, public MRAppMaster(ApplicationAttemptId applicationAttemptId,

View File

@ -38,7 +38,7 @@ public class TaskAttemptFinishingMonitor extends
private EventHandler eventHandler; private EventHandler eventHandler;
public TaskAttemptFinishingMonitor(EventHandler eventHandler) { public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
super("TaskAttemptFinishingMonitor", new SystemClock()); super("TaskAttemptFinishingMonitor", SystemClock.getInstance());
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
} }

View File

@ -47,7 +47,7 @@ public class TestTaskAttemptFinishingMonitor {
@Test @Test
public void testFinshingAttemptTimeout() public void testFinshingAttemptTimeout()
throws IOException, InterruptedException { throws IOException, InterruptedException {
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10); conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);

View File

@ -258,7 +258,7 @@ public class TestTaskAttemptListenerImpl {
@Test (timeout=10000) @Test (timeout=10000)
public void testCommitWindow() throws IOException { public void testCommitWindow() throws IOException {
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);
@ -309,7 +309,7 @@ public class TestTaskAttemptListenerImpl {
public void testCheckpointIDTracking() public void testCheckpointIDTracking()
throws IOException, InterruptedException{ throws IOException, InterruptedException{
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
org.apache.hadoop.mapreduce.v2.app.job.Task mockTask = org.apache.hadoop.mapreduce.v2.app.job.Task mockTask =
mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class); mock(org.apache.hadoop.mapreduce.v2.app.job.Task.class);

View File

@ -153,7 +153,7 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, String assignedQueue) { boolean cleanOnStart, String assignedQueue) {
this(maps, reduces, autoComplete, testName, cleanOnStart, 1, this(maps, reduces, autoComplete, testName, cleanOnStart, 1,
new SystemClock(), assignedQueue); SystemClock.getInstance(), assignedQueue);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -187,13 +187,13 @@ public class MRApp extends MRAppMaster {
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), null); SystemClock.getInstance(), null);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) { boolean cleanOnStart, int startCount, boolean unregistered) {
this(maps, reduces, autoComplete, testName, cleanOnStart, startCount, this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
new SystemClock(), unregistered); SystemClock.getInstance(), unregistered);
} }
public MRApp(int maps, int reduces, boolean autoComplete, String testName, public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@ -214,14 +214,14 @@ public class MRApp extends MRAppMaster {
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount, boolean unregistered) { boolean cleanOnStart, int startCount, boolean unregistered) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), unregistered, null); cleanOnStart, startCount, SystemClock.getInstance(), unregistered, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
int maps, int reduces, boolean autoComplete, String testName, int maps, int reduces, boolean autoComplete, String testName,
boolean cleanOnStart, int startCount) { boolean cleanOnStart, int startCount) {
this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName, this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
cleanOnStart, startCount, new SystemClock(), true, null); cleanOnStart, startCount, SystemClock.getInstance(), true, null);
} }
public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,

View File

@ -1785,7 +1785,7 @@ public class TestRecovery {
Token<JobTokenIdentifier> jobToken = Token<JobTokenIdentifier> jobToken =
(Token<JobTokenIdentifier>) mock(Token.class); (Token<JobTokenIdentifier>) mock(Token.class);
Credentials credentials = null; Credentials credentials = null;
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
int appAttemptId = 3; int appAttemptId = 3;
MRAppMetrics metrics = mock(MRAppMetrics.class); MRAppMetrics metrics = mock(MRAppMetrics.class);
Resource minContainerRequirements = mock(Resource.class); Resource minContainerRequirements = mock(Resource.class);

View File

@ -788,7 +788,7 @@ public class TestRuntimeEstimators {
public MyAppMaster(Clock clock) { public MyAppMaster(Clock clock) {
super(MyAppMaster.class.getName()); super(MyAppMaster.class.getName());
if (clock == null) { if (clock == null) {
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
this.clock = clock; this.clock = clock;
LOG.info("Created MyAppMaster"); LOG.info("Created MyAppMaster");

View File

@ -44,7 +44,7 @@ public class TestTaskHeartbeatHandler {
@Test @Test
public void testTimeout() throws InterruptedException { public void testTimeout() throws InterruptedException {
EventHandler mockHandler = mock(EventHandler.class); EventHandler mockHandler = mock(EventHandler.class);
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1); TaskHeartbeatHandler hb = new TaskHeartbeatHandler(mockHandler, clock, 1);

View File

@ -127,7 +127,7 @@ public class TestCommitterEventHandler {
TestingJobEventHandler jeh = new TestingJobEventHandler(); TestingJobEventHandler jeh = new TestingJobEventHandler();
dispatcher.register(JobEventType.class, jeh); dispatcher.register(JobEventType.class, jeh);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
ApplicationAttemptId attemptid = ApplicationAttemptId attemptid =
ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0"); ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");

View File

@ -723,7 +723,7 @@ public class TestJobImpl {
.newRecord(ApplicationAttemptId.class), new Configuration(), .newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, SystemClock.getInstance(), null,
mrAppMetrics, null, true, null, 0, null, mockContext, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
String diagnostics = job.getReport().getDiagnostics(); String diagnostics = job.getReport().getDiagnostics();
@ -734,7 +734,7 @@ public class TestJobImpl {
.newRecord(ApplicationAttemptId.class), new Configuration(), .newRecord(ApplicationAttemptId.class), new Configuration(),
mock(EventHandler.class), mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, mock(JobTokenSecretManager.class), null,
new SystemClock(), null, SystemClock.getInstance(), null,
mrAppMetrics, null, true, null, 0, null, mockContext, null, null); mrAppMetrics, null, true, null, 0, null, mockContext, null, null);
job.handle(new JobEvent(jobId, JobEventType.JOB_KILL)); job.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
job.handle(diagUpdateEvent); job.handle(diagUpdateEvent);
@ -926,7 +926,7 @@ public class TestJobImpl {
private static CommitterEventHandler createCommitterEventHandler( private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) { Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock(); final SystemClock clock = SystemClock.getInstance();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
when(appContext.getEventHandler()).thenReturn( when(appContext.getEventHandler()).thenReturn(
dispatcher.getEventHandler()); dispatcher.getEventHandler());
@ -1105,7 +1105,7 @@ public class TestJobImpl {
String user, int numSplits, AppContext appContext) { String user, int numSplits, AppContext appContext) {
super(jobId, applicationAttemptId, conf, eventHandler, super(jobId, applicationAttemptId, conf, eventHandler,
null, new JobTokenSecretManager(), new Credentials(), null, new JobTokenSecretManager(), new Credentials(),
new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(), SystemClock.getInstance(), Collections.<TaskId, TaskInfo> emptyMap(),
MRAppMetrics.create(), null, newApiCommitter, user, MRAppMetrics.create(), null, newApiCommitter, user,
System.currentTimeMillis(), null, appContext, null, null); System.currentTimeMillis(), null, appContext, null, null);

View File

@ -99,7 +99,7 @@ public class TestShuffleProvider {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener, mock(TaskSplitMetaInfo.class), jobConf, taListener,
jobToken, credentials, jobToken, credentials,
new SystemClock(), null); SystemClock.getInstance(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

View File

@ -257,7 +257,7 @@ public class TestTaskAttempt{
public void verifyMillisCounters(int mapMemMb, int reduceMemMb, public void verifyMillisCounters(int mapMemMb, int reduceMemMb,
int minContainerSize) throws Exception { int minContainerSize) throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
ControlledClock clock = new ControlledClock(actualClock); ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(10); clock.setTime(10);
MRApp app = MRApp app =
@ -320,7 +320,7 @@ public class TestTaskAttempt{
private TaskAttemptImpl createMapTaskAttemptImplForTest( private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock(); Clock clock = SystemClock.getInstance();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
} }
@ -512,7 +512,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), null); SystemClock.getInstance(), null);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -569,7 +569,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -627,7 +627,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -691,7 +691,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
new Token(), new Credentials(), new Token(), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -759,7 +759,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
new Token(), new Credentials(), new SystemClock(), appCtx); new Token(), new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -818,7 +818,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -884,7 +884,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, jobFile, 1, splits, jobConf, taListener,
new Token(), new Credentials(), new SystemClock(), appCtx); new Token(), new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -934,7 +934,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,mock(Token.class), new Credentials(), splits, jobConf, taListener,mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1004,7 +1004,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1058,7 +1058,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1115,7 +1115,7 @@ public class TestTaskAttempt{
TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
jobFile, 1, splits, jobConf, taListener, new Token(), jobFile, 1, splits, jobConf, taListener, new Token(),
new Credentials(), new SystemClock(), appCtx); new Credentials(), SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.2", 0); NodeId nid = NodeId.newInstance("127.0.0.2", 0);
ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); ContainerId contId = ContainerId.newContainerId(appAttemptId, 3);
@ -1376,7 +1376,7 @@ public class TestTaskAttempt{
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener, splits, jobConf, taListener,
mock(Token.class), new Credentials(), mock(Token.class), new Credentials(),
new SystemClock(), appCtx); SystemClock.getInstance(), appCtx);
NodeId nid = NodeId.newInstance("127.0.0.1", 0); NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3); ContainerId contId = ContainerId.newInstance(appAttemptId, 3);

View File

@ -104,7 +104,7 @@ public class TestTaskAttemptContainerRequest {
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
mock(TaskSplitMetaInfo.class), jobConf, taListener, mock(TaskSplitMetaInfo.class), jobConf, taListener,
jobToken, credentials, jobToken, credentials,
new SystemClock(), null); SystemClock.getInstance(), null);
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString()); jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());

View File

@ -243,7 +243,7 @@ public class TestTaskImpl {
jobToken = (Token<JobTokenIdentifier>) mock(Token.class); jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
remoteJobConfFile = mock(Path.class); remoteJobConfFile = mock(Path.class);
credentials = null; credentials = null;
clock = new SystemClock(); clock = SystemClock.getInstance();
metrics = mock(MRAppMetrics.class); metrics = mock(MRAppMetrics.class);
dataLocations = new String[1]; dataLocations = new String[1];

View File

@ -431,7 +431,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// add resources to scheduler // add resources to scheduler
dispatcher.await(); dispatcher.await();
@ -485,7 +485,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1)); allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1)); allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
RMContainerAllocator.AssignedRequests assignedRequests = RMContainerAllocator.AssignedRequests assignedRequests =
@ -653,7 +653,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, final MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// request to allocate two reduce priority containers // request to allocate two reduce priority containers
final String[] locations = new String[] { host }; final String[] locations = new String[] { host };
@ -698,7 +698,7 @@ public class TestRMContainerAllocator {
final MockScheduler mockScheduler = new MockScheduler(appAttemptId); final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = MyContainerAllocator allocator =
new MyContainerAllocator(null, conf, appAttemptId, mockJob, new MyContainerAllocator(null, conf, appAttemptId, mockJob,
new SystemClock()) { SystemClock.getInstance()) {
@Override @Override
protected void register() { protected void register() {
} }
@ -790,7 +790,7 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob, new SystemClock()); appAttemptId, mockJob, SystemClock.getInstance());
// add resources to scheduler // add resources to scheduler
MockNM nodeManager1 = rm.registerNode("h1:1234", 1024); MockNM nodeManager1 = rm.registerNode("h1:1234", 1024);
@ -2265,7 +2265,7 @@ public class TestRMContainerAllocator {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1); conf.setInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS, 1);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
AppContext appContext = mock(AppContext.class); AppContext appContext = mock(AppContext.class);
when(appContext.getClock()).thenReturn(clock); when(appContext.getClock()).thenReturn(clock);
when(appContext.getApplicationID()).thenReturn( when(appContext.getApplicationID()).thenReturn(
@ -2751,7 +2751,7 @@ public class TestRMContainerAllocator {
0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
final MockScheduler mockScheduler = new MockScheduler(appAttemptId); final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
MyContainerAllocator allocator = new MyContainerAllocator(null, conf, MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
appAttemptId, mockJob, new SystemClock()) { appAttemptId, mockJob, SystemClock.getInstance()) {
@Override @Override
protected void register() { protected void register() {
} }

View File

@ -536,7 +536,7 @@ public class HistoryFileManager extends AbstractService {
long maxFSWaitTime = conf.getLong( long maxFSWaitTime = conf.getLong(
JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME, JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME); JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
createHistoryDirs(new SystemClock(), 10 * 1000, maxFSWaitTime); createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
this.aclsMgr = new JobACLsManager(conf); this.aclsMgr = new JobACLsManager(conf);

View File

@ -185,7 +185,8 @@ public class TestHistoryFileManager {
} }
} }
}.start(); }.start();
testCreateHistoryDirs(dfsCluster.getConfiguration(0), new SystemClock()); testCreateHistoryDirs(dfsCluster.getConfiguration(0),
SystemClock.getInstance());
} }
@Test(expected = YarnRuntimeException.class) @Test(expected = YarnRuntimeException.class)
@ -194,7 +195,7 @@ public class TestHistoryFileManager {
dfsCluster.getFileSystem().setSafeMode( dfsCluster.getFileSystem().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_ENTER); HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode()); Assert.assertTrue(dfsCluster.getFileSystem().isInSafeMode());
final ControlledClock clock = new ControlledClock(new SystemClock()); final ControlledClock clock = new ControlledClock();
clock.setTime(1); clock.setTime(1);
new Thread() { new Thread() {
@Override @Override

View File

@ -57,7 +57,7 @@ public class TestSpeculativeExecutionWithMRApp {
@Test @Test
public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
final ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());
@ -128,7 +128,7 @@ public class TestSpeculativeExecutionWithMRApp {
@Test @Test
public void testSepculateSuccessfulWithUpdateEvents() throws Exception { public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
Clock actualClock = new SystemClock(); Clock actualClock = SystemClock.getInstance();
final ControlledClock clock = new ControlledClock(actualClock); final ControlledClock clock = new ControlledClock(actualClock);
clock.setTime(System.currentTimeMillis()); clock.setTime(System.currentTimeMillis());

View File

@ -95,6 +95,9 @@ Release 2.9.0 - UNRELEASED
YARN-4553. Add cgroups support for docker containers. YARN-4553. Add cgroups support for docker containers.
(Sidharta Seethana via vvasudev) (Sidharta Seethana via vvasudev)
YARN-4526. Make SystemClock singleton so AppSchedulingInfo could use it.
(kasha)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -122,7 +122,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();
public ProcfsBasedProcessTree(String pid) { public ProcfsBasedProcessTree(String pid) {
this(pid, PROCFS, new SystemClock()); this(pid, PROCFS, SystemClock.getInstance());
} }
@Override @Override
@ -136,7 +136,7 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
} }
public ProcfsBasedProcessTree(String pid, String procfsDir) { public ProcfsBasedProcessTree(String pid, String procfsDir) {
this(pid, procfsDir, new SystemClock()); this(pid, procfsDir, SystemClock.getInstance());
} }
/** /**

View File

@ -30,7 +30,17 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
*/ */
@Public @Public
@Stable @Stable
public class SystemClock implements Clock { public final class SystemClock implements Clock {
private static final SystemClock INSTANCE = new SystemClock();
public static SystemClock getInstance() {
return INSTANCE;
}
private SystemClock() {
// do nothing
}
public long getTime() { public long getTime() {
return System.currentTimeMillis(); return System.currentTimeMillis();

View File

@ -83,7 +83,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {
* @param pid Identifier of the job object. * @param pid Identifier of the job object.
*/ */
public WindowsBasedProcessTree(final String pid) { public WindowsBasedProcessTree(final String pid) {
this(pid, new SystemClock()); this(pid, SystemClock.getInstance());
} }
/** /**

View File

@ -23,7 +23,7 @@ public class ControlledClock implements Clock {
private final Clock actualClock; private final Clock actualClock;
// Convenience for getting a controlled clock with overridden time // Convenience for getting a controlled clock with overridden time
public ControlledClock() { public ControlledClock() {
this(new SystemClock()); this(SystemClock.getInstance());
setTime(0); setTime(0);
} }
public ControlledClock(Clock actualClock) { public ControlledClock(Clock actualClock) {

View File

@ -395,7 +395,7 @@ public class TestProcfsBasedProcessTree {
// test processes // test processes
String[] pids = { "100", "200", "300", "400" }; String[] pids = { "100", "200", "300", "400" };
ControlledClock testClock = new ControlledClock(new SystemClock()); ControlledClock testClock = new ControlledClock();
testClock.setTime(0); testClock.setTime(0);
// create the fake procfs root directory. // create the fake procfs root directory.
File procfsRootDir = new File(TEST_ROOT_DIR, "proc"); File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
@ -575,7 +575,7 @@ public class TestProcfsBasedProcessTree {
// crank up the process tree class. // crank up the process tree class.
ProcfsBasedProcessTree processTree = ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath(), createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock()); SystemClock.getInstance());
setSmapsInProceTree(processTree, smapEnabled); setSmapsInProceTree(processTree, smapEnabled);
// verify virtual memory // verify virtual memory
@ -774,7 +774,8 @@ public class TestProcfsBasedProcessTree {
setupProcfsRootDir(procfsRootDir); setupProcfsRootDir(procfsRootDir);
// crank up the process tree class. // crank up the process tree class.
createProcessTree(pid, procfsRootDir.getAbsolutePath(), new SystemClock()); createProcessTree(pid, procfsRootDir.getAbsolutePath(),
SystemClock.getInstance());
// Let us not create stat file for pid 100. // Let us not create stat file for pid 100.
Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(pid, Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(pid,
@ -844,7 +845,7 @@ public class TestProcfsBasedProcessTree {
ProcfsBasedProcessTree processTree = ProcfsBasedProcessTree processTree =
createProcessTree("100", procfsRootDir.getAbsolutePath(), createProcessTree("100", procfsRootDir.getAbsolutePath(),
new SystemClock()); SystemClock.getInstance());
// build the process tree. // build the process tree.
processTree.updateProcessTree(); processTree.updateProcessTree();

View File

@ -51,7 +51,7 @@ public class TestWindowsBasedProcessTree {
} }
assertTrue("WindowsBasedProcessTree should be available on Windows", assertTrue("WindowsBasedProcessTree should be available on Windows",
WindowsBasedProcessTree.isAvailable()); WindowsBasedProcessTree.isAvailable());
ControlledClock testClock = new ControlledClock(new SystemClock()); ControlledClock testClock = new ControlledClock();
long elapsedTimeBetweenUpdatesMsec = 0; long elapsedTimeBetweenUpdatesMsec = 0;
testClock.setTime(elapsedTimeBetweenUpdatesMsec); testClock.setTime(elapsedTimeBetweenUpdatesMsec);

View File

@ -99,7 +99,7 @@ public class ContainerImpl implements Container {
private boolean wasLaunched; private boolean wasLaunched;
private long containerLocalizationStartTime; private long containerLocalizationStartTime;
private long containerLaunchStartTime; private long containerLaunchStartTime;
private static Clock clock = new SystemClock(); private static Clock clock = SystemClock.getInstance();
/** The NM-wide configuration - not specific to this container */ /** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf; private final Configuration daemonConf;

View File

@ -88,7 +88,7 @@ class CGroupsHandlerImpl implements CGroupsHandler {
this.controllerPaths = new HashMap<>(); this.controllerPaths = new HashMap<>();
this.rwLock = new ReentrantReadWriteLock(); this.rwLock = new ReentrantReadWriteLock();
this.privilegedOperationExecutor = privilegedOperationExecutor; this.privilegedOperationExecutor = privilegedOperationExecutor;
this.clock = new SystemClock(); this.clock = SystemClock.getInstance();
init(); init();
} }

View File

@ -87,7 +87,7 @@ public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
public CgroupsLCEResourcesHandler() { public CgroupsLCEResourcesHandler() {
this.controllerPaths = new HashMap<String, String>(); this.controllerPaths = new HashMap<String, String>();
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
@Override @Override

View File

@ -83,7 +83,8 @@ public class NodesListManager extends CompositeService implements
if (nodeIpCacheTimeout <= 0) { if (nodeIpCacheTimeout <= 0) {
resolver = new DirectResolver(); resolver = new DirectResolver();
} else { } else {
resolver = new CachedResolver(new SystemClock(), nodeIpCacheTimeout); resolver =
new CachedResolver(SystemClock.getInstance(), nodeIpCacheTimeout);
addIfService(resolver); addIfService(resolver);
} }

View File

@ -97,7 +97,7 @@ public class RMActiveServiceContext {
private RMNodeLabelsManager nodeLabelManager; private RMNodeLabelsManager nodeLabelManager;
private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater; private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
private long epoch; private long epoch;
private Clock systemClock = new SystemClock(); private Clock systemClock = SystemClock.getInstance();
private long schedulerRecoveryStartTime = 0; private long schedulerRecoveryStartTime = 0;
private long schedulerRecoveryWaitTime = 0; private long schedulerRecoveryWaitTime = 0;
private boolean printLog = true; private boolean printLog = true;

View File

@ -136,12 +136,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private RMNodeLabelsManager nlm; private RMNodeLabelsManager nlm;
public ProportionalCapacityPreemptionPolicy() { public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock(); clock = SystemClock.getInstance();
} }
public ProportionalCapacityPreemptionPolicy(Configuration config, public ProportionalCapacityPreemptionPolicy(Configuration config,
RMContext context, CapacityScheduler scheduler) { RMContext context, CapacityScheduler scheduler) {
this(config, context, scheduler, new SystemClock()); this(config, context, scheduler, SystemClock.getInstance());
} }
public ProportionalCapacityPreemptionPolicy(Configuration config, public ProportionalCapacityPreemptionPolicy(Configuration config,

View File

@ -399,7 +399,7 @@ public class RMAppImpl implements RMApp, Recoverable {
String applicationType, Set<String> applicationTags, String applicationType, Set<String> applicationTags,
ResourceRequest amReq) { ResourceRequest amReq) {
this.systemClock = new SystemClock(); this.systemClock = SystemClock.getInstance();
this.applicationId = applicationId; this.applicationId = applicationId;
this.name = name; this.name = name;

View File

@ -88,7 +88,7 @@ public class AllocationFileLoaderService extends AbstractService {
private volatile boolean running = true; private volatile boolean running = true;
public AllocationFileLoaderService() { public AllocationFileLoaderService() {
this(new SystemClock()); this(SystemClock.getInstance());
} }
public AllocationFileLoaderService(Clock clock) { public AllocationFileLoaderService(Clock clock) {

View File

@ -215,7 +215,7 @@ public class FairScheduler extends
public FairScheduler() { public FairScheduler() {
super(FairScheduler.class.getName()); super(FairScheduler.class.getName());
clock = new SystemClock(); clock = SystemClock.getInstance();
allocsLoader = new AllocationFileLoaderService(); allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this); queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this); maxRunningEnforcer = new MaxRunningAppsEnforcer(this);

View File

@ -1163,7 +1163,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
rm2.start(); rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode(); nm1.registerNode();
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
((RMContextImpl)rm2.getRMContext()).setSystemClock(clock); ((RMContextImpl)rm2.getRMContext()).setSystemClock(clock);
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());

View File

@ -820,7 +820,7 @@ public class TestAMRestart {
// current app should be failed. // current app should be failed.
rm1.waitForState(app.getApplicationId(), RMAppState.FAILED); rm1.waitForState(app.getApplicationId(), RMAppState.FAILED);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
// set window size to 10s // set window size to 10s
RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);; RMAppImpl app1 = (RMAppImpl)rm1.submitApp(200, 10000);;
app1.setSystemClock(clock); app1.setSystemClock(clock);

View File

@ -136,7 +136,7 @@ public class TestNodesListManager {
public void testCachedResolver() throws Exception { public void testCachedResolver() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG); rootLogger.setLevel(Level.DEBUG);
ControlledClock clock = new ControlledClock(new SystemClock()); ControlledClock clock = new ControlledClock();
clock.setTime(0); clock.setTime(0);
final int CACHE_EXPIRY_INTERVAL_SECS = 30; final int CACHE_EXPIRY_INTERVAL_SECS = 30;
NodesListManager.CachedResolver resolver = NodesListManager.CachedResolver resolver =

View File

@ -41,7 +41,7 @@ public class TestAMLivelinessMonitor {
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000); conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 6000);
final ControlledClock clock = new ControlledClock(new SystemClock()); final ControlledClock clock = new ControlledClock();
clock.setTime(0); clock.setTime(0);
MemoryRMStateStore memStore = new MemoryRMStateStore() { MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override @Override

View File

@ -42,7 +42,7 @@ public class TestFSParentQueue {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>(); notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) { queueManager = new QueueManager(scheduler) {

View File

@ -41,7 +41,7 @@ public class TestQueueManager {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>(); notEmptyQueues = new HashSet<FSQueue>();
queueManager = new QueueManager(scheduler) { queueManager = new QueueManager(scheduler) {

View File

@ -43,7 +43,7 @@ public class TestFairSchedulerQueueInfo {
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1)); when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
SystemClock clock = new SystemClock(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
QueueManager queueManager = new QueueManager(scheduler); QueueManager queueManager = new QueueManager(scheduler);
queueManager.initialize(conf); queueManager.initialize(conf);