MAPREDUCE-6394. Speed up Task processing loop in HsTasksBlock#render(). Contributed by Ray Chiang
This commit is contained in:
parent
32e490b6c0
commit
d0e0ba8010
|
@ -378,6 +378,9 @@ Release 2.8.0 - UNRELEASED
|
|||
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
|
||||
jlowe)
|
||||
|
||||
MAPREDUCE-6394. Speed up Task processing loop in HsTasksBlock#render()
|
||||
(Ray Chiang via jlowe)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-6314. TestPipeApplication fails on trunk.
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.api.records;
|
||||
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskAttemptReportPBImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
|
||||
import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestTaskAttemptReport {
|
||||
|
||||
@Test
|
||||
public void testSetRawCounters() {
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildImplicitRawCounters() {
|
||||
TaskAttemptReportPBImpl report = new TaskAttemptReportPBImpl();
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
MRProtos.TaskAttemptReportProto protoVal = report.getProto();
|
||||
Counters counters = report.getCounters();
|
||||
assertTrue(protoVal.hasCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountersOverRawCounters() {
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
Counters altCounters = TypeConverter.toYarn(rCounters);
|
||||
report.setRawCounters(rCounters);
|
||||
report.setCounters(altCounters);
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
assertNotEquals(rCounters, altCounters);
|
||||
assertEquals(counters, altCounters);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUninitializedCounters() {
|
||||
// Create basic class
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
// Verify properties initialized to null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetRawCountersToNull() {
|
||||
// Create basic class
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
// Set raw counters to null
|
||||
report.setRawCounters(null);
|
||||
// Verify properties still null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetCountersToNull() {
|
||||
// Create basic class
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
// Set raw counters to null
|
||||
report.setCounters(null);
|
||||
// Verify properties still null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetNonNullCountersToNull() {
|
||||
// Create basic class
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
// Set raw counters
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getCounters converts properly from raw to real
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
// Clear counters to null and then verify
|
||||
report.setCounters(null);
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetNonNullRawCountersToNull() {
|
||||
// Create basic class
|
||||
TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
|
||||
// Set raw counters
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getCounters converts properly from raw to real
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
// Clear counters to null and then verify
|
||||
report.setRawCounters(null);
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,139 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapreduce.v2.api.records;
|
||||
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskReportPBImpl;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
|
||||
import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestTaskReport {
|
||||
|
||||
@Test
|
||||
public void testSetRawCounters() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
// Set raw counters
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getCounters converts properly from raw to real
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuildImplicitRawCounters() {
|
||||
// Create basic class
|
||||
TaskReportPBImpl report = new TaskReportPBImpl();
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
// Set raw counters
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getProto method implicitly converts/sets real counters
|
||||
MRProtos.TaskReportProto protoVal = report.getProto();
|
||||
assertTrue(protoVal.hasCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountersOverRawCounters() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
Counters altCounters = TypeConverter.toYarn(rCounters);
|
||||
// Set raw counters
|
||||
report.setRawCounters(rCounters);
|
||||
// Set real counters
|
||||
report.setCounters(altCounters);
|
||||
// Verify real counters has priority over raw
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
assertNotEquals(rCounters, altCounters);
|
||||
assertEquals(counters, altCounters);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUninitializedCounters() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
// Verify properties initialized to null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetRawCountersToNull() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
// Set raw counters to null
|
||||
report.setRawCounters(null);
|
||||
// Verify properties still null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetCountersToNull() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
// Set raw counters to null
|
||||
report.setCounters(null);
|
||||
// Verify properties still null
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetNonNullCountersToNull() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
// Set raw counters
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getCounters converts properly from raw to real
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
// Clear counters to null and then verify
|
||||
report.setCounters(null);
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetNonNullRawCountersToNull() {
|
||||
// Create basic class
|
||||
TaskReport report = Records.newRecord(TaskReport.class);
|
||||
// Set raw counters
|
||||
org.apache.hadoop.mapreduce.Counters rCounters = MockJobs.newCounters();
|
||||
report.setRawCounters(rCounters);
|
||||
// Verify getCounters converts properly from raw to real
|
||||
Counters counters = report.getCounters();
|
||||
assertNotEquals(null, counters);
|
||||
// Clear counters to null and then verify
|
||||
report.setRawCounters(null);
|
||||
assertEquals(null, report.getCounters());
|
||||
assertEquals(null, report.getRawCounters());
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@ public interface TaskAttemptReport {
|
|||
/** @return the sort/merge finish time. Applicable only for reduce attempts */
|
||||
public abstract long getSortFinishTime();
|
||||
public abstract Counters getCounters();
|
||||
public abstract org.apache.hadoop.mapreduce.Counters getRawCounters();
|
||||
public abstract String getDiagnosticInfo();
|
||||
public abstract String getStateString();
|
||||
public abstract Phase getPhase();
|
||||
|
@ -45,6 +46,8 @@ public interface TaskAttemptReport {
|
|||
public abstract void setStartTime(long startTime);
|
||||
public abstract void setFinishTime(long finishTime);
|
||||
public abstract void setCounters(Counters counters);
|
||||
public abstract void
|
||||
setRawCounters(org.apache.hadoop.mapreduce.Counters rCounters);
|
||||
public abstract void setDiagnosticInfo(String diagnosticInfo);
|
||||
public abstract void setStateString(String stateString);
|
||||
public abstract void setPhase(Phase phase);
|
||||
|
|
|
@ -28,6 +28,7 @@ public interface TaskReport {
|
|||
public abstract long getStartTime();
|
||||
public abstract long getFinishTime();
|
||||
public abstract Counters getCounters();
|
||||
public abstract org.apache.hadoop.mapreduce.Counters getRawCounters();
|
||||
public abstract List<TaskAttemptId> getRunningAttemptsList();
|
||||
public abstract TaskAttemptId getRunningAttempt(int index);
|
||||
public abstract int getRunningAttemptsCount();
|
||||
|
@ -46,7 +47,9 @@ public interface TaskReport {
|
|||
public abstract void setStartTime(long startTime);
|
||||
public abstract void setFinishTime(long finishTime);
|
||||
public abstract void setCounters(Counters counters);
|
||||
|
||||
public abstract void
|
||||
setRawCounters(org.apache.hadoop.mapreduce.Counters rCounters);
|
||||
|
||||
public abstract void addAllRunningAttempts(List<TaskAttemptId> taskAttempts);
|
||||
public abstract void addRunningAttempt(TaskAttemptId taskAttempt);
|
||||
public abstract void removeRunningAttempt(int index);
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
|
@ -42,12 +42,12 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
|
|||
TaskAttemptReportProto proto = TaskAttemptReportProto.getDefaultInstance();
|
||||
TaskAttemptReportProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
|
||||
private TaskAttemptId taskAttemptId = null;
|
||||
private Counters counters = null;
|
||||
private org.apache.hadoop.mapreduce.Counters rawCounters = null;
|
||||
private ContainerId containerId = null;
|
||||
|
||||
|
||||
|
||||
public TaskAttemptReportPBImpl() {
|
||||
builder = TaskAttemptReportProto.newBuilder();
|
||||
}
|
||||
|
@ -68,6 +68,7 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
|
|||
if (this.taskAttemptId != null) {
|
||||
builder.setTaskAttemptId(convertToProtoFormat(this.taskAttemptId));
|
||||
}
|
||||
convertRawCountersToCounters();
|
||||
if (this.counters != null) {
|
||||
builder.setCounters(convertToProtoFormat(this.counters));
|
||||
}
|
||||
|
@ -90,11 +91,12 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
|
|||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Counters getCounters() {
|
||||
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
convertRawCountersToCounters();
|
||||
if (this.counters != null) {
|
||||
return this.counters;
|
||||
}
|
||||
|
@ -108,10 +110,32 @@ public class TaskAttemptReportPBImpl extends ProtoBase<TaskAttemptReportProto> i
|
|||
@Override
|
||||
public void setCounters(Counters counters) {
|
||||
maybeInitBuilder();
|
||||
if (counters == null)
|
||||
if (counters == null) {
|
||||
builder.clearCounters();
|
||||
}
|
||||
this.counters = counters;
|
||||
this.rawCounters = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.mapreduce.Counters
|
||||
getRawCounters() {
|
||||
return this.rawCounters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRawCounters(org.apache.hadoop.mapreduce.Counters rCounters) {
|
||||
setCounters(null);
|
||||
this.rawCounters = rCounters;
|
||||
}
|
||||
|
||||
private void convertRawCountersToCounters() {
|
||||
if (this.counters == null && this.rawCounters != null) {
|
||||
this.counters = TypeConverter.toYarn(rawCounters);
|
||||
this.rawCounters = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
|
@ -37,21 +38,19 @@ import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskStateProto;
|
|||
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
||||
|
||||
|
||||
|
||||
public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements TaskReport {
|
||||
TaskReportProto proto = TaskReportProto.getDefaultInstance();
|
||||
TaskReportProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
|
||||
private TaskId taskId = null;
|
||||
private Counters counters = null;
|
||||
private org.apache.hadoop.mapreduce.Counters rawCounters = null;
|
||||
private List<TaskAttemptId> runningAttempts = null;
|
||||
private TaskAttemptId successfulAttemptId = null;
|
||||
private List<String> diagnostics = null;
|
||||
private String status;
|
||||
|
||||
|
||||
|
||||
public TaskReportPBImpl() {
|
||||
builder = TaskReportProto.newBuilder();
|
||||
}
|
||||
|
@ -72,6 +71,7 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
|
|||
if (this.taskId != null) {
|
||||
builder.setTaskId(convertToProtoFormat(this.taskId));
|
||||
}
|
||||
convertRawCountersToCounters();
|
||||
if (this.counters != null) {
|
||||
builder.setCounters(convertToProtoFormat(this.counters));
|
||||
}
|
||||
|
@ -100,11 +100,11 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
|
|||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Counters getCounters() {
|
||||
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
convertRawCountersToCounters();
|
||||
if (this.counters != null) {
|
||||
return this.counters;
|
||||
}
|
||||
|
@ -118,10 +118,32 @@ public class TaskReportPBImpl extends ProtoBase<TaskReportProto> implements Task
|
|||
@Override
|
||||
public void setCounters(Counters counters) {
|
||||
maybeInitBuilder();
|
||||
if (counters == null)
|
||||
if (counters == null) {
|
||||
builder.clearCounters();
|
||||
}
|
||||
this.counters = counters;
|
||||
this.rawCounters = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.mapreduce.Counters
|
||||
getRawCounters() {
|
||||
return this.rawCounters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRawCounters(org.apache.hadoop.mapreduce.Counters rCounters) {
|
||||
setCounters(null);
|
||||
this.rawCounters = rCounters;
|
||||
}
|
||||
|
||||
private void convertRawCountersToCounters() {
|
||||
if (this.counters == null && this.rawCounters != null) {
|
||||
this.counters = TypeConverter.toYarn(rawCounters);
|
||||
this.rawCounters = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
TaskReportProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
|
|
@ -135,7 +135,7 @@ public class CompletedTask implements Task {
|
|||
if (counters == null) {
|
||||
counters = EMPTY_COUNTERS;
|
||||
}
|
||||
report.setCounters(TypeConverter.toYarn(counters));
|
||||
report.setRawCounters(counters);
|
||||
if (successfulAttempt != null) {
|
||||
report.setSuccessfulAttempt(successfulAttempt);
|
||||
}
|
||||
|
|
|
@ -170,7 +170,7 @@ public class CompletedTaskAttempt implements TaskAttempt {
|
|||
}
|
||||
// report.setPhase(attemptInfo.get); //TODO
|
||||
report.setStateString(attemptInfo.getState());
|
||||
report.setCounters(TypeConverter.toYarn(getCounters()));
|
||||
report.setRawCounters(getCounters());
|
||||
report.setContainerId(attemptInfo.getContainerId());
|
||||
if (attemptInfo.getHostname() == null) {
|
||||
report.setNodeManagerHost("UNKNOWN");
|
||||
|
|
Loading…
Reference in New Issue