YARN-7537. Add ability to load hbase config from distributed file system.
Contributed by Prabhu Joseph
This commit is contained in:
parent
bcacb57114
commit
6110af2d1d
|
@ -74,6 +74,19 @@
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.hadoop</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>hadoop-yarn-api</artifactId>
|
<artifactId>hadoop-yarn-api</artifactId>
|
||||||
|
|
|
@ -17,11 +17,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||||
|
|
||||||
import java.net.MalformedURLException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.client.Query;
|
import org.apache.hadoop.hbase.client.Query;
|
||||||
|
@ -40,7 +42,6 @@ public final class HBaseTimelineStorageUtils {
|
||||||
private HBaseTimelineStorageUtils() {
|
private HBaseTimelineStorageUtils() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param conf YARN configuration. Used to see if there is an explicit config
|
* @param conf YARN configuration. Used to see if there is an explicit config
|
||||||
* pointing to the HBase config file to read. It should not be null
|
* pointing to the HBase config file to read. It should not be null
|
||||||
|
@ -48,28 +49,33 @@ public final class HBaseTimelineStorageUtils {
|
||||||
* @return a configuration with the HBase configuration from the classpath,
|
* @return a configuration with the HBase configuration from the classpath,
|
||||||
* optionally overwritten by the timeline service configuration URL if
|
* optionally overwritten by the timeline service configuration URL if
|
||||||
* specified.
|
* specified.
|
||||||
* @throws MalformedURLException if a timeline service HBase configuration URL
|
* @throws IOException if a timeline service HBase configuration URL
|
||||||
* is specified but is a malformed URL.
|
* is specified but unable to read it.
|
||||||
*/
|
*/
|
||||||
public static Configuration getTimelineServiceHBaseConf(Configuration conf)
|
public static Configuration getTimelineServiceHBaseConf(Configuration conf)
|
||||||
throws MalformedURLException {
|
throws IOException {
|
||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
throw new NullPointerException();
|
throw new NullPointerException();
|
||||||
}
|
}
|
||||||
|
|
||||||
Configuration hbaseConf;
|
Configuration hbaseConf;
|
||||||
String timelineServiceHBaseConfFileURL =
|
String timelineServiceHBaseConfFilePath =
|
||||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
|
conf.get(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE);
|
||||||
if (timelineServiceHBaseConfFileURL != null
|
|
||||||
&& timelineServiceHBaseConfFileURL.length() > 0) {
|
if (timelineServiceHBaseConfFilePath != null
|
||||||
|
&& timelineServiceHBaseConfFilePath.length() > 0) {
|
||||||
LOG.info("Using hbase configuration at " +
|
LOG.info("Using hbase configuration at " +
|
||||||
timelineServiceHBaseConfFileURL);
|
timelineServiceHBaseConfFilePath);
|
||||||
// create a clone so that we don't mess with out input one
|
// create a clone so that we don't mess with out input one
|
||||||
hbaseConf = new Configuration(conf);
|
hbaseConf = new Configuration(conf);
|
||||||
Configuration plainHBaseConf = new Configuration(false);
|
Configuration plainHBaseConf = new Configuration(false);
|
||||||
URL hbaseSiteXML = new URL(timelineServiceHBaseConfFileURL);
|
Path hbaseConfigPath = new Path(timelineServiceHBaseConfFilePath);
|
||||||
plainHBaseConf.addResource(hbaseSiteXML);
|
try (FileSystem fs =
|
||||||
|
FileSystem.newInstance(hbaseConfigPath.toUri(), conf);
|
||||||
|
FSDataInputStream in = fs.open(hbaseConfigPath)) {
|
||||||
|
plainHBaseConf.addResource(in);
|
||||||
HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
|
HBaseConfiguration.merge(hbaseConf, plainHBaseConf);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// default to what is on the classpath
|
// default to what is on the classpath
|
||||||
hbaseConf = HBaseConfiguration.create(conf);
|
hbaseConf = HBaseConfiguration.create(conf);
|
||||||
|
|
|
@ -18,16 +18,90 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
package org.apache.hadoop.yarn.server.timelineservice.storage.common;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for HBaseTimelineStorageUtils static methos.
|
* Unit tests for HBaseTimelineStorageUtils static methos.
|
||||||
*/
|
*/
|
||||||
public class TestHBaseTimelineStorageUtils {
|
public class TestHBaseTimelineStorageUtils {
|
||||||
|
|
||||||
|
private String hbaseConfigPath = "target/hbase-site.xml";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
// Input Hbase Configuration
|
||||||
|
Configuration hbaseConf = new Configuration();
|
||||||
|
hbaseConf.set("input", "test");
|
||||||
|
|
||||||
|
//write the document to a buffer (not directly to the file, as that
|
||||||
|
//can cause the file being written to get read which will then fail.
|
||||||
|
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
|
||||||
|
hbaseConf.writeXml(bytesOut);
|
||||||
|
bytesOut.close();
|
||||||
|
|
||||||
|
//write the bytes to the file
|
||||||
|
File file = new File(hbaseConfigPath);
|
||||||
|
OutputStream os = new FileOutputStream(file);
|
||||||
|
os.write(bytesOut.toByteArray());
|
||||||
|
os.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(expected=NullPointerException.class)
|
@Test(expected=NullPointerException.class)
|
||||||
public void testGetTimelineServiceHBaseConfNullArgument() throws Exception {
|
public void testGetTimelineServiceHBaseConfNullArgument() throws Exception {
|
||||||
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
|
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithHbaseConfAtLocalFileSystem() throws IOException {
|
||||||
|
// Verifying With Hbase Conf from Local FileSystem
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE,
|
||||||
|
hbaseConfigPath);
|
||||||
|
Configuration hbaseConfFromLocal =
|
||||||
|
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
||||||
|
Assert.assertEquals("Failed to read hbase config from Local FileSystem",
|
||||||
|
"test", hbaseConfFromLocal.get("input"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithHbaseConfAtHdfsFileSystem() throws IOException {
|
||||||
|
MiniDFSCluster hdfsCluster = null;
|
||||||
|
try {
|
||||||
|
HdfsConfiguration hdfsConfig = new HdfsConfiguration();
|
||||||
|
hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
|
||||||
|
.numDataNodes(1).build();
|
||||||
|
|
||||||
|
FileSystem fs = hdfsCluster.getFileSystem();
|
||||||
|
Path path = new Path("/tmp/hdfs-site.xml");
|
||||||
|
fs.copyFromLocalFile(new Path(hbaseConfigPath), path);
|
||||||
|
|
||||||
|
// Verifying With Hbase Conf from HDFS FileSystem
|
||||||
|
Configuration conf = new Configuration(hdfsConfig);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_HBASE_CONFIGURATION_FILE,
|
||||||
|
path.toString());
|
||||||
|
Configuration hbaseConfFromHdfs =
|
||||||
|
HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf);
|
||||||
|
Assert.assertEquals("Failed to read hbase config from Hdfs FileSystem",
|
||||||
|
"test", hbaseConfFromHdfs.get("input"));
|
||||||
|
} finally {
|
||||||
|
if (hdfsCluster != null) {
|
||||||
|
hdfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue