HDFS-9628. libhdfs++: implement builder apis from C bindings. Contributed by Bob Hansen.

James 2016-01-14 10:58:55 -05:00 committed by James Clampffer
parent dc335474a6
commit 1017ccabeb
15 changed files with 499 additions and 41 deletions

View File

@@ -168,21 +168,17 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<goals><goal>run</goal></goals>
<configuration>
<skip>${skipTests}</skip>
<target>
<property name="compile_classpath" refid="maven.compile.classpath"/>
<property name="test_classpath" refid="maven.test.classpath"/>
<exec executable="ctest" failonerror="true" dir="${project.build.directory}/">
<arg line="${native_ctest_args}"/>
<env key="CLASSPATH" value="${test_classpath}:${compile_classpath}"/>
<!-- HADOOP_HOME required to find winutils. -->
<env key="HADOOP_HOME" value="${hadoop.common.build.dir}"/>
<!-- Make sure hadoop.dll and jvm.dll are on PATH. -->
<env key="PATH" value="${env.PATH};${hadoop.common.build.dir}/bin;${java.home}/jre/bin/server;${java.home}/bin/server"/>
<!-- Make sure libhadoop.so is on LD_LIBRARY_PATH. -->
<env key="LD_LIBRARY_PATH" value="${env.LD_LIBRARY_PATH}:${project.build.directory}/target/usr/local/lib:${hadoop.common.build.dir}/native/target/usr/local/lib"/>
<arg line="${native_ctest_args}"/>
</exec>
</target>
<target>
<property name="compile_classpath" refid="maven.compile.classpath"/>
<property name="test_classpath" refid="maven.test.classpath"/>
<exec executable="ctest" failonerror="true" dir="${project.build.directory}/">
<arg line="--output-on-failure"/>
<arg line="${native_ctest_args}"/>
<env key="CLASSPATH" value="${test_classpath}:${compile_classpath}"/>
<!-- Make sure libhadoop.so is on LD_LIBRARY_PATH. -->
<env key="LD_LIBRARY_PATH" value="${env.LD_LIBRARY_PATH}:${project.build.directory}/target/usr/local/lib:${hadoop.common.build.dir}/native/target/usr/local/lib"/>
</exec>
</target>
</configuration>
</execution>
</executions>
@@ -237,11 +233,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<property name="compile_classpath" refid="maven.compile.classpath"/>
<property name="test_classpath" refid="maven.test.classpath"/>
<exec executable="ctest" failonerror="true" dir="${project.build.directory}/">
<arg line="--output-on-failure"/>
<arg line="${native_ctest_args}"/>
<env key="CLASSPATH" value="${test_classpath}:${compile_classpath}"/>
<!-- Make sure libhadoop.so is on LD_LIBRARY_PATH. -->
<env key="LD_LIBRARY_PATH" value="${env.LD_LIBRARY_PATH}:${project.build.directory}/target/usr/local/lib:${hadoop.common.build.dir}/native/target/usr/local/lib"/>
<arg line="${native_ctest_args}"/>
</exec>
</target>
</configuration>

View File

@@ -63,5 +63,42 @@ LIBHDFS_EXTERNAL
void hdfsGetLastError(char *buf, int len);
/**
* Create an HDFS builder, using the configuration XML files from the indicated
* directory. If the directory does not exist, or contains no configuration
* XML files, a Builder using all default values will be returned.
*
* @return The HDFS builder, or NULL on error.
*/
struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory);
/**
* Get a configuration string from the settings currently read into the builder.
*
* @param key The key to find
* @param val (out param) The value. This will be set to NULL if the
* key isn't found. You must free this string with
* hdfsConfStrFree.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
char **val);
/**
* Get a configuration integer from the settings currently read into the builder.
*
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
} /* end extern "C" */
#endif
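
To see how the declarations above fit together, here is a rough usage sketch (not part of this commit). The /etc/hadoop/conf path is a placeholder; the keys come from HdfsConfiguration further down in this change.

#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
#include <cstdint>
#include <cstdio>

int main() {
  // Hypothetical config directory; a builder with defaults is returned if it is missing.
  struct hdfsBuilder *bld = hdfsNewBuilderFromDirectory("/etc/hadoop/conf");
  if (!bld)
    return 1;

  char *fs = nullptr;
  if (hdfsBuilderConfGetStr(bld, "fs.defaultFS", &fs) == 0 && fs) {
    printf("fs.defaultFS = %s\n", fs);
    hdfsConfStrFree(fs);        // strings returned by the builder must be freed this way
  }

  int32_t timeout = 30000;      // left untouched when the key is absent
  hdfsBuilderConfGetInt(bld, "dfs.client.socket-timeout", &timeout);
  printf("dfs.client.socket-timeout = %d\n", timeout);

  hdfsFreeBuilder(bld);
  return 0;
}

Note the asymmetry documented in the header: a missing key nulls out the string result but leaves the caller's integer untouched.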

View File

@@ -16,7 +16,11 @@
* limitations under the License.
*/
#include "hdfspp/hdfspp.h"
#include "fs/filesystem.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
@@ -28,7 +32,7 @@
using namespace hdfs;
/* Seperate the handles used by the C api from the C++ API*/
/* Separate the handles used by the C api from the C++ API*/
struct hdfs_internal {
hdfs_internal(FileSystem *p) : filesystem_(p) {}
hdfs_internal(std::unique_ptr<FileSystem> p)
@@ -73,6 +77,20 @@ void hdfsGetLastError(char *buf, int len) {
buf[copylen] = 0;
}
struct hdfsBuilder {
hdfsBuilder();
hdfsBuilder(const char * directory);
virtual ~hdfsBuilder() {}
ConfigurationLoader loader;
HdfsConfiguration config;
std::string overrideHost;
tPort overridePort; // 0 --> use default
static constexpr tPort kUseDefaultPort = 0;
static constexpr tPort kDefaultPort = 8020;
};
/* Error handling with optional debug to stderr */
static void ReportError(int errnum, std::string msg) {
errno = errnum;
@@ -248,3 +266,155 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
return offset;
}
/*******************************************************************
* BUILDER INTERFACE
*******************************************************************/
HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
{
optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
if (result)
{
return result.value();
}
else
{
return loader.New<HdfsConfiguration>();
}
}
hdfsBuilder::hdfsBuilder() : config(LoadDefault(loader)), overridePort(kUseDefaultPort)
{
}
hdfsBuilder::hdfsBuilder(const char * directory) :
config(loader.New<HdfsConfiguration>()), overridePort(kUseDefaultPort)
{
loader.SetSearchPath(directory);
config = LoadDefault(loader);
}
struct hdfsBuilder *hdfsNewBuilder(void)
{
return new struct hdfsBuilder();
}
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
bld->overrideHost = nn;
}
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
{
bld->overridePort = port;
}
void hdfsFreeBuilder(struct hdfsBuilder *bld)
{
delete bld;
}
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
const char *val)
{
optional<HdfsConfiguration> newConfig = bld->loader.OverlayValue(bld->config, key, val);
if (newConfig)
{
bld->config = newConfig.value();
return 0;
}
else
{
return 1;
}
}
void hdfsConfStrFree(char *val)
{
if (val)
{
free(val);
}
}
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
if (!bld->overrideHost.empty())
{
// TODO: pass rest of config once we get that done (HDFS-9556)
tPort port = bld->overridePort;
if (port == hdfsBuilder::kUseDefaultPort)
{
port = hdfsBuilder::kDefaultPort;
}
return hdfsConnect(bld->overrideHost.c_str(), port);
}
else
{
//TODO: allow construction from default port once that is done (HDFS-9556)
ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
return nullptr;
}
}
int hdfsConfGetStr(const char *key, char **val)
{
hdfsBuilder builder;
return hdfsBuilderConfGetStr(&builder, key, val);
}
int hdfsConfGetInt(const char *key, int32_t *val)
{
hdfsBuilder builder;
return hdfsBuilderConfGetInt(&builder, key, val);
}
//
// Extended builder interface
//
struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
{
return new struct hdfsBuilder(configDirectory);
}
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
char **val)
{
optional<std::string> value = bld->config.Get(key);
if (value)
{
size_t len = value->length() + 1;
*val = static_cast<char *>(malloc(len));
strncpy(*val, value->c_str(), len);
}
else
{
*val = nullptr;
}
return 0;
}
// If we're running on a 32-bit platform, we might get 64-bit values that
// don't fit in an int, and int is specified by the java hdfs.h interface
bool isValidInt(int64_t value)
{
return (value >= std::numeric_limits<int>::min() &&
value <= std::numeric_limits<int>::max());
}
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
{
// Pull from default configuration
optional<int64_t> value = bld->config.GetInt(key);
if (value)
{
if (!isValidInt(*value))
return 1;
*val = *value;
}
// If not found, don't change val
return 0;
}
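
A rough end-to-end sketch of the builder path implemented above (illustrative only; the host name is made up, and hdfsDisconnect is the standard libhdfs call assumed to be available in this shim). Since hdfsBuilderConnect as written here does not free the builder, the caller does so explicitly.

#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
#include <cstdio>

int main() {
  struct hdfsBuilder *bld = hdfsNewBuilder();   // loads the default resources
  // Per-connection overrides; the values here are placeholders.
  hdfsBuilderConfSetStr(bld, "dfs.client.socket-timeout", "30000");
  hdfsBuilderSetNameNode(bld, "namenode.example.com");
  hdfsBuilderSetNameNodePort(bld, 8020);        // 0 would mean "use the default port"

  hdfsFS fs = hdfsBuilderConnect(bld);          // fails with EINVAL if no host was set (HDFS-9556)
  if (!fs) {
    char err[256];
    hdfsGetLastError(err, sizeof(err));
    fprintf(stderr, "connect failed: %s\n", err);
  }
  hdfsFreeBuilder(bld);                         // hdfsBuilderConnect above does not free it
  if (fs)
    hdfsDisconnect(fs);                         // assumed available in this shim
  return 0;
}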

View File

@@ -52,7 +52,8 @@ std::vector<std::string> Configuration::GetDefaultFilenames() {
optional<std::string> Configuration::Get(const std::string& key) const {
auto found = raw_values_.find(key);
std::string caseFixedKey = fixCase(key);
auto found = raw_values_.find(caseFixedKey);
if (found != raw_values_.end()) {
return std::experimental::make_optional(found->second.value);
} else {

View File

@@ -82,10 +82,20 @@ protected:
Configuration() {};
Configuration(ConfigMap &src_map) : raw_values_(src_map){};
Configuration(const ConfigMap &src_map) : raw_values_(src_map){};
static std::vector<std::string> GetDefaultFilenames();
const ConfigMap raw_values_;
// While we want this to be const, it would preclude copying Configuration
// objects. The Configuration class must not allow any mutations of
// the raw_values
ConfigMap raw_values_;
static std::string fixCase(const std::string &in) {
std::string result(in);
for (auto & c: result) c = toupper(c);
return result;
}
};
}
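
A small sketch of the behavioural effect of fixCase (illustrative; the key name is invented, the include path is taken from hdfs.cc above, and it mirrors the case-insensitivity test added later in this change): keys are upper-cased both when stored and when looked up, so mixed-case queries resolve to the same entry.

#include "common/configuration_loader.h"
#include <cassert>
#include <string>

void case_insensitive_lookup_demo() {
  // Hypothetical key; stored upper-cased via fixCase(), looked up the same way.
  const std::string xml =
      "<configuration>"
      "<property><name>dfs.Some.Key</name><value>42</value></property>"
      "</configuration>";
  auto config = hdfs::ConfigurationLoader().Load<hdfs::Configuration>(xml);
  assert(config && "parse failed");
  // All spellings hit the same entry after canonicalization.
  assert(config->GetWithDefault("dfs.some.key", "") == "42");
  assert(config->GetWithDefault("DFS.SOME.KEY", "") == "42");
}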

View File

@@ -41,6 +41,10 @@ static const char kFileSeparator = '/';
static const char kSearchPathSeparator = ':';
bool is_valid_bool(const std::string& raw) {
if (raw.empty()) {
return false;
}
if (!strcasecmp(raw.c_str(), "true")) {
return true;
}
@@ -198,36 +202,48 @@ bool ConfigurationLoader::UpdateMapWithBytes(ConfigMap& map,
auto value_node = property_node->first_node("value", 0, false);
if (name_node && value_node) {
auto mapValue = map.find(name_node->value());
if (mapValue != map.end() && mapValue->second.final) {
continue;
}
map[name_node->value()] = value_node->value();
std::string final_value;
auto final_node = property_node->first_node("final", 0, false);
if (final_node && is_valid_bool(final_node->value())) {
map[name_node->value()].final = str_to_bool(final_node->value());
if (final_node) {
final_value = final_node->value();
}
UpdateMapWithValue(map, name_node->value(), value_node->value(), final_value);
}
auto name_attr = property_node->first_attribute("name", 0, false);
auto value_attr = property_node->first_attribute("value", 0, false);
if (name_attr && value_attr) {
auto mapValue = map.find(name_attr->value());
if (mapValue != map.end() && mapValue->second.final) {
continue;
}
map[name_attr->value()] = value_attr->value();
std::string final_value;
auto final_attr = property_node->first_attribute("final", 0, false);
if (final_attr && is_valid_bool(final_attr->value())) {
map[name_attr->value()].final = str_to_bool(final_attr->value());
if (final_attr) {
final_value = final_attr->value();
}
UpdateMapWithValue(map, name_attr->value(), value_attr->value(), final_value);
}
}
return true;
}
bool ConfigurationLoader::UpdateMapWithValue(ConfigMap& map,
const std::string& key, const std::string& value,
const std::string& final_text)
{
std::string caseFixedKey = Configuration::fixCase(key);
auto mapValue = map.find(caseFixedKey);
if (mapValue != map.end() && mapValue->second.final) {
return false;
}
bool final_value = false;
if (is_valid_bool(final_text)) {
final_value = str_to_bool(final_text);
}
map[caseFixedKey].value = value;
map[caseFixedKey].final = final_value;
return true;
}
}

View File

@@ -63,6 +63,11 @@ public:
template<class T>
optional<T> OverlayResourceFile(const T &src, const std::string &path) const;
// Attempts to update the map. If the update failed (because there was
// an existing final value, for example), returns the original map
template<class T>
optional<T> OverlayValue(const T &src, const std::string &key, const std::string &value) const;
// Returns an instance of the Configuration with all of the default resource
// files loaded.
// T must be Configuration or a subclass
@@ -104,6 +109,11 @@ protected:
static bool UpdateMapWithBytes(Configuration::ConfigMap &map,
std::vector<char> &raw_bytes);
// Attempts to update the map. If the update failed (because there was
// an existing final value, for example), returns false
static bool UpdateMapWithValue(ConfigMap& map,
const std::string& key, const std::string& value, const std::string& final_text);
std::vector<std::string> search_path_;
};

View File

@@ -84,17 +84,24 @@ optional<T> ConfigurationLoader::OverlayResourceString(const T& src, const std::
}
}
template<class T>
optional<T> ConfigurationLoader::OverlayValue(const T& src, const std::string &key, const std::string &value) const {
ConfigMap map(src.raw_values_);
UpdateMapWithValue(map, key, value, "");
return std::experimental::make_optional<T>(map);
}
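
To make the OverlayValue contract concrete, a sketch following the same pattern as the TestValueOverlay case added further below (key names are invented): an overlay returns a new Configuration, and a key previously marked final keeps its original value because UpdateMapWithValue declines the change.

#include "common/configuration_loader.h"
#include <cassert>
#include <string>

void overlay_value_demo() {
  const std::string xml =
      "<configuration>"
      "<property><name>locked</name><value>v1</value><final>true</final></property>"
      "</configuration>";
  hdfs::ConfigurationLoader loader;
  auto config = loader.Load<hdfs::Configuration>(xml);
  assert(config && "parse failed");

  // Fresh key: the overlay takes effect in the returned copy.
  config = loader.OverlayValue(config.value(), "fresh", "v2");
  assert(config->GetWithDefault("fresh", "") == "v2");

  // Final key: UpdateMapWithValue refuses the update, so the old value survives.
  config = loader.OverlayValue(config.value(), "locked", "v3");
  assert(config->GetWithDefault("locked", "") == "v1");
}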
template <class T>
optional<T> ConfigurationLoader::LoadDefaultResources() {
std::vector<std::string> default_filenames = T::GetDefaultFilenames();
ConfigMap result;
bool success = true;
bool success = false;
for (auto fn: default_filenames) {
success &= UpdateMapWithFile(result, fn);
if (!success)
break;
// We succeed if we have loaded data from any file
success |= UpdateMapWithFile(result, fn);
}
if (success) {

View File

@@ -25,6 +25,7 @@ HdfsConfiguration::HdfsConfiguration() : Configuration() {}
// Constructs a configuration with a copy of the input data
HdfsConfiguration::HdfsConfiguration(ConfigMap &src_map) : Configuration(src_map) {}
HdfsConfiguration::HdfsConfiguration(const ConfigMap &src_map) : Configuration(src_map) {}
std::vector<std::string> HdfsConfiguration::GetDefaultFilenames() {
auto result = Configuration::GetDefaultFilenames();

View File

@@ -37,6 +37,7 @@ class HdfsConfiguration : public Configuration {
Options GetOptions();
// Keys to look for in the configuration file
static constexpr const char * kFsDefaultFsKey = "fs.defaultFS";
static constexpr const char * kDfsClientSocketTimeoutKey = "dfs.client.socket-timeout";
static constexpr const char * kIpcClientConnectMaxRetriesKey = "ipc.client.connect.max.retries";
static constexpr const char * kIpcClientConnectRetryIntervalKey = "ipc.client.connect.retry.interval";
@@ -48,6 +49,7 @@ private:
// Constructs a configuration with some static data
HdfsConfiguration(ConfigMap &src_map);
HdfsConfiguration(const ConfigMap &src_map);
static std::vector<std::string> GetDefaultFilenames();
};

View File

@@ -100,3 +100,7 @@ build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdf
link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)
endif(HADOOP_BUILD)
add_executable(hdfs_builder_test hdfs_builder_test.cc)
target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_builder_test hdfs_builder_test)

View File

@@ -70,6 +70,15 @@ TEST(ConfigurationTest, TestBasicOperations) {
EXPECT_EQ("value2", config->GetWithDefault("key2", ""));
}
/* Case-insensitive */
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "value1");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
EXPECT_EQ("value1", config->GetWithDefault("KEY1", ""));
}
/* No defaults */
{
std::stringstream stream;
@@ -126,6 +135,36 @@ TEST(ConfigurationTest, TestStringResource) {
}
}
TEST(ConfigurationTest, TestValueOverlay) {
/* Incremental updates */
{
ConfigurationLoader loader;
std::stringstream stream;
stream << "<configuration>"
"<property><name>key1</name><value>value1</value><final>false</final></property>"
"<property><name>final2</name><value>value2</value><final>true</final></property>"
"</configuration>";
optional<Configuration> config = loader.Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
EXPECT_EQ("value2", config->GetWithDefault("final2", ""));
config = loader.OverlayValue(config.value(), "key3", "value3");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
EXPECT_EQ("value2", config->GetWithDefault("final2", ""));
EXPECT_EQ("value3", config->GetWithDefault("key3", ""));
config = loader.OverlayValue(config.value(), "final2", "value4");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
EXPECT_EQ("value2", config->GetWithDefault("final2", ""));
EXPECT_EQ("value3", config->GetWithDefault("key3", ""));
// Case insensitive overlay
config = loader.OverlayValue(config.value(), "KEY3", "value3a");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
EXPECT_EQ("value2", config->GetWithDefault("final2", ""));
EXPECT_EQ("value3a", config->GetWithDefault("key3", ""));
}
}
TEST(ConfigurationTest, TestFinal) {
{
/* Explicitly non-final non-compact value */

View File

@@ -84,8 +84,9 @@ public:
}
TempFile(const std::string & fn) : filename(fn), tempFileHandle(-1) {
strncpy(fn_buffer, fn.c_str(), sizeof(fn_buffer));
fn_buffer[sizeof(fn_buffer)-1] = 0;
}
~TempFile() { close(tempFileHandle); unlink(fn_buffer); }
~TempFile() { if(-1 != tempFileHandle) close(tempFileHandle); unlink(fn_buffer); }
};

View File

@@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfspp/hdfs_ext.h"
#include "configuration_test.h"
#include <gmock/gmock.h>
#include <google/protobuf/stubs/common.h>
using ::testing::_;
using namespace hdfs;
TEST(HdfsBuilderTest, TestStubBuilder) {
{
TempDir tempDir1;
hdfsBuilder * builder = hdfsNewBuilderFromDirectory(tempDir1.path.c_str());
hdfsFreeBuilder(builder);
}
{
hdfsBuilder * builder = hdfsNewBuilderFromDirectory("/this/path/does/not/exist");
hdfsFreeBuilder(builder);
}
}
TEST(HdfsBuilderTest, TestRead)
{
// Reading string values
{
TempDir tempDir1;
TempFile tempFile1(tempDir1.path + "/core-site.xml");
writeSimpleConfig(tempFile1.filename, "key1", "value1");
hdfsBuilder * builder = hdfsNewBuilderFromDirectory(tempDir1.path.c_str());
char * readVal = nullptr;
int result = hdfsBuilderConfGetStr(builder, "key1", &readVal);
ASSERT_EQ(0, result);
ASSERT_NE(nullptr, readVal);
EXPECT_EQ("value1", std::string(readVal));
hdfsConfStrFree(readVal);
readVal = nullptr;
result = hdfsBuilderConfGetStr(builder, "key2", &readVal);
ASSERT_EQ(0, result);
EXPECT_EQ(nullptr, readVal);
hdfsFreeBuilder(builder);
}
// Reading int values
{
TempDir tempDir1;
TempFile tempFile1(tempDir1.path + "/core-site.xml");
writeSimpleConfig(tempFile1.filename, "key1", "100");
hdfsBuilder * builder = hdfsNewBuilderFromDirectory(tempDir1.path.c_str());
int readVal = -1;
int result = hdfsBuilderConfGetInt(builder, "key1", &readVal);
EXPECT_EQ(0, result);
EXPECT_EQ(100, readVal);
readVal = -1;
result = hdfsBuilderConfGetInt(builder, "key2", &readVal);
EXPECT_EQ(0, result);
EXPECT_EQ(-1, readVal);
hdfsFreeBuilder(builder);
}
}
TEST(HdfsBuilderTest, TestSet)
{
{
// Setting values in an empty builder
// Don't use default, or it will load any data in /etc/hadoop
hdfsBuilder * builder = hdfsNewBuilderFromDirectory("/this/path/does/not/exist");
int result = hdfsBuilderConfSetStr(builder, "key1", "100");
EXPECT_EQ(0, result);
int readVal = -1;
result = hdfsBuilderConfGetInt(builder, "key1", &readVal);
EXPECT_EQ(0, result);
EXPECT_EQ(100, readVal);
// Set value in non-empty builder
result = hdfsBuilderConfSetStr(builder, "key2", "200");
EXPECT_EQ(0, result);
readVal = -1;
result = hdfsBuilderConfGetInt(builder, "key2", &readVal);
EXPECT_EQ(0, result);
EXPECT_EQ(200, readVal);
// Overwrite value
result = hdfsBuilderConfSetStr(builder, "key2", "300");
EXPECT_EQ(0, result);
readVal = -1;
result = hdfsBuilderConfGetInt(builder, "key2", &readVal);
EXPECT_EQ(0, result);
EXPECT_EQ(300, readVal);
hdfsFreeBuilder(builder);
}
}
int main(int argc, char *argv[]) {
/*
* The following line must be executed to initialize Google Mock
* (and Google Test) before running the tests.
*/
::testing::InitGoogleMock(&argc, argv);
int exit_code = RUN_ALL_TESTS();
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return exit_code;
}

View File

@@ -26,7 +26,6 @@ using namespace hdfs;
namespace hdfs
{
TEST(HdfsConfigurationTest, TestDefaultOptions)
{
// Completely empty stream
@@ -73,6 +72,34 @@ TEST(HdfsConfigurationTest, TestDefaultConfigs) {
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
EXPECT_EQ("value2", config->GetWithDefault("key2", ""));
}
// Only core-site.xml available
{
TempDir tempDir;
TempFile coreSite(tempDir.path + "/core-site.xml");
writeSimpleConfig(coreSite.filename, "key1", "value1");
ConfigurationLoader loader;
loader.SetSearchPath(tempDir.path);
optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
EXPECT_TRUE(config && "Parse streams");
EXPECT_EQ("value1", config->GetWithDefault("key1", ""));
}
// Only hdfs-site available
{
TempDir tempDir;
TempFile hdfsSite(tempDir.path + "/hdfs-site.xml");
writeSimpleConfig(hdfsSite.filename, "key2", "value2");
ConfigurationLoader loader;
loader.SetSearchPath(tempDir.path);
optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
EXPECT_TRUE(config && "Parse streams");
EXPECT_EQ("value2", config->GetWithDefault("key2", ""));
}
}
int main(int argc, char *argv[])