YARN-5922. Remove direct references of HBaseTimelineWriter/Reader in core ATS classes. Contributed by Haibo Chen.
This commit is contained in:
parent
c265515725
commit
a5a55a54ab
|
@ -2019,9 +2019,18 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String TIMELINE_SERVICE_WRITER_CLASS =
|
public static final String TIMELINE_SERVICE_WRITER_CLASS =
|
||||||
TIMELINE_SERVICE_PREFIX + "writer.class";
|
TIMELINE_SERVICE_PREFIX + "writer.class";
|
||||||
|
|
||||||
|
public static final String DEFAULT_TIMELINE_SERVICE_WRITER_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.timelineservice"
|
||||||
|
+ ".storage.HBaseTimelineWriterImpl";
|
||||||
|
|
||||||
public static final String TIMELINE_SERVICE_READER_CLASS =
|
public static final String TIMELINE_SERVICE_READER_CLASS =
|
||||||
TIMELINE_SERVICE_PREFIX + "reader.class";
|
TIMELINE_SERVICE_PREFIX + "reader.class";
|
||||||
|
|
||||||
|
public static final String DEFAULT_TIMELINE_SERVICE_READER_CLASS =
|
||||||
|
"org.apache.hadoop.yarn.server.timelineservice" +
|
||||||
|
".storage.HBaseTimelineReaderImpl";
|
||||||
|
|
||||||
|
|
||||||
/** The setting that controls how often the timeline collector flushes the
|
/** The setting that controls how often the timeline collector flushes the
|
||||||
* timeline writer.
|
* timeline writer.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -59,10 +58,7 @@ public class TimelineCollectorManager extends AbstractService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
writer = ReflectionUtils.newInstance(conf.getClass(
|
writer = createTimelineWriter(conf);
|
||||||
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
|
||||||
HBaseTimelineWriterImpl.class,
|
|
||||||
TimelineWriter.class), conf);
|
|
||||||
writer.init(conf);
|
writer.init(conf);
|
||||||
// create a single dedicated thread for flushing the writer on a periodic
|
// create a single dedicated thread for flushing the writer on a periodic
|
||||||
// basis
|
// basis
|
||||||
|
@ -75,6 +71,26 @@ public class TimelineCollectorManager extends AbstractService {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TimelineWriter createTimelineWriter(final Configuration conf) {
|
||||||
|
String timelineWriterClassName = conf.get(
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_CLASS);
|
||||||
|
LOG.info("Using TimelineWriter: " + timelineWriterClassName);
|
||||||
|
try {
|
||||||
|
Class<?> timelineWriterClazz = Class.forName(timelineWriterClassName);
|
||||||
|
if (TimelineWriter.class.isAssignableFrom(timelineWriterClazz)) {
|
||||||
|
return (TimelineWriter) ReflectionUtils.newInstance(
|
||||||
|
timelineWriterClazz, conf);
|
||||||
|
} else {
|
||||||
|
throw new YarnRuntimeException("Class: " + timelineWriterClassName
|
||||||
|
+ " not instance of " + TimelineWriter.class.getCanonicalName());
|
||||||
|
}
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new YarnRuntimeException("Could not instantiate TimelineWriter: "
|
||||||
|
+ timelineWriterClassName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||||
|
@ -72,20 +71,33 @@ public class TimelineReaderServer extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
|
TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
|
||||||
|
timelineReaderStore.init(conf);
|
||||||
addService(timelineReaderStore);
|
addService(timelineReaderStore);
|
||||||
timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
|
timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
|
||||||
addService(timelineReaderManager);
|
addService(timelineReaderManager);
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimelineReader createTimelineReaderStore(Configuration conf) {
|
private TimelineReader createTimelineReaderStore(final Configuration conf) {
|
||||||
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
|
String timelineReaderClassName = conf.get(
|
||||||
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||||
HBaseTimelineReaderImpl.class, TimelineReader.class), conf);
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READER_CLASS);
|
||||||
LOG.info("Using store " + readerStore.getClass().getName());
|
LOG.info("Using store: " + timelineReaderClassName);
|
||||||
readerStore.init(conf);
|
try {
|
||||||
return readerStore;
|
Class<?> timelineReaderClazz = Class.forName(timelineReaderClassName);
|
||||||
|
if (TimelineReader.class.isAssignableFrom(timelineReaderClazz)) {
|
||||||
|
return (TimelineReader) ReflectionUtils.newInstance(
|
||||||
|
timelineReaderClazz, conf);
|
||||||
|
} else {
|
||||||
|
throw new YarnRuntimeException("Class: " + timelineReaderClassName
|
||||||
|
+ " not instance of " + TimelineReader.class.getCanonicalName());
|
||||||
}
|
}
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new YarnRuntimeException("Could not instantiate TimelineReader: "
|
||||||
|
+ timelineReaderClassName, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private TimelineReaderManager createTimelineReaderManager(
|
private TimelineReaderManager createTimelineReaderManager(
|
||||||
TimelineReader timelineReaderStore) {
|
TimelineReader timelineReaderStore) {
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for TimelineCollectorManager.
|
||||||
|
*/
|
||||||
|
public class TestTimelineCollectorManager{
|
||||||
|
|
||||||
|
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||||
|
public void testTimelineCollectorManagerWithInvalidTimelineWriter() {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
Object.class.getName());
|
||||||
|
runTimelineCollectorManagerWithConfig(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||||
|
public void testTimelineCollectorManagerWithNonexistentTimelineWriter() {
|
||||||
|
String nonexistentTimelineWriterClass = "org.apache.org.yarn.server." +
|
||||||
|
"timelineservice.storage.XXXXXXXX";
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
nonexistentTimelineWriterClass);
|
||||||
|
runTimelineCollectorManagerWithConfig(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testTimelineCollectorManagerWithFileSystemWriter() {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||||
|
runTimelineCollectorManagerWithConfig(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a TimelineCollectorManager with a given configuration.
|
||||||
|
* @param conf configuration to run TimelineCollectorManager with
|
||||||
|
*/
|
||||||
|
private static void runTimelineCollectorManagerWithConfig(
|
||||||
|
final Configuration conf) {
|
||||||
|
TimelineCollectorManager collectorManager =
|
||||||
|
new TimelineCollectorManager("testTimelineCollectorManager");
|
||||||
|
try {
|
||||||
|
collectorManager.init(conf);
|
||||||
|
collectorManager.start();
|
||||||
|
} finally {
|
||||||
|
collectorManager.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.Service.STATE;
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -54,4 +55,46 @@ public class TestTimelineReaderServer {
|
||||||
server.stop();
|
server.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||||
|
public void testTimelineReaderServerWithInvalidTimelineReader() {
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||||
|
"localhost:0");
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||||
|
Object.class.getName());
|
||||||
|
runTimelineReaderServerWithConfig(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||||
|
public void testTimelineReaderServerWithNonexistentTimelineReader() {
|
||||||
|
String nonexistentTimelineReaderClass = "org.apache.org.yarn.server." +
|
||||||
|
"timelineservice.storage.XXXXXXXX";
|
||||||
|
Configuration conf = new YarnConfiguration();
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||||
|
"localhost:0");
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||||
|
nonexistentTimelineReaderClass);
|
||||||
|
runTimelineReaderServerWithConfig(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run a TimelineReaderServer with a given configuration.
|
||||||
|
* @param conf configuration to run TimelineReaderServer with
|
||||||
|
*/
|
||||||
|
private static void runTimelineReaderServerWithConfig(
|
||||||
|
final Configuration conf) {
|
||||||
|
TimelineReaderServer server = new TimelineReaderServer();
|
||||||
|
try {
|
||||||
|
server.init(conf);
|
||||||
|
server.start();
|
||||||
|
} finally {
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue