YARN-5922. Remove direct references of HBaseTimelineWriter/Reader in core ATS classes. Contributed by Haibo Chen.
(cherry picked from commit a5a55a54ab
)
(cherry picked from commit 17c4ab7ebb51088caf36fafedae8c256481eeed5)
This commit is contained in:
parent
a5371f6b6e
commit
54705f549f
|
@ -2194,9 +2194,18 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String TIMELINE_SERVICE_WRITER_CLASS =
|
||||
TIMELINE_SERVICE_PREFIX + "writer.class";
|
||||
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_WRITER_CLASS =
|
||||
"org.apache.hadoop.yarn.server.timelineservice"
|
||||
+ ".storage.HBaseTimelineWriterImpl";
|
||||
|
||||
public static final String TIMELINE_SERVICE_READER_CLASS =
|
||||
TIMELINE_SERVICE_PREFIX + "reader.class";
|
||||
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_READER_CLASS =
|
||||
"org.apache.hadoop.yarn.server.timelineservice" +
|
||||
".storage.HBaseTimelineReaderImpl";
|
||||
|
||||
|
||||
/**
|
||||
* default schema prefix for hbase tables.
|
||||
*/
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.hadoop.util.ReflectionUtils;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -59,10 +58,7 @@ public class TimelineCollectorManager extends AbstractService {
|
|||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
writer = ReflectionUtils.newInstance(conf.getClass(
|
||||
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
HBaseTimelineWriterImpl.class,
|
||||
TimelineWriter.class), conf);
|
||||
writer = createTimelineWriter(conf);
|
||||
writer.init(conf);
|
||||
// create a single dedicated thread for flushing the writer on a periodic
|
||||
// basis
|
||||
|
@ -75,6 +71,26 @@ public class TimelineCollectorManager extends AbstractService {
|
|||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
private TimelineWriter createTimelineWriter(final Configuration conf) {
|
||||
String timelineWriterClassName = conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_CLASS);
|
||||
LOG.info("Using TimelineWriter: " + timelineWriterClassName);
|
||||
try {
|
||||
Class<?> timelineWriterClazz = Class.forName(timelineWriterClassName);
|
||||
if (TimelineWriter.class.isAssignableFrom(timelineWriterClazz)) {
|
||||
return (TimelineWriter) ReflectionUtils.newInstance(
|
||||
timelineWriterClazz, conf);
|
||||
} else {
|
||||
throw new YarnRuntimeException("Class: " + timelineWriterClassName
|
||||
+ " not instance of " + TimelineWriter.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException("Could not instantiate TimelineWriter: "
|
||||
+ timelineWriterClassName, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
|
|
|
@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
|
@ -72,21 +71,34 @@ public class TimelineReaderServer extends CompositeService {
|
|||
}
|
||||
|
||||
TimelineReader timelineReaderStore = createTimelineReaderStore(conf);
|
||||
timelineReaderStore.init(conf);
|
||||
addService(timelineReaderStore);
|
||||
timelineReaderManager = createTimelineReaderManager(timelineReaderStore);
|
||||
addService(timelineReaderManager);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
private TimelineReader createTimelineReaderStore(Configuration conf) {
|
||||
TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass(
|
||||
private TimelineReader createTimelineReaderStore(final Configuration conf) {
|
||||
String timelineReaderClassName = conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||
HBaseTimelineReaderImpl.class, TimelineReader.class), conf);
|
||||
LOG.info("Using store " + readerStore.getClass().getName());
|
||||
readerStore.init(conf);
|
||||
return readerStore;
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_READER_CLASS);
|
||||
LOG.info("Using store: " + timelineReaderClassName);
|
||||
try {
|
||||
Class<?> timelineReaderClazz = Class.forName(timelineReaderClassName);
|
||||
if (TimelineReader.class.isAssignableFrom(timelineReaderClazz)) {
|
||||
return (TimelineReader) ReflectionUtils.newInstance(
|
||||
timelineReaderClazz, conf);
|
||||
} else {
|
||||
throw new YarnRuntimeException("Class: " + timelineReaderClassName
|
||||
+ " not instance of " + TimelineReader.class.getCanonicalName());
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new YarnRuntimeException("Could not instantiate TimelineReader: "
|
||||
+ timelineReaderClassName, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private TimelineReaderManager createTimelineReaderManager(
|
||||
TimelineReader timelineReaderStore) {
|
||||
return new TimelineReaderManager(timelineReaderStore);
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.timelineservice.collector;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit tests for TimelineCollectorManager.
|
||||
*/
|
||||
public class TestTimelineCollectorManager{
|
||||
|
||||
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||
public void testTimelineCollectorManagerWithInvalidTimelineWriter() {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
Object.class.getName());
|
||||
runTimelineCollectorManagerWithConfig(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||
public void testTimelineCollectorManagerWithNonexistentTimelineWriter() {
|
||||
String nonexistentTimelineWriterClass = "org.apache.org.yarn.server." +
|
||||
"timelineservice.storage.XXXXXXXX";
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
nonexistentTimelineWriterClass);
|
||||
runTimelineCollectorManagerWithConfig(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTimelineCollectorManagerWithFileSystemWriter() {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||
runTimelineCollectorManagerWithConfig(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a TimelineCollectorManager with a given configuration.
|
||||
* @param conf configuration to run TimelineCollectorManager with
|
||||
*/
|
||||
private static void runTimelineCollectorManagerWithConfig(
|
||||
final Configuration conf) {
|
||||
TimelineCollectorManager collectorManager =
|
||||
new TimelineCollectorManager("testTimelineCollectorManager");
|
||||
try {
|
||||
collectorManager.init(conf);
|
||||
collectorManager.start();
|
||||
} finally {
|
||||
collectorManager.stop();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.Service.STATE;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
|
||||
import org.junit.Test;
|
||||
|
@ -54,4 +55,46 @@ public class TestTimelineReaderServer {
|
|||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||
public void testTimelineReaderServerWithInvalidTimelineReader() {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
"localhost:0");
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||
Object.class.getName());
|
||||
runTimelineReaderServerWithConfig(conf);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000, expected = YarnRuntimeException.class)
|
||||
public void testTimelineReaderServerWithNonexistentTimelineReader() {
|
||||
String nonexistentTimelineReaderClass = "org.apache.org.yarn.server." +
|
||||
"timelineservice.storage.XXXXXXXX";
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
"localhost:0");
|
||||
conf.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||
nonexistentTimelineReaderClass);
|
||||
runTimelineReaderServerWithConfig(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a TimelineReaderServer with a given configuration.
|
||||
* @param conf configuration to run TimelineReaderServer with
|
||||
*/
|
||||
private static void runTimelineReaderServerWithConfig(
|
||||
final Configuration conf) {
|
||||
TimelineReaderServer server = new TimelineReaderServer();
|
||||
try {
|
||||
server.init(conf);
|
||||
server.start();
|
||||
} finally {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue