HDFS-9556: libhdfs++: pull Options from default configs by default. Contributed by Bob Hansen.

This commit is contained in:
Bob Hansen 2016-03-14 11:34:42 -04:00 committed by James Clampffer
parent f25bff50bf
commit 8f4a66ab8f
19 changed files with 946 additions and 65 deletions

View File

@ -46,6 +46,7 @@ set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
if(UNIX)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -pedantic -std=c++11 -g -fPIC -fno-strict-aliasing")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -fPIC -fno-strict-aliasing")
endif()
# Mac OS 10.7 and later deprecates most of the methods in OpenSSL.
@ -138,7 +139,7 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
set(LIBHDFSPP_ALL_OBJECTS $<TARGET_OBJECTS:bindings_c_obj> $<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:rpc_obj> $<TARGET_OBJECTS:reader_obj> $<TARGET_OBJECTS:proto_obj> $<TARGET_OBJECTS:connection_obj> $<TARGET_OBJECTS:common_obj>)
set(LIBHDFSPP_ALL_OBJECTS $<TARGET_OBJECTS:bindings_c_obj> $<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:rpc_obj> $<TARGET_OBJECTS:reader_obj> $<TARGET_OBJECTS:proto_obj> $<TARGET_OBJECTS:connection_obj> $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
if (HADOOP_BUILD)
hadoop_add_dual_library(hdfspp ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
hadoop_target_link_dual_libraries(hdfspp

View File

@ -128,12 +128,23 @@ class FileSystem {
virtual void Connect(const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem *)> &&handler) = 0;
const std::function<void(const Status &, FileSystem *)> &handler) = 0;
/* Synchronous call of Connect */
virtual Status Connect(const std::string &server,
const std::string &service) = 0;
/**
* Connects to the hdfs instance indicated by the defaultFs value of the
* Options structure.
*
* If no defaultFs is defined, returns an error.
*/
virtual void ConnectToDefaultFs(
const std::function<void(const Status &, FileSystem *)> &handler) = 0;
virtual Status ConnectToDefaultFs() = 0;
/**
* Open a file on HDFS. The call issues an RPC to the NameNode to
* gather the locations of all blocks in the file and to return a

View File

@ -18,6 +18,8 @@
#ifndef LIBHDFSPP_OPTIONS_H_
#define LIBHDFSPP_OPTIONS_H_
#include "common/uri.h"
namespace hdfs {
/**
@ -51,6 +53,11 @@ struct Options {
unsigned int host_exclusion_duration;
static const unsigned int kDefaultHostExclusionDuration = 600000;
/**
* URI to connect to if no host:port are specified in connect
*/
URI defaultFS;
Options();
};
}

View File

@ -31,6 +31,9 @@
#include <algorithm>
using namespace hdfs;
using std::experimental::nullopt;
static constexpr tPort kDefaultPort = 8020;
/* Separate the handles used by the C api from the C++ API*/
struct hdfs_internal {
@ -84,12 +87,11 @@ struct hdfsBuilder {
ConfigurationLoader loader;
HdfsConfiguration config;
std::string overrideHost;
tPort overridePort; // 0 --> use default
std::string user;
optional<std::string> overrideHost;
optional<tPort> overridePort;
optional<std::string> user;
static constexpr tPort kUseDefaultPort = 0;
static constexpr tPort kDefaultPort = 8020;
};
/* Error handling with optional debug to stderr */
@ -183,28 +185,30 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
return 0;
}
hdfsFS hdfsConnect(const char *nn, tPort port) {
return hdfsConnectAsUser(nn, port, "");
}
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<std::string> user, const Options & options) {
try
{
std::string port_as_string = std::to_string(port);
IoService * io_service = IoService::New();
std::string user_name;
if (user) {
user_name = user;
}
FileSystem *fs = FileSystem::New(io_service, user_name, Options());
FileSystem *fs = FileSystem::New(io_service, user.value_or(""), options);
if (!fs) {
ReportError(ENODEV, "Could not create FileSystem object");
return nullptr;
}
if (!fs->Connect(nn, port_as_string).ok()) {
ReportError(ENODEV, "Unable to connect to NameNode.");
Status status;
if (nn || port) {
if (!port) {
port = kDefaultPort;
}
std::string port_as_string = std::to_string(*port);
status = fs->Connect(nn.value_or(""), port_as_string);
} else {
status = fs->ConnectToDefaultFs();
}
if (!status.ok()) {
Error(status);
// FileSystem's ctor might take ownership of the io_service; if it does,
// it will null out the pointer
@ -225,6 +229,14 @@ hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
}
}
/* Connect to nn:port without naming a user; delegates to hdfsConnectAsUser
 * with an empty user string. */
hdfsFS hdfsConnect(const char *nn, tPort port) {
  const char *no_user = "";
  return hdfsConnectAsUser(nn, port, no_user);
}
/**
 * Connect to the NameNode at nn:port on behalf of user, using a
 * default-constructed Options.
 *
 * nn and user may be NULL; a NULL pointer is forwarded to doHdfsConnect as an
 * unset optional instead of being turned into a std::string.
 */
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
  // Building a std::string from a null pointer is undefined behaviour, so
  // only populate the optionals when the caller actually supplied a value.
  optional<std::string> host;
  if (nn) {
    host = std::string(nn);
  }
  optional<std::string> user_name;
  if (user) {
    user_name = std::string(user);
  }
  return doHdfsConnect(host, port, user_name, Options());
}
int hdfsDisconnect(hdfsFS fs) {
try
{
@ -403,12 +415,14 @@ HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
}
}
hdfsBuilder::hdfsBuilder() : config(LoadDefault(loader)), overridePort(kUseDefaultPort)
hdfsBuilder::hdfsBuilder() : config(loader.New<HdfsConfiguration>())
{
loader.SetDefaultSearchPath();
config = LoadDefault(loader);
}
hdfsBuilder::hdfsBuilder(const char * directory) :
config(loader.New<HdfsConfiguration>()), overridePort(kUseDefaultPort)
config(loader.New<HdfsConfiguration>())
{
loader.SetSearchPath(directory);
config = LoadDefault(loader);
@ -430,7 +444,7 @@ struct hdfsBuilder *hdfsNewBuilder(void)
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
{
bld->overrideHost = nn;
bld->overrideHost = std::string(nn);
}
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
@ -440,10 +454,8 @@ void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
{
if (userName) {
bld->user = userName;
} else {
bld->user = "";
if (userName && *userName) {
bld->user = std::string(userName);
}
}
@ -489,34 +501,7 @@ void hdfsConfStrFree(char *val)
}
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
try
{
if (!bld->overrideHost.empty())
{
// TODO: pass rest of config once we get that done (HDFS-9556)
tPort port = bld->overridePort;
if (port == hdfsBuilder::kUseDefaultPort)
{
port = hdfsBuilder::kDefaultPort;
}
if (bld->user.empty())
return hdfsConnect(bld->overrideHost.c_str(), port);
else
return hdfsConnectAsUser(bld->overrideHost.c_str(), port, bld->user.c_str());
}
else
{
//TODO: allow construction from default port once that is done (HDFS-9556)
ReportError(EINVAL, "No host provided to builder in hdfsBuilderConnect");
return nullptr;
}
} catch (const std::exception & e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr;
}
return doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
}
int hdfsConfGetStr(const char *key, char **val)

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc cancel_tracker.cc)
add_library(common $<TARGET_OBJECTS:common_obj>)
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL})

View File

@ -32,6 +32,7 @@
*/
#include "configuration.h"
#include "uri.h"
#include <strings.h>
#include <sstream>
@ -137,4 +138,30 @@ bool Configuration::GetBoolWithDefault(const std::string& key,
bool default_value) const {
return GetBool(key).value_or(default_value);
}
// Looks up key and parses its value as a URI. Returns an empty optional when
// the key has no value; otherwise returns whatever URI::parse_from_string
// yields for the stored text (which is itself empty for a malformed URI).
optional<URI> Configuration::GetUri(const std::string& key) const {
  const auto text = Get(key);
  if (!text) {
    return optional<URI>();
  }
  return URI::parse_from_string(*text);
}
// Returns the URI stored under key when GetUri produces one. Otherwise parses
// default_value and returns that; if default_value does not parse either, a
// default-constructed URI is returned.
URI Configuration::GetUriWithDefault(const std::string& key,
                                     std::string default_value) const {
  optional<URI> configured = GetUri(key);
  if (configured) {
    return *configured;
  }

  optional<URI> fallback = URI::parse_from_string(default_value);
  if (!fallback) {
    return URI();
  }
  return *fallback;
}
}

View File

@ -19,6 +19,8 @@
#ifndef COMMON_CONFIGURATION_H_
#define COMMON_CONFIGURATION_H_
#include "common/uri.h"
#include <string>
#include <map>
#include <vector>
@ -63,6 +65,9 @@ class Configuration {
bool GetBoolWithDefault(const std::string &key,
bool default_value) const;
optional<bool> GetBool(const std::string &key) const;
URI GetUriWithDefault(const std::string &key,
std::string default_value) const;
optional<URI> GetUri(const std::string &key) const;
protected:
friend class ConfigurationLoader;

View File

@ -63,9 +63,15 @@ bool str_to_bool(const std::string& raw) {
}
void ConfigurationLoader::SetDefaultSearchPath() {
//TODO: Use HADOOP_CONF_DIR when we get environment subs with HDFS-9385
AddToSearchPath("./");
AddToSearchPath("/etc/hadoop/");
// Try (in order, taking the first valid one):
// $HADOOP_CONF_DIR
// /etc/hadoop/conf
const char * hadoop_conf_dir_env = getenv("HADOOP_CONF_DIR");
if (hadoop_conf_dir_env) {
AddToSearchPath(hadoop_conf_dir_env);
} else {
AddToSearchPath("/etc/hadoop/conf");
}
}
void ConfigurationLoader::ClearSearchPath()

View File

@ -79,7 +79,7 @@ public:
* SEARCH PATH METHODS
***************************************************************************/
// Sets the search path to the default search path (namely, ".:/etc/hadoop")
// Sets the search path to the default search path (namely, "$HADOOP_CONF_DIR" or "/etc/hadoop/conf")
void SetDefaultSearchPath();
// Clears out the search path

View File

@ -47,6 +47,7 @@ Options HdfsConfiguration::GetOptions() {
OptionalSet(result.rpc_timeout, GetInt(kDfsClientSocketTimeoutKey));
OptionalSet(result.max_rpc_retries, GetInt(kIpcClientConnectMaxRetriesKey));
OptionalSet(result.rpc_retry_delay_ms, GetInt(kIpcClientConnectRetryIntervalKey));
OptionalSet(result.defaultFS, GetUri(kFsDefaultFsKey));
return result;
}

View File

@ -29,5 +29,7 @@ const unsigned int Options::kDefaultHostExclusionDuration;
Options::Options() : rpc_timeout(kDefaultRpcTimeout), max_rpc_retries(kDefaultMaxRpcRetries),
rpc_retry_delay_ms(kDefaultRpcRetryDelayMs),
host_exclusion_duration(kDefaultHostExclusionDuration) {}
host_exclusion_duration(kDefaultHostExclusionDuration),
defaultFS()
{}
}

View File

@ -0,0 +1,371 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <common/uri.h>
#include <uriparser2/uriparser/Uri.h>
#include <string.h>
#include <sstream>
#include <cstdlib>
#include <limits>
using std::experimental::nullopt;
namespace hdfs
{
///////////////////////////////////////////////////////////////////////////////
//
// Internal utilities
//
///////////////////////////////////////////////////////////////////////////////
const char kReserved[] = ":/?#[]@%+";
// Percent-encodes decoded via uriEscapeA (spaces become '+', line breaks are
// left untouched). The escape pass only runs when the input contains at least
// one non-alphanumeric character listed in kReserved; otherwise the input is
// returned unchanged.
std::string URI::encode(const std::string & decoded)
{
  bool hasCharactersToEncode = false;
  for (auto c : decoded)
  {
    // isalnum() has undefined behaviour for negative values other than EOF,
    // which is what a plain char holds for non-ASCII bytes on signed-char
    // platforms; go through unsigned char first.
    if (isalnum(static_cast<unsigned char>(c)) || (strchr(kReserved, c) == NULL))
    {
      continue;
    }
    hasCharactersToEncode = true;
    break;
  }

  if (!hasCharactersToEncode)
  {
    return decoded;
  }

  // Worst case every input character expands to "%XX", plus the terminator.
  std::vector<char> buf(decoded.size() * 3 + 1);
  uriEscapeA(decoded.c_str(), &buf[0], true, URI_BR_DONT_TOUCH);
  return std::string(&buf[0]);
}
// Reverses URI::encode: when the input contains a '%' or '+', it is copied
// into a scratch buffer and unescaped in place with uriUnescapeInPlaceExA
// ('+' becomes a space, line breaks are left untouched). Inputs without
// either character are returned as-is.
std::string URI::decode(const std::string & encoded)
{
  if (encoded.find_first_of("%+") == std::string::npos)
  {
    return encoded;
  }

  std::vector<char> scratch(encoded.size() + 1);
  strncpy(&scratch[0], encoded.c_str(), scratch.size());
  uriUnescapeInPlaceExA(&scratch[0], true, URI_BR_DONT_TOUCH);
  return std::string(&scratch[0]);
}
// Splits input on separator. A single leading '/' is skipped first. Empty
// segments between consecutive separators are kept, and whatever follows the
// last separator (possibly an empty string) is always appended. An empty
// input produces an empty vector.
std::vector<std::string> split(const std::string input, char separator)
{
  std::vector<std::string> pieces;
  if (input.empty())
  {
    return pieces;
  }

  const char * cursor = input.c_str();
  if (*cursor == '/')
  {
    ++cursor;
  }

  for (const char * hit = strchr(cursor, separator);
       hit != nullptr;
       hit = strchr(cursor, separator))
  {
    pieces.push_back(std::string(cursor, hit - cursor));
    cursor = hit + 1;
  }

  // Tail after the final separator.
  pieces.push_back(std::string(cursor));
  return pieces;
}
///////////////////////////////////////////////////////////////////////////////
//
// Parsing
//
///////////////////////////////////////////////////////////////////////////////
// Copies the characters in [r->first, r->afterLast) into a std::string;
// a zero-length range yields an empty string.
std::string copy_range(const UriTextRangeA *r) {
  const int length = r->afterLast - r->first;
  if (!length) {
    return std::string();
  }
  return std::string(r->first, length);
}
// Parses the text range r as a base-10 unsigned 16-bit integer.
//
// On success returns true and stores the value in *result; an empty range is
// also a success and stores nullopt. Returns false (leaving *result alone)
// when strtoul reports an error or the value does not fit in a uint16_t.
bool parse_int(const UriTextRangeA *r, optional<uint16_t> * result) {
  assert(result); // output

  std::string int_string = copy_range(r);
  if (int_string.empty()) {
    // No value
    *result = nullopt;
    return true;
  }

  errno = 0;
  unsigned long val = ::strtoul(int_string.c_str(), nullptr, 10);
  // The upper bound is inclusive: 65535 is a representable value.
  if (errno != 0 || val > std::numeric_limits<uint16_t>::max()) {
    return false;
  }

  *result = std::experimental::make_optional<uint16_t>(val);
  return true;
}
std::vector<std::string> copy_path(const UriPathSegmentA *ps) {
std::vector<std::string> result;
if (nullptr == ps)
return result;
for (; ps != 0; ps = ps->next) {
result.push_back(copy_range(&ps->text));
}
return result;
}
// Splits the user-info text range r ("user" or "user:pass") at the first ':'.
//
// *user receives everything before the colon and *pass everything after it.
// Without a colon the whole text goes to *user and *pass is left untouched.
// An empty range leaves both outputs untouched.
void parse_user_info(const UriTextRangeA *r, std::string * user, std::string * pass) {
  // Output parameters
  assert(user);
  assert(pass);

  std::string user_and_password = copy_range(r);
  if (user_and_password.empty()) {
    return;
  }

  const std::string::size_type colon = user_and_password.find(':');
  if (colon == std::string::npos) {
    *user = user_and_password;
    return;
  }

  // The user name is exactly the `colon` characters before the separator.
  // (Subtracting one more would drop its last character and underflow when
  // the colon is the first character.)
  *user = user_and_password.substr(0, colon);
  *pass = user_and_password.substr(colon + 1);
}
std::vector<std::pair<std::string, std::string > > parse_query(const char *first, const char * afterLast) {
std::vector<std::pair<std::string, std::string > > result;
UriQueryListA * query;
int count;
int dissect_result = uriDissectQueryMallocExA(&query, &count, first, afterLast, false, URI_BR_DONT_TOUCH);
if (URI_SUCCESS == dissect_result) {
for (auto ps = query; ps != nullptr; ps = ps->next) {
std::string key = ps->key ? URI::encode(ps->key) : "";
std::string value = ps->value ? URI::encode(ps->value) : "";
result.push_back(std::make_pair(key, value));
}
uriFreeQueryListA(query);
}
return result;
}
// Parses str with uriparser and copies each component into a URI.
//
// Returns nullopt when uriParseUriA rejects the input or when the port text
// cannot be converted by parse_int; otherwise returns the populated URI.
optional<URI> URI::parse_from_string(const std::string &str)
{
  URI ret;

  UriParserStateA state;
  memset(&state, 0, sizeof(state));
  UriUriA uu;
  state.uri = &uu;

  bool ok = (uriParseUriA(&state, str.c_str()) == URI_SUCCESS);

  if (ok) {
    ret.scheme = copy_range(&uu.scheme);
    ret.host = copy_range(&uu.hostText);
    ok &= parse_int(&uu.portText, &ret.port);
    ret.path = copy_path(uu.pathHead);
    ret.query = parse_query(uu.query.first, uu.query.afterLast);
    ret.fragment = copy_range(&uu.fragment);
    parse_user_info(&uu.userInfo, &ret.user, &ret.pass);
  }

  // Release the parser-owned members exactly once, on both the success and
  // the failure path (previously this ran twice after a successful parse).
  uriFreeUriMembersA(&uu);

  if (!ok) {
    return nullopt;
  }
  return std::experimental::make_optional(ret);
}
///////////////////////////////////////////////////////////////////////////////
//
// Getters and setters
//
///////////////////////////////////////////////////////////////////////////////
std::string URI::str(bool encoded_output) const
{
std::stringstream ss;
if (!scheme.empty()) ss << from_encoded(encoded_output, scheme) << "://";
if (!user.empty() || !pass.empty()) {
if (!user.empty()) ss << from_encoded(encoded_output, user);
if (!pass.empty()) ss << ":" << from_encoded(encoded_output, pass);
ss << "@";
}
if (has_authority()) ss << build_authority(encoded_output);
if (!path.empty()) ss << get_path(encoded_output);
if (!query.empty()) ss << "?" << get_query(encoded_output);
if (!fragment.empty()) ss << "#" << from_encoded(encoded_output, fragment);
return ss.str();
}
// True when the URI carries a host or a port.
bool URI::has_authority() const
{
  if (!host.empty()) {
    return true;
  }
  return static_cast<bool>(port);
}
// Formats "host" or "host:port" (the port only when one is set). The host is
// written encoded or decoded according to encoded_output.
std::string URI::build_authority(bool encoded_output) const
{
  std::stringstream out;
  out << URI::from_encoded(encoded_output, host);
  if (!port)
  {
    return out.str();
  }
  out << ":" << *port;
  return out.str();
}
// Joins the stored path elements into a single string, prefixing every
// element with '/'. An empty path yields an empty string.
std::string URI::get_path(bool encoded_output) const
{
  std::ostringstream joined;
  for (std::vector<std::string>::const_iterator it = path.begin(); it != path.end(); ++it) {
    joined << "/" << from_encoded(encoded_output, *it);
  }
  return joined.str();
}
// Returns a copy of the path elements, each encoded or decoded according to
// encoded_output.
std::vector<std::string> URI::get_path_elements(bool encoded_output) const
{
  std::vector<std::string> elements;
  elements.reserve(path.size());
  for (std::vector<std::string>::size_type i = 0; i < path.size(); ++i) {
    elements.push_back(from_encoded(encoded_output, path[i]));
  }
  return elements;
}
// Replaces the stored path with the '/'-separated elements of input_path.
// Each element is stored encoded: taken verbatim when input_encoded is true,
// passed through encode() otherwise.
void URI::parse_path(bool input_encoded, const std::string &input_path)
{
  // This backs set_path(), so drop whatever path was there before; appending
  // is what add_path() is for.
  path.clear();

  const std::vector<std::string> split_path = split(input_path, '/');
  for (const auto & s : split_path) {
    path.push_back(to_encoded(input_encoded, s));
  }
}
// Mostly copied and modified from uriparser2.c
// Appends one element to the end of the path, encoding it first unless
// encoded_input says it is already encoded.
void URI::add_path(const std::string &p, bool encoded_input)
{
  const std::string element = to_encoded(encoded_input, p);
  path.push_back(element);
}
// Formats the query as "k1=v1&k2=v2...". Every pair is written with an '='
// even when its value is empty; keys and values are encoded or decoded
// according to encoded_output.
std::string URI::get_query(bool encoded_output) const {
  std::stringstream out;
  const char * joiner = "";
  for (const auto & kv : query) {
    out << joiner
        << from_encoded(encoded_output, kv.first)
        << "="
        << from_encoded(encoded_output, kv.second);
    joiner = "&";
  }
  return out.str();
}
// Returns a copy of the query's key/value pairs, each side encoded or decoded
// according to encoded_output.
std::vector< std::pair<std::string, std::string> > URI::get_query_elements(bool encoded_output) const
{
  std::vector< std::pair<std::string, std::string> > elements;
  elements.reserve(query.size());
  for (const auto & kv : query) {
    std::string name = from_encoded(encoded_output, kv.first);
    std::string content = from_encoded(encoded_output, kv.second);
    elements.push_back(std::make_pair(name, content));
  }
  return elements;
}
// Replaces the query with the pairs parsed from q, which must already be in
// encoded form.
void URI::set_query(const std::string &q) {
  // afterLast is one past the last character of the text. Going one further
  // would feed the NUL terminator to the parser as query data, which turns an
  // empty q into a bogus empty key/value pair.
  const char * first = q.c_str();
  query = parse_query(first, first + q.size());
}
// Appends a name/value pair to the query without checking for duplicates.
// Both sides are encoded first unless encoded_input says they already are.
void URI::add_query(const std::string &name, const std::string & value, bool encoded_input)
{
  std::string stored_name = to_encoded(encoded_input, name);
  std::string stored_value = to_encoded(encoded_input, value);
  query.push_back(std::make_pair(stored_name, stored_value));
}
// Removes every query pair whose key matches q_name. Keys are compared in
// decoded form, so q_name is decoded first when encoded_input is true.
void URI::remove_queries(const std::string &q_name, bool encoded_input)
{
  if (query.empty()) {
    return;
  }

  // This is the one place we need to do decoded comparisons
  const std::string wanted = encoded_input ? decode(q_name) : q_name;

  auto it = query.begin();
  while (it != query.end()) {
    if (decode(it->first) == wanted) {
      it = query.erase(it);
    } else {
      ++it;
    }
  }
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COMMON_HDFS_URI_H_
#define COMMON_HDFS_URI_H_
#include <iostream>
#include <string>
#include <optional.hpp>
#include <vector>
namespace hdfs
{
template <class T>
using optional = std::experimental::optional<T>;
// Value type holding the components of a URI (scheme, user info, host, port,
// path, query and fragment). Components are kept in percent-encoded form;
// accessors take a flag selecting encoded or decoded input/output.
class URI
{
// These are stored in encoded form
std::string scheme;
std::string user;
std::string pass;
std::string host;
// Unset when the URI carries no port.
optional<uint16_t> port;
// One entry per path element, without the '/' separators.
std::vector<std::string> path;
// Key/value pairs in the order they appear.
std::vector<std::pair<std::string,std::string> > query;
std::string fragment;
// Converts a stored (encoded) value for output: returned as-is when the
// caller wants encoded output, decoded otherwise.
template <class T>
static T from_encoded(bool encoded_output, const T & input) {return encoded_output ? input : decode(input);}
// Converts caller input for storage: kept as-is when it is already encoded,
// encoded otherwise.
template <class T>
static T to_encoded(bool encoded_input, const T & input) {return encoded_input ? input : encode(input);}
bool has_authority() const;
std::string build_authority(bool encoded_output) const;
// NOTE(review): no definition of build_path is visible alongside the other
// members in uri.cc; confirm whether it is still needed.
std::string build_path(bool encoded_output) const;
void parse_path(bool input_encoded, const std::string &input_path);
public:
// Parse a string into a URI. Returns nullopt if the URI is malformed.
static optional<URI> parse_from_string(const std::string &str);
// Percent-encode / percent-decode a single component.
static std::string encode (const std::string &input);
static std::string decode (const std::string &input);
std::string get_scheme(bool encoded_output=false) const
{ return from_encoded(encoded_output,scheme); }
void set_scheme(const std::string &s, bool encoded_input=false)
{ scheme = to_encoded(encoded_input,s); }
// empty if none.
std::string get_host(bool encoded_output=false) const
{ return from_encoded(encoded_output,host); }
void set_host(const std::string& h, bool encoded_input=false)
{ host = to_encoded(encoded_input,h); }
// Empty optional if the port is undefined.
optional<uint16_t> get_port() const
{ return port; }
void set_port(uint16_t p)
{ port = p; }
void clear_port()
{ port = std::experimental::nullopt; }
// Path as a single string, every element prefixed with '/'.
std::string get_path(bool encoded_output=false) const;
std::vector<std::string> get_path_elements(bool encoded_output=false) const;
// Splits p on '/' and stores the elements via parse_path.
void set_path(const std::string &p, bool encoded_input=false) {
parse_path(encoded_input, p);
}
// Appends a single element to the path.
void add_path(const std::string &p, bool encoded_input=false);
std::string get_query(bool encoded_output=false) const;
std::vector< std::pair<std::string, std::string> > get_query_elements(bool encoded_output=false) const;
// Note that set_query must always pass in encoded strings
void set_query(const std::string &q);
// Adds a parameter onto the query; does not check if it already exists
// e.g. parse_from_string("foo?bar=baz")->add_query("bing","bang")
// would leave "bar=baz&bing=bang" as the query
void add_query(const std::string &name, const std::string & value, bool encoded_input=false);
// Removes every query parameter with the given name, if any exist
// e.g. parse_from_string("foo?bar=baz&bing=bang&bar=bong")->remove_queries("bar")
// would leave bing=bang as the query
void remove_queries(const std::string &q_name, bool encoded_input=false);
std::string get_fragment(bool encoded_output=false) const
{ return from_encoded(encoded_output, fragment); }
void set_fragment(const std::string &f, bool encoded_input=false)
{ fragment = to_encoded(encoded_input,f); }
// Full textual form of the URI; encoded by default.
std::string str(bool encoded_output=true) const;
};
// Streams the URI in its default (encoded) textual form.
inline std::ostream& operator<<(std::ostream &out, const URI &uri)
{
  out << uri.str();
  return out;
}
}
#endif

View File

@ -37,6 +37,9 @@ static const int kNamenodeProtocolVersion = 1;
using ::asio::ip::tcp;
static constexpr uint16_t kDefaultPort = 8020;
/*****************************************************************************
* NAMENODE OPERATIONS
****************************************************************************/
@ -148,7 +151,8 @@ const std::string get_effective_user_name(const std::string &user_name) {
FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_name,
const Options &options)
: io_service_(static_cast<IoServiceImpl *>(io_service)),
: options_(options),
io_service_(static_cast<IoServiceImpl *>(io_service)),
nn_(&io_service_->io_service(), options,
GetRandomClientName(), get_effective_user_name(user_name), kNamenodeProtocol,
kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
@ -175,7 +179,7 @@ FileSystemImpl::~FileSystemImpl() {
void FileSystemImpl::Connect(const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem * fs)> &&handler) {
const std::function<void(const Status &, FileSystem * fs)> &handler) {
/* IoService::New can return nullptr */
if (!io_service_) {
handler (Status::Error("Null IoService"), this);
@ -204,6 +208,48 @@ Status FileSystemImpl::Connect(const std::string &server, const std::string &ser
return s;
}
void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
std::string scheme = options_.defaultFS.get_scheme();
if (strcasecmp(scheme.c_str(), "hdfs") != 0) {
std::string error_message;
error_message += "defaultFS of [" + options_.defaultFS.str() + "] is not supported";
handler(Status::InvalidArgument(error_message.c_str()), nullptr);
return;
}
std::string host = options_.defaultFS.get_host();
if (host.empty()) {
handler(Status::InvalidArgument("defaultFS must specify a hostname"), nullptr);
return;
}
optional<uint16_t> port = options_.defaultFS.get_port();
if (!port) {
port = kDefaultPort;
}
std::string port_as_string = std::to_string(*port);
Connect(host, port_as_string, handler);
}
/* Synchronous variant: starts the asynchronous ConnectToDefaultFs and blocks
 * until its handler has delivered a Status, which is then returned. */
Status FileSystemImpl::ConnectToDefaultFs() {
  // Shared so the promise stays alive for as long as the callback does.
  auto outcome = std::make_shared<std::promise<Status>>();
  std::future<Status> pending = outcome->get_future();

  ConnectToDefaultFs([outcome](const Status &s, FileSystem *) {
    outcome->set_value(s);
  });

  /* block until promise is set */
  return pending.get();
}
int FileSystemImpl::AddWorkerThread() {
auto service_task = [](IoService *service) { service->Run(); };

View File

@ -85,10 +85,14 @@ public:
/* attempt to connect to namenode, return bad status on failure */
void Connect(const std::string &server, const std::string &service,
const std::function<void(const Status &, FileSystem *)> &&handler) override;
const std::function<void(const Status &, FileSystem *)> &handler) override;
/* attempt to connect to namenode, return bad status on failure */
Status Connect(const std::string &server, const std::string &service) override;
/* Connect to the NN indicated in options.defaultFs */
virtual void ConnectToDefaultFs(
const std::function<void(const Status &, FileSystem *)> &handler) override;
virtual Status ConnectToDefaultFs() override;
virtual void Open(const std::string &path,
const std::function<void(const Status &, FileHandle *)>
@ -105,6 +109,7 @@ public:
private:
const Options options_;
/**
* The IoService must be the first member variable to ensure that it gets
* destroyed last. This allows other members to dequeue things from the

View File

@ -55,6 +55,10 @@ function(add_memcheck_test name binary)
endfunction(add_memcheck_test)
# Unit test for the URI class: built from uri_test.cc, linked against the
# common library and gmock_main, and registered through add_memcheck_test.
add_executable(uri_test uri_test.cc)
target_link_libraries(uri_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(uri uri_test)
add_executable(remote_block_reader_test remote_block_reader_test.cc)
target_link_libraries(remote_block_reader_test test_common reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(remote_block_reader remote_block_reader_test)

View File

@ -483,6 +483,43 @@ TEST(ConfigurationTest, TestBoolConversions) {
}
}
// Exercises Configuration::GetUri and Configuration::GetUriWithDefault.
TEST(ConfigurationTest, TestUriConversions) {
/* No defaults: a present key parses, a missing key yields an empty optional */
{
std::stringstream stream;
simpleConfigStream(stream, "key1", "hdfs:///");
optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str());
EXPECT_TRUE(config && "Parse single value");
optional<URI> value = config->GetUri("key1");
EXPECT_TRUE((bool)value);
EXPECT_EQ("hdfs:///", value->str());
EXPECT_FALSE(config->GetUri("key2"));
}
/* A configured value wins over the default */
{
optional<Configuration> config = simpleConfig("key1", "hdfs:///");
EXPECT_EQ("hdfs:///", config->GetUriWithDefault("key1", "http:///").str());
}
/* Surrounding whitespace in the configured value does not show up in the URI */
{
optional<Configuration> config = simpleConfig("key1", " hdfs:/// ");
EXPECT_EQ("hdfs:///", config->GetUriWithDefault("key1", "http:///").str());
}
/* An empty configured value is a valid (empty) URI, so the default is not used */
{
optional<Configuration> config = simpleConfig("key1", "");
EXPECT_EQ("", config->GetUriWithDefault("key1", "http:///").str());
}
/* An unparseable configured value falls back to the default */
{
optional<Configuration> config = simpleConfig("key1", "%%"); // invalid URI
EXPECT_EQ("http:///", config->GetUriWithDefault("key1", "http:///").str());
}
/* A missing key falls back to the default */
{
optional<Configuration> config = simpleConfig("key2", "hdfs:///");
EXPECT_EQ("http:///", config->GetUriWithDefault("key1", "http:///").str());
}
}
int main(int argc, char *argv[]) {
/*
* The following line must be executed to initialize Google Mock

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/uri.h"
#include <gmock/gmock.h>
using ::testing::_;
using namespace hdfs;
// Edge-case inputs: checks only whether parse_from_string accepts or rejects
// each string.
TEST(UriTest, TestDegenerateInputs) {
/* Empty input is accepted */
{
optional<URI> uri = URI::parse_from_string("");
EXPECT_TRUE(uri && "Empty input");
}
/* Invalid percent-encoding is rejected */
{
optional<URI> uri = URI::parse_from_string("%%");
EXPECT_FALSE(uri && "Bad input");
}
/* Non-numeric port is rejected */
{
optional<URI> uri = URI::parse_from_string("hdfs://nn:foo/");
EXPECT_FALSE(uri && "Bad port");
}
/* Negative port is rejected */
{
optional<URI> uri = URI::parse_from_string("hdfs://nn:-100/");
EXPECT_FALSE(uri && "Negative port");
}
/* A run of empty path segments is accepted */
{
optional<URI> uri = URI::parse_from_string("hdfs://////");
EXPECT_TRUE(uri && "Empty paths");
}
}
// Well-formed inputs: checks every getter against the expected component.
// A missing port is observed through get_port().value_or(0) == 0.
TEST(UriTest, TestNominalInputs) {
/* Simple input: scheme and path only */
{
optional<URI> uri = URI::parse_from_string("hdfs:///foo");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("hdfs", uri->get_scheme());
EXPECT_EQ("", uri->get_host());
EXPECT_EQ(0, uri->get_port().value_or(0));
EXPECT_EQ("/foo", uri->get_path());
EXPECT_EQ("", uri->get_fragment());
EXPECT_EQ("", uri->get_query());
}
/* With authority (host and port) */
{
optional<URI> uri = URI::parse_from_string("hdfs://host:100/foo");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("hdfs", uri->get_scheme());
EXPECT_EQ("host", uri->get_host());
EXPECT_EQ(100, uri->get_port().value_or(0));
EXPECT_EQ("/foo", uri->get_path());
EXPECT_EQ("", uri->get_fragment());
EXPECT_EQ("", uri->get_query());
}
/* No scheme: a bare absolute path */
{
optional<URI> uri = URI::parse_from_string("/foo");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("", uri->get_scheme());
EXPECT_EQ("", uri->get_host());
EXPECT_EQ(0, uri->get_port().value_or(0));
EXPECT_EQ("/foo", uri->get_path());
EXPECT_EQ("", uri->get_fragment());
EXPECT_EQ("", uri->get_query());
}
/* All fields, including the element-wise path and query accessors */
{
optional<URI> uri = URI::parse_from_string("hdfs://nn:8020/path/to/data?a=b&c=d#fragment");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("hdfs", uri->get_scheme());
EXPECT_EQ("nn", uri->get_host());
EXPECT_EQ(8020, uri->get_port().value_or(0));
EXPECT_EQ("/path/to/data", uri->get_path());
EXPECT_EQ("a=b&c=d", uri->get_query());
EXPECT_EQ(3, uri->get_path_elements().size());
EXPECT_EQ("path", uri->get_path_elements()[0]);
EXPECT_EQ("to", uri->get_path_elements()[1]);
EXPECT_EQ("data", uri->get_path_elements()[2]);
EXPECT_EQ(2, uri->get_query_elements().size());
EXPECT_EQ("a", uri->get_query_elements()[0].first);
EXPECT_EQ("b", uri->get_query_elements()[0].second);
EXPECT_EQ("c", uri->get_query_elements()[1].first);
EXPECT_EQ("d", uri->get_query_elements()[1].second);
EXPECT_EQ("fragment", uri->get_fragment());
}
}
// Percent-encoded inputs: the default (decoded) getters must return the
// unescaped text, with '+' read as a space.
TEST(UriTest, TestEncodedInputs) {
// Note that scheme and port cannot be uri-encoded
/* Encoded input, uppercase hex digits */
{
optional<URI> uri = URI::parse_from_string("S://%5E:1/+%5E%20?%5E=%5E#%5E");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("S", uri->get_scheme());
EXPECT_EQ("^", uri->get_host());
EXPECT_EQ(1, uri->get_port().value_or(0));
EXPECT_EQ("/ ^ ", uri->get_path());
EXPECT_EQ("^", uri->get_fragment());
EXPECT_EQ("^=^", uri->get_query());
}
/* Same input with lowercase hex digits */
{
optional<URI> uri = URI::parse_from_string("S://%5e:1/+%5e%20?%5e=%5e#%5e");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("S", uri->get_scheme());
EXPECT_EQ("^", uri->get_host());
EXPECT_EQ(1, uri->get_port().value_or(0));
EXPECT_EQ("/ ^ ", uri->get_path());
EXPECT_EQ("^", uri->get_fragment());
EXPECT_EQ("^=^", uri->get_query());
}
}
// Parses the same encoded URI twice and compares decoded getters (default)
// against encoded getters (encoded_output = true).
TEST(UriTest, TestDecodedInputsAndOutputs) {
/* Default getters return decoded text: "%25" reads back as "%" */
{
optional<URI> uri = URI::parse_from_string("S://%25/%25+?%25=%25#%25");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("S", uri->get_scheme());
EXPECT_EQ("%", uri->get_host());
EXPECT_EQ(0, uri->get_port().value_or(0));
EXPECT_EQ("/% ", uri->get_path());
EXPECT_EQ("%", uri->get_fragment());
EXPECT_EQ("%=%", uri->get_query());
}
/* Getters called with encoded_output = true return the encoded text */
{
optional<URI> uri = URI::parse_from_string("S://%25/%25+?%25=%25#%25");
ASSERT_TRUE(uri && "Parsed");
EXPECT_EQ("S", uri->get_scheme(true));
EXPECT_EQ("%25", uri->get_host(true));
EXPECT_EQ(0, uri->get_port().value_or(0));
EXPECT_EQ("/%25+", uri->get_path(true));
EXPECT_EQ("%25", uri->get_fragment(true));
EXPECT_EQ("%25=%25", uri->get_query(true));
}
}
// Builds URIs through the setters and adders, with both raw and pre-encoded
// inputs, and checks the encoded string form produced by str().
TEST(UriTest, TestSetters) {
/* Non-encoded inputs: the setters encode "%" to "%25" */
{
URI uri;
uri.set_scheme("S");
uri.set_host("%");
uri.set_port(100);
uri.set_path("%/%/%");
uri.set_fragment("%");
uri.set_query("%25=%25"); //set_query must always be encoded
EXPECT_EQ("S://%25:100/%25/%25/%25?%25=%25#%25", uri.str());
}
/* Incremental adders, non-encoded: each add_path/add_query appends */
{
URI uri;
uri.set_scheme("S");
uri.set_host("%");
uri.set_port(100);
uri.set_fragment("%");
EXPECT_EQ("S://%25:100#%25", uri.str());
uri.add_path("%");
uri.add_query("%", "%");
EXPECT_EQ("S://%25:100/%25?%25=%25#%25", uri.str());
uri.add_path("%");
uri.add_query("%", "%");
EXPECT_EQ("S://%25:100/%25/%25?%25=%25&%25=%25#%25", uri.str());
}
/* Encoded inputs: values flagged as encoded are stored verbatim */
{
URI uri;
uri.set_scheme("S", true);
uri.set_host("%25", true);
uri.set_port(100);
uri.set_path("%25/%25/%25", true);
uri.set_fragment("%25", true);
uri.set_query("%25=%25"); //set_query must always be encoded
EXPECT_EQ("S://%25:100/%25/%25/%25?%25=%25#%25", uri.str());
}
/* Incremental adders, encoded */
{
URI uri;
uri.set_scheme("S", true);
uri.set_host("%25", true);
uri.set_port(100);
uri.set_fragment("%25", true);
EXPECT_EQ("S://%25:100#%25", uri.str());
uri.add_path("%25", true);
uri.add_query("%25", "%25", true);
EXPECT_EQ("S://%25:100/%25?%25=%25#%25", uri.str());
uri.add_path("%25", true);
uri.add_query("%25", "%25", true);
EXPECT_EQ("S://%25:100/%25/%25?%25=%25&%25=%25#%25", uri.str());
}
}
// Test entry point: initialises Google Mock, then runs every registered test
// and returns the result of RUN_ALL_TESTS().
int main(int argc, char *argv[]) {
  /*
   * Google Mock (and with it Google Test) has to be initialised before any
   * test is run.
   */
  ::testing::InitGoogleMock(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

View File

@ -22,5 +22,5 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-attributes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-attributes")
add_library(uriparser2_obj OBJECT uriparser2/uriparser2.c uriparser2/uriparser/UriParse.c uriparser2/uriparser/UriParseBase.c
uriparser2/uriparser/UriCommon.c uriparser2/uriparser/UriIp4Base.c uriparser2/uriparser/UriIp4.c)
uriparser2/uriparser/UriCommon.c uriparser2/uriparser/UriIp4Base.c uriparser2/uriparser/UriIp4.c uriparser2/uriparser/UriEscape.c uriparser2/uriparser/UriQuery.c)
add_library(uriparser2 $<TARGET_OBJECTS:uriparser2_obj>)