YARN-6695. Fixed NPE in publishing appFinished events to ATSv2.
Contributed by Prabhu Joseph
(cherry picked from commit df76cdc895
)
This commit is contained in:
parent
546ed4937c
commit
68a98be8a2
|
@ -454,10 +454,15 @@ public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
|
|||
}
|
||||
TimelineCollector timelineCollector =
|
||||
rmTimelineCollectorManager.get(appId);
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(entity);
|
||||
timelineCollector.putEntities(entities,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
if (timelineCollector != null) {
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
entities.addEntity(entity);
|
||||
timelineCollector.putEntities(entities,
|
||||
UserGroupInformation.getCurrentUser());
|
||||
} else {
|
||||
LOG.debug("Cannot find active collector while publishing entity "
|
||||
+ entity);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error when publishing entity " + entity);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -28,12 +28,17 @@ import java.io.BufferedReader;
|
|||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -291,6 +296,48 @@ public class TestSystemMetricsPublisherForV2 {
|
|||
ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE, 0, 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testPutEntityWhenNoCollector() throws Exception {
|
||||
// Validating the logs as DrainDispatcher won't throw exception
|
||||
class TestAppender extends AppenderSkeleton {
|
||||
private final List<LoggingEvent> log = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public boolean requiresLayout() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void append(final LoggingEvent loggingEvent) {
|
||||
log.add(loggingEvent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public List<LoggingEvent> getLog() {
|
||||
return new ArrayList<>(log);
|
||||
}
|
||||
}
|
||||
|
||||
TestAppender appender = new TestAppender();
|
||||
final Logger logger = Logger.getRootLogger();
|
||||
logger.addAppender(appender);
|
||||
|
||||
try {
|
||||
RMApp app = createRMApp(ApplicationId.newInstance(0, 1));
|
||||
metricsPublisher.appCreated(app, app.getStartTime());
|
||||
dispatcher.await();
|
||||
for (LoggingEvent event : appender.getLog()) {
|
||||
assertFalse("Dispatcher Crashed",
|
||||
event.getRenderedMessage().contains("Error in dispatcher thread"));
|
||||
}
|
||||
} finally {
|
||||
logger.removeAppender(appender);
|
||||
}
|
||||
}
|
||||
|
||||
private RMApp createAppAndRegister(ApplicationId appId) {
|
||||
RMApp app = createRMApp(appId);
|
||||
|
||||
|
|
|
@ -295,14 +295,6 @@ Following are the basic configurations to start Timeline service v.2:
|
|||
<name>yarn.system-metrics-publisher.enabled</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The setting that controls whether yarn container events are
|
||||
published to the timeline service or not by RM. This configuration setting
|
||||
is for ATS V2.</description>
|
||||
<name>yarn.rm.system-metrics-publisher.emit-container-events</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
```
|
||||
|
||||
In addition, you may want to set the YARN cluster name to a reasonably unique value in case you
|
||||
|
|
Loading…
Reference in New Issue