HDFS-10787: libhdfs++: Public API should expose configuration parser. Original patch contributed by Mitchell Tracy, followup work and compile warning fixes contributed by Anatoli Shein.
This commit is contained in:
parent
eeb49d0ca7
commit
12942f679a
|
@ -30,6 +30,8 @@ cmake_minimum_required(VERSION 2.8)
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
include (CTest)
|
include (CTest)
|
||||||
|
|
||||||
|
SET(BUILD_SHARED_HDFSPP TRUE CACHE STRING "BUILD_SHARED_HDFSPP defaulting to 'TRUE'")
|
||||||
SET(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
|
SET(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
|
||||||
|
|
||||||
# If there's a better way to inform FindCyrusSASL.cmake, let's make this cleaner:
|
# If there's a better way to inform FindCyrusSASL.cmake, let's make this cleaner:
|
||||||
|
@ -246,6 +248,7 @@ if(NEED_LINK_DL)
|
||||||
set(LIB_DL dl)
|
set(LIB_DL dl)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
|
set(LIBHDFSPP_VERSION "0.1.0")
|
||||||
set(LIBHDFSPP_ALL_OBJECTS $<TARGET_OBJECTS:bindings_c_obj> $<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:rpc_obj> $<TARGET_OBJECTS:reader_obj> $<TARGET_OBJECTS:proto_obj> $<TARGET_OBJECTS:connection_obj> $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
|
set(LIBHDFSPP_ALL_OBJECTS $<TARGET_OBJECTS:bindings_c_obj> $<TARGET_OBJECTS:fs_obj> $<TARGET_OBJECTS:rpc_obj> $<TARGET_OBJECTS:reader_obj> $<TARGET_OBJECTS:proto_obj> $<TARGET_OBJECTS:connection_obj> $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
|
||||||
if (HADOOP_BUILD)
|
if (HADOOP_BUILD)
|
||||||
hadoop_add_dual_library(hdfspp ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
hadoop_add_dual_library(hdfspp ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
||||||
|
@ -256,6 +259,7 @@ if (HADOOP_BUILD)
|
||||||
${SASL_LIBRARIES}
|
${SASL_LIBRARIES}
|
||||||
${CMAKE_THREAD_LIBS_INIT}
|
${CMAKE_THREAD_LIBS_INIT}
|
||||||
)
|
)
|
||||||
|
set_target_properties(hdfspp PROPERTIES SOVERSION ${LIBHDFSPP_VERSION})
|
||||||
else (HADOOP_BUILD)
|
else (HADOOP_BUILD)
|
||||||
add_library(hdfspp_static STATIC ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
add_library(hdfspp_static STATIC ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
||||||
target_link_libraries(hdfspp_static
|
target_link_libraries(hdfspp_static
|
||||||
|
@ -265,18 +269,11 @@ else (HADOOP_BUILD)
|
||||||
${SASL_LIBRARIES}
|
${SASL_LIBRARIES}
|
||||||
${CMAKE_THREAD_LIBS_INIT}
|
${CMAKE_THREAD_LIBS_INIT}
|
||||||
)
|
)
|
||||||
add_library(hdfspp SHARED ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
if(BUILD_SHARED_HDFSPP)
|
||||||
target_link_libraries(hdfspp_static
|
add_library(hdfspp SHARED ${EMPTY_FILE_CC} ${LIBHDFSPP_ALL_OBJECTS})
|
||||||
${LIB_DL}
|
set_target_properties(hdfspp PROPERTIES SOVERSION ${LIBHDFSPP_VERSION})
|
||||||
${PROTOBUF_LIBRARY}
|
endif(BUILD_SHARED_HDFSPP)
|
||||||
${OPENSSL_LIBRARIES}
|
|
||||||
${SASL_LIBRARIES}
|
|
||||||
${CMAKE_THREAD_LIBS_INIT}
|
|
||||||
)
|
|
||||||
endif (HADOOP_BUILD)
|
endif (HADOOP_BUILD)
|
||||||
set(LIBHDFSPP_VERSION "0.1.0")
|
|
||||||
set_target_properties(hdfspp PROPERTIES
|
|
||||||
SOVERSION ${LIBHDFSPP_VERSION})
|
|
||||||
|
|
||||||
# Set up make install targets
|
# Set up make install targets
|
||||||
# Can be installed to a particular location via "make DESTDIR=... install"
|
# Can be installed to a particular location via "make DESTDIR=... install"
|
||||||
|
@ -286,7 +283,9 @@ install(FILES ${LIBHDFSPP_HEADER_FILES} DESTINATION include/hdfspp)
|
||||||
install(FILES ${LIBHDFS_HEADER_FILES} DESTINATION include/hdfs)
|
install(FILES ${LIBHDFS_HEADER_FILES} DESTINATION include/hdfs)
|
||||||
|
|
||||||
install(TARGETS hdfspp_static ARCHIVE DESTINATION lib)
|
install(TARGETS hdfspp_static ARCHIVE DESTINATION lib)
|
||||||
install(TARGETS hdfspp LIBRARY DESTINATION lib)
|
if(BUILD_SHARED_HDFSPP)
|
||||||
|
install(TARGETS hdfspp LIBRARY DESTINATION lib)
|
||||||
|
endif(BUILD_SHARED_HDFSPP)
|
||||||
|
|
||||||
add_custom_target(
|
add_custom_target(
|
||||||
InstallToBuildDirectory
|
InstallToBuildDirectory
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
|
|
||||||
|
include_directories( ../../tools )
|
||||||
|
|
||||||
add_subdirectory(cat)
|
add_subdirectory(cat)
|
||||||
add_subdirectory(gendirs)
|
add_subdirectory(gendirs)
|
||||||
add_subdirectory(find)
|
add_subdirectory(find)
|
||||||
|
|
|
@ -24,4 +24,4 @@ include_directories( ${LIBHDFSPP_DIR}/include )
|
||||||
link_directories( ${LIBHDFSPP_DIR}/lib )
|
link_directories( ${LIBHDFSPP_DIR}/lib )
|
||||||
|
|
||||||
add_executable(cat cat.cc)
|
add_executable(cat cat.cc)
|
||||||
target_link_libraries(cat hdfspp_static)
|
target_link_libraries(cat tools_common hdfspp_static)
|
|
@ -31,10 +31,8 @@
|
||||||
**/
|
**/
|
||||||
|
|
||||||
#include "hdfspp/hdfspp.h"
|
#include "hdfspp/hdfspp.h"
|
||||||
#include "common/hdfs_configuration.h"
|
|
||||||
#include "common/configuration_loader.h"
|
|
||||||
|
|
||||||
#include <google/protobuf/stubs/common.h>
|
#include <google/protobuf/stubs/common.h>
|
||||||
|
#include "tools_common.h"
|
||||||
|
|
||||||
const std::size_t BUF_SIZE = 1048576; //1 MB
|
const std::size_t BUF_SIZE = 1048576; //1 MB
|
||||||
static char input_buffer[BUF_SIZE];
|
static char input_buffer[BUF_SIZE];
|
||||||
|
@ -46,35 +44,17 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
std::string path = argv[1];
|
std::string path = argv[1];
|
||||||
|
|
||||||
hdfs::Options options;
|
//Building a URI object from the given uri path
|
||||||
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
|
hdfs::URI uri = hdfs::parse_path_or_exit(path);
|
||||||
hdfs::ConfigurationLoader loader;
|
|
||||||
//Loading default config files core-site.xml and hdfs-site.xml from the config path
|
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, false);
|
||||||
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
|
|
||||||
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
|
|
||||||
if(config){
|
|
||||||
//Loading options from the config
|
|
||||||
options = config->GetOptions();
|
|
||||||
}
|
|
||||||
hdfs::IoService * io_service = hdfs::IoService::New();
|
|
||||||
//Wrapping fs into a shared pointer to guarantee deletion
|
|
||||||
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
|
|
||||||
if (!fs) {
|
if (!fs) {
|
||||||
std::cerr << "Could not connect the file system." << std::endl;
|
std::cerr << "Could not connect the file system. " << std::endl;
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
hdfs::Status status = fs->ConnectToDefaultFs();
|
|
||||||
if (!status.ok()) {
|
|
||||||
if(!options.defaultFS.get_host().empty()){
|
|
||||||
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
|
|
||||||
} else {
|
|
||||||
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
|
|
||||||
}
|
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
hdfs::FileHandle *file_raw = nullptr;
|
hdfs::FileHandle *file_raw = nullptr;
|
||||||
status = fs->Open(path, &file_raw);
|
hdfs::Status status = fs->Open(path, &file_raw);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
|
std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
|
|
|
@ -24,4 +24,4 @@ include_directories( ${LIBHDFSPP_DIR}/include )
|
||||||
link_directories( ${LIBHDFSPP_DIR}/lib )
|
link_directories( ${LIBHDFSPP_DIR}/lib )
|
||||||
|
|
||||||
add_executable(find find.cc)
|
add_executable(find find.cc)
|
||||||
target_link_libraries(find hdfspp_static)
|
target_link_libraries(find tools_common hdfspp_static)
|
|
@ -38,11 +38,9 @@
|
||||||
**/
|
**/
|
||||||
|
|
||||||
#include "hdfspp/hdfspp.h"
|
#include "hdfspp/hdfspp.h"
|
||||||
#include "common/hdfs_configuration.h"
|
|
||||||
#include "common/configuration_loader.h"
|
|
||||||
|
|
||||||
#include <google/protobuf/stubs/common.h>
|
#include <google/protobuf/stubs/common.h>
|
||||||
#include <future>
|
#include <future>
|
||||||
|
#include "tools_common.h"
|
||||||
|
|
||||||
void SyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){
|
void SyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){
|
||||||
std::vector<hdfs::StatInfo> results;
|
std::vector<hdfs::StatInfo> results;
|
||||||
|
@ -119,32 +117,12 @@ int main(int argc, char *argv[]) {
|
||||||
std::string name = argv[2];
|
std::string name = argv[2];
|
||||||
bool use_async = (std::stoi(argv[3]) != 0);
|
bool use_async = (std::stoi(argv[3]) != 0);
|
||||||
|
|
||||||
hdfs::Options options;
|
//Building a URI object from the given uri path
|
||||||
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
|
hdfs::URI uri = hdfs::parse_path_or_exit(path);
|
||||||
hdfs::ConfigurationLoader loader;
|
|
||||||
//Loading default config files core-site.xml and hdfs-site.xml from the config path
|
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, true);
|
||||||
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
|
|
||||||
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
|
|
||||||
if(config){
|
|
||||||
//Loading options from the config
|
|
||||||
options = config->GetOptions();
|
|
||||||
}
|
|
||||||
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
|
|
||||||
options.rpc_timeout = std::numeric_limits<int>::max();
|
|
||||||
hdfs::IoService * io_service = hdfs::IoService::New();
|
|
||||||
//Wrapping fs into a unique pointer to guarantee deletion
|
|
||||||
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
|
|
||||||
if (!fs) {
|
if (!fs) {
|
||||||
std::cerr << "Could not connect the file system." << std::endl;
|
std::cerr << "Could not connect the file system. " << std::endl;
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
hdfs::Status status = fs->ConnectToDefaultFs();
|
|
||||||
if (!status.ok()) {
|
|
||||||
if(!options.defaultFS.get_host().empty()){
|
|
||||||
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
|
|
||||||
} else {
|
|
||||||
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
|
|
||||||
}
|
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,4 +24,4 @@ include_directories( ${LIBHDFSPP_DIR}/include )
|
||||||
link_directories( ${LIBHDFSPP_DIR}/lib )
|
link_directories( ${LIBHDFSPP_DIR}/lib )
|
||||||
|
|
||||||
add_executable(gendirs gendirs.cc)
|
add_executable(gendirs gendirs.cc)
|
||||||
target_link_libraries(gendirs hdfspp_static)
|
target_link_libraries(gendirs tools_common hdfspp_static)
|
|
@ -37,11 +37,9 @@
|
||||||
**/
|
**/
|
||||||
|
|
||||||
#include "hdfspp/hdfspp.h"
|
#include "hdfspp/hdfspp.h"
|
||||||
#include "common/hdfs_configuration.h"
|
|
||||||
#include "common/configuration_loader.h"
|
|
||||||
|
|
||||||
#include <google/protobuf/stubs/common.h>
|
#include <google/protobuf/stubs/common.h>
|
||||||
#include <future>
|
#include <future>
|
||||||
|
#include "tools_common.h"
|
||||||
|
|
||||||
#define DEFAULT_PERMISSIONS 0755
|
#define DEFAULT_PERMISSIONS 0755
|
||||||
|
|
||||||
|
@ -80,32 +78,12 @@ int main(int argc, char *argv[]) {
|
||||||
int depth = std::stoi(argv[2]);
|
int depth = std::stoi(argv[2]);
|
||||||
int fanout = std::stoi(argv[3]);
|
int fanout = std::stoi(argv[3]);
|
||||||
|
|
||||||
hdfs::Options options;
|
//Building a URI object from the given uri path
|
||||||
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
|
hdfs::URI uri = hdfs::parse_path_or_exit(path);
|
||||||
hdfs::ConfigurationLoader loader;
|
|
||||||
//Loading default config files core-site.xml and hdfs-site.xml from the config path
|
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, true);
|
||||||
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
|
|
||||||
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
|
|
||||||
if(config){
|
|
||||||
//Loading options from the config
|
|
||||||
options = config->GetOptions();
|
|
||||||
}
|
|
||||||
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
|
|
||||||
options.rpc_timeout = std::numeric_limits<int>::max();
|
|
||||||
hdfs::IoService * io_service = hdfs::IoService::New();
|
|
||||||
//Wrapping fs into a unique pointer to guarantee deletion
|
|
||||||
std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
|
|
||||||
if (!fs) {
|
if (!fs) {
|
||||||
std::cerr << "Could not connect the file system." << std::endl;
|
std::cerr << "Could not connect the file system. " << std::endl;
|
||||||
exit(EXIT_FAILURE);
|
|
||||||
}
|
|
||||||
hdfs::Status status = fs->ConnectToDefaultFs();
|
|
||||||
if (!status.ok()) {
|
|
||||||
if(!options.defaultFS.get_host().empty()){
|
|
||||||
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
|
|
||||||
} else {
|
|
||||||
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
|
|
||||||
}
|
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
#ifndef LIBHDFSPP_CONFIGPARSER_H_
|
||||||
|
#define LIBHDFSPP_CONFIGPARSER_H_
|
||||||
|
|
||||||
|
#include "hdfspp/options.h"
|
||||||
|
#include "hdfspp/uri.h"
|
||||||
|
#include "hdfspp/status.h"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <memory>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace hdfs {
|
||||||
|
|
||||||
|
class ConfigParser {
|
||||||
|
public:
|
||||||
|
ConfigParser();
|
||||||
|
ConfigParser(const std::string& path);
|
||||||
|
ConfigParser(const std::vector<std::string>& configDirectories);
|
||||||
|
~ConfigParser();
|
||||||
|
ConfigParser(ConfigParser&&);
|
||||||
|
ConfigParser& operator=(ConfigParser&&);
|
||||||
|
|
||||||
|
bool LoadDefaultResources();
|
||||||
|
std::vector<std::pair<std::string, Status> > ValidateResources() const;
|
||||||
|
|
||||||
|
// Return false if value couldn't be found or cast to desired type
|
||||||
|
bool get_int(const std::string& key, int& outval) const;
|
||||||
|
int get_int_or(const std::string& key, const int defaultval) const;
|
||||||
|
|
||||||
|
bool get_string(const std::string& key, std::string& outval) const;
|
||||||
|
std::string get_string_or(const std::string& key, const std::string& defaultval) const;
|
||||||
|
|
||||||
|
bool get_bool(const std::string& key, bool& outval) const;
|
||||||
|
bool get_bool_or(const std::string& key, const bool defaultval) const;
|
||||||
|
|
||||||
|
bool get_double(const std::string& key, double& outval) const;
|
||||||
|
double get_double_or(const std::string& key, const double defaultval) const;
|
||||||
|
|
||||||
|
bool get_uri(const std::string& key, URI& outval) const;
|
||||||
|
URI get_uri_or(const std::string& key, const URI& defaultval) const;
|
||||||
|
|
||||||
|
bool get_options(Options& outval) const;
|
||||||
|
Options get_options_or(const Options& defaultval) const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
class impl;
|
||||||
|
std::unique_ptr<impl> pImpl;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif
|
|
@ -26,6 +26,7 @@
|
||||||
#include "hdfspp/fsinfo.h"
|
#include "hdfspp/fsinfo.h"
|
||||||
#include "hdfspp/content_summary.h"
|
#include "hdfspp/content_summary.h"
|
||||||
#include "hdfspp/uri.h"
|
#include "hdfspp/uri.h"
|
||||||
|
#include "hdfspp/config_parser.h"
|
||||||
#include "hdfspp/locks.h"
|
#include "hdfspp/locks.h"
|
||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
|
@ -1295,13 +1295,13 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t offset = 0;
|
off_t offset = 0;
|
||||||
Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
|
Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
|
||||||
if (!stat.ok()) {
|
if (!stat.ok()) {
|
||||||
return Error(stat);
|
return Error(stat);
|
||||||
}
|
}
|
||||||
|
|
||||||
return offset;
|
return (tOffset)offset;
|
||||||
} catch (const std::exception & e) {
|
} catch (const std::exception & e) {
|
||||||
return ReportException(e);
|
return ReportException(e);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
|
|
@ -19,6 +19,6 @@ if(NEED_LINK_DL)
|
||||||
set(LIB_DL dl)
|
set(LIB_DL dl)
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc)
|
add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc statinfo.cc fsinfo.cc content_summary.cc locks.cc config_parser.cc)
|
||||||
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
|
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
|
||||||
target_link_libraries(common ${LIB_DL})
|
target_link_libraries(common ${LIB_DL})
|
||||||
|
|
|
@ -0,0 +1,219 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "hdfspp/config_parser.h"
|
||||||
|
#include "common/hdfs_configuration.h"
|
||||||
|
#include "common/configuration_loader.h"
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
#include <memory>
|
||||||
|
#include <vector>
|
||||||
|
#include <numeric>
|
||||||
|
|
||||||
|
namespace hdfs {
|
||||||
|
|
||||||
|
static const char kSearchPathSeparator = ':';
|
||||||
|
|
||||||
|
HdfsConfiguration LoadDefault(ConfigurationLoader & loader)
|
||||||
|
{
|
||||||
|
optional<HdfsConfiguration> result = loader.LoadDefaultResources<HdfsConfiguration>();
|
||||||
|
if (result)
|
||||||
|
{
|
||||||
|
return result.value();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return loader.NewConfig<HdfsConfiguration>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class ConfigParser::impl {
|
||||||
|
public:
|
||||||
|
impl() :
|
||||||
|
config_(loader_.NewConfig<HdfsConfiguration>()) {
|
||||||
|
}
|
||||||
|
|
||||||
|
impl(const std::vector<std::string>& dirs) :
|
||||||
|
config_(loader_.NewConfig<HdfsConfiguration>()) {
|
||||||
|
|
||||||
|
// Convert vector of paths into ':' separated path
|
||||||
|
std::string path = std::accumulate(dirs.begin(), dirs.end(), std::string(""),
|
||||||
|
[](std::string cumm, std::string elem) {return cumm + kSearchPathSeparator + elem;});
|
||||||
|
loader_.SetSearchPath(path);
|
||||||
|
config_ = LoadDefault(loader_);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl(const std::string& path) :
|
||||||
|
config_(loader_.NewConfig<HdfsConfiguration>()) {
|
||||||
|
|
||||||
|
loader_.SetSearchPath(path);
|
||||||
|
config_ = LoadDefault(loader_);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LoadDefaultResources() {
|
||||||
|
config_ = LoadDefault(loader_);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::pair<std::string, Status> > ValidateResources() const {
|
||||||
|
return loader_.ValidateDefaultResources<HdfsConfiguration>();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_int(const std::string& key, int& outval) const {
|
||||||
|
auto ret = config_.GetInt(key);
|
||||||
|
if (!ret) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
outval = *ret;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_string(const std::string& key, std::string& outval) const {
|
||||||
|
auto ret = config_.Get(key);
|
||||||
|
if (!ret) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
outval = *ret;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_bool(const std::string& key, bool& outval) const {
|
||||||
|
auto ret = config_.GetBool(key);
|
||||||
|
if (!ret) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
outval = *ret;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_double(const std::string& key, double& outval) const {
|
||||||
|
auto ret = config_.GetDouble(key);
|
||||||
|
if (!ret) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
outval = *ret;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_uri(const std::string& key, URI& outval) const {
|
||||||
|
auto ret = config_.GetUri(key);
|
||||||
|
if (!ret) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
outval = *ret;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get_options(Options& outval) {
|
||||||
|
outval = config_.GetOptions();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
ConfigurationLoader loader_;
|
||||||
|
HdfsConfiguration config_;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
ConfigParser::ConfigParser() {
|
||||||
|
pImpl.reset(new ConfigParser::impl());
|
||||||
|
}
|
||||||
|
|
||||||
|
ConfigParser::ConfigParser(const std::vector<std::string>& configDirectories) {
|
||||||
|
pImpl.reset(new ConfigParser::impl(configDirectories));
|
||||||
|
}
|
||||||
|
|
||||||
|
ConfigParser::ConfigParser(const std::string& path) {
|
||||||
|
pImpl.reset(new ConfigParser::impl(path));
|
||||||
|
}
|
||||||
|
|
||||||
|
ConfigParser::~ConfigParser() = default;
|
||||||
|
ConfigParser::ConfigParser(ConfigParser&&) = default;
|
||||||
|
ConfigParser& ConfigParser::operator=(ConfigParser&&) = default;
|
||||||
|
|
||||||
|
bool ConfigParser::LoadDefaultResources() { return pImpl->LoadDefaultResources(); }
|
||||||
|
std::vector<std::pair<std::string, Status> > ConfigParser::ValidateResources() const { return pImpl->ValidateResources();}
|
||||||
|
|
||||||
|
bool ConfigParser::get_int(const std::string& key, int& outval) const { return pImpl->get_int(key, outval); }
|
||||||
|
int ConfigParser::get_int_or(const std::string& key, const int defaultval) const {
|
||||||
|
int res = 0;
|
||||||
|
if(get_int(key, res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
return defaultval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConfigParser::get_string(const std::string& key, std::string& outval) const { return pImpl->get_string(key, outval); }
|
||||||
|
std::string ConfigParser::get_string_or(const std::string& key, const std::string& defaultval) const {
|
||||||
|
std::string res;
|
||||||
|
if(get_string(key, res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
return defaultval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConfigParser::get_bool(const std::string& key, bool& outval) const { return pImpl->get_bool(key, outval); }
|
||||||
|
bool ConfigParser::get_bool_or(const std::string& key, const bool defaultval) const {
|
||||||
|
bool res = false;
|
||||||
|
if(get_bool(key, res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
return defaultval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConfigParser::get_double(const std::string& key, double& outval) const { return pImpl->get_double(key, outval); }
|
||||||
|
double ConfigParser::get_double_or(const std::string& key, const double defaultval) const {
|
||||||
|
double res = 0;
|
||||||
|
if(get_double(key, res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
return defaultval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConfigParser::get_uri(const std::string& key, URI& outval) const { return pImpl->get_uri(key, outval); }
|
||||||
|
URI ConfigParser::get_uri_or(const std::string& key, const URI& defaultval) const {
|
||||||
|
URI res;
|
||||||
|
if(get_uri(key, res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
res = defaultval;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ConfigParser::get_options(Options& outval) const { return pImpl->get_options(outval); }
|
||||||
|
Options ConfigParser::get_options_or(const Options& defaultval) const {
|
||||||
|
Options res;
|
||||||
|
if(get_options(res)) {
|
||||||
|
return res;
|
||||||
|
} else {
|
||||||
|
res = defaultval;
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace hdfs
|
|
@ -72,8 +72,8 @@ optional<int64_t> Configuration::GetInt(const std::string& key) const {
|
||||||
if (raw) {
|
if (raw) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
char* end = nullptr;
|
char* end = nullptr;
|
||||||
auto result =
|
optional<int64_t> result =
|
||||||
std::experimental::make_optional(strtol(raw->c_str(), &end, 10));
|
std::experimental::make_optional(static_cast<int64_t>(strtol(raw->c_str(), &end, 10)));
|
||||||
if (end == raw->c_str()) {
|
if (end == raw->c_str()) {
|
||||||
/* strtoll will set end to input if no conversion was done */
|
/* strtoll will set end to input if no conversion was done */
|
||||||
return optional<int64_t>();
|
return optional<int64_t>();
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "configuration_loader.h"
|
#include "configuration_loader.h"
|
||||||
|
#include "common/logging.h"
|
||||||
|
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <strings.h>
|
#include <strings.h>
|
||||||
|
@ -74,7 +75,11 @@ void ConfigurationLoader::SetDefaultSearchPath() {
|
||||||
// /etc/hadoop/conf
|
// /etc/hadoop/conf
|
||||||
const char * hadoop_conf_dir_env = getenv("HADOOP_CONF_DIR");
|
const char * hadoop_conf_dir_env = getenv("HADOOP_CONF_DIR");
|
||||||
if (hadoop_conf_dir_env) {
|
if (hadoop_conf_dir_env) {
|
||||||
AddToSearchPath(hadoop_conf_dir_env);
|
std::stringstream ss(hadoop_conf_dir_env);
|
||||||
|
std::string path;
|
||||||
|
while (std::getline(ss, path, kSearchPathSeparator)) {
|
||||||
|
AddToSearchPath(path);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
AddToSearchPath("/etc/hadoop/conf");
|
AddToSearchPath("/etc/hadoop/conf");
|
||||||
}
|
}
|
||||||
|
@ -136,6 +141,63 @@ std::string ConfigurationLoader::GetSearchPath()
|
||||||
return result.str();
|
return result.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status validateStream(std::istream & stream) {
|
||||||
|
std::streampos start = stream.tellg();
|
||||||
|
stream.seekg(0, std::ios::end);
|
||||||
|
std::streampos end = stream.tellg();
|
||||||
|
stream.seekg(start, std::ios::beg);
|
||||||
|
|
||||||
|
int length = end - start;
|
||||||
|
|
||||||
|
if (length <= 0 || start == -1 || end == -1)
|
||||||
|
return Status::Error("The configuration file is empty");
|
||||||
|
|
||||||
|
LOG_DEBUG(kFileSystem, << "validateStream will read a config file of length " << length);
|
||||||
|
|
||||||
|
std::vector<char> raw_bytes((int64_t)length + 1);
|
||||||
|
stream.read(&raw_bytes[0], length);
|
||||||
|
raw_bytes[length] = 0;
|
||||||
|
|
||||||
|
try {
|
||||||
|
rapidxml::xml_document<> dom;
|
||||||
|
dom.parse<rapidxml::parse_trim_whitespace|rapidxml::parse_validate_closing_tags>(&raw_bytes[0]);
|
||||||
|
|
||||||
|
/* File must contain a single <configuration> stanza */
|
||||||
|
auto config_node = dom.first_node("configuration", 0, false);
|
||||||
|
if (!config_node) {
|
||||||
|
return Status::Error("The configuration file is missing a 'configuration' tag");
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
} catch (const rapidxml::parse_error &e) {
|
||||||
|
size_t location = e.where<char>() - &raw_bytes[0];
|
||||||
|
std::string msg = "The configuration file has invalid xml around character " + std::to_string(location);
|
||||||
|
return Status::Error(msg.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::pair<std::string, Status> > ConfigurationLoader::ValidateResources(std::vector<std::string> filenames) const
|
||||||
|
{
|
||||||
|
std::vector<std::pair<std::string, Status> > stats;
|
||||||
|
bool found;
|
||||||
|
for(auto file: filenames) {
|
||||||
|
found = false;
|
||||||
|
for(auto dir: search_path_) {
|
||||||
|
std::ifstream stream(dir + file);
|
||||||
|
if ( stream.is_open() ) {
|
||||||
|
found = true;
|
||||||
|
stats.push_back(std::make_pair(file,validateStream(stream)));
|
||||||
|
} else {
|
||||||
|
LOG_DEBUG(kFileSystem, << dir << file << " was not found");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(!found) {
|
||||||
|
std::string msg("No directory in the current search path contains the file [" + file + "]");
|
||||||
|
stats.push_back(std::make_pair(file,Status::PathNotFound(msg.c_str())));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
bool ConfigurationLoader::UpdateMapWithFile(ConfigMap & map, const std::string & path) const
|
bool ConfigurationLoader::UpdateMapWithFile(ConfigMap & map, const std::string & path) const
|
||||||
{
|
{
|
||||||
if (path.front() == kFileSeparator) { // Absolute path
|
if (path.front() == kFileSeparator) { // Absolute path
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#define COMMON_CONFIGURATION_BUILDER_H_
|
#define COMMON_CONFIGURATION_BUILDER_H_
|
||||||
|
|
||||||
#include "configuration.h"
|
#include "configuration.h"
|
||||||
|
#include "hdfspp/status.h"
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
|
@ -75,6 +76,13 @@ public:
|
||||||
optional<T> LoadDefaultResources();
|
optional<T> LoadDefaultResources();
|
||||||
|
|
||||||
|
|
||||||
|
// Returns a vector of filenames and the corresponding status when validation is attempted.
|
||||||
|
// If the files can be successfully validated, then the status returned for that file is Status::OK
|
||||||
|
// The files that are validated are those returned by T::GetDefaultFilenames().
|
||||||
|
// T must be Configuration or a subclass
|
||||||
|
template<class T>
|
||||||
|
std::vector<std::pair<std::string, Status>> ValidateDefaultResources() const;
|
||||||
|
|
||||||
/****************************************************************************
|
/****************************************************************************
|
||||||
* SEARCH PATH METHODS
|
* SEARCH PATH METHODS
|
||||||
***************************************************************************/
|
***************************************************************************/
|
||||||
|
@ -98,6 +106,8 @@ public:
|
||||||
protected:
|
protected:
|
||||||
using ConfigMap = Configuration::ConfigMap;
|
using ConfigMap = Configuration::ConfigMap;
|
||||||
|
|
||||||
|
std::vector<std::pair<std::string, Status>> ValidateResources(std::vector<std::string> filenames) const;
|
||||||
|
|
||||||
// Updates the src map with data from the XML in the path
|
// Updates the src map with data from the XML in the path
|
||||||
// The search path will be searched for the filename
|
// The search path will be searched for the filename
|
||||||
bool UpdateMapWithFile(ConfigMap & map, const std::string & path) const;
|
bool UpdateMapWithFile(ConfigMap & map, const std::string & path) const;
|
||||||
|
|
|
@ -111,6 +111,11 @@ optional<T> ConfigurationLoader::LoadDefaultResources() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<class T>
|
||||||
|
std::vector<std::pair<std::string, Status> > ConfigurationLoader::ValidateDefaultResources() const{
|
||||||
|
return ValidateResources(T::GetDefaultFilenames());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ struct ReadDelimitedPBMessageContinuation : public Continuation {
|
||||||
uint32_t size = 0;
|
uint32_t size = 0;
|
||||||
bool v = is.ReadVarint32(&size);
|
bool v = is.ReadVarint32(&size);
|
||||||
assert(v);
|
assert(v);
|
||||||
|
(void)v; //avoids unused variable warning
|
||||||
is.PushLimit(size);
|
is.PushLimit(size);
|
||||||
msg_->Clear();
|
msg_->Clear();
|
||||||
v = msg_->MergeFromCodedStream(&is);
|
v = msg_->MergeFromCodedStream(&is);
|
||||||
|
|
|
@ -54,6 +54,7 @@ RetryAction FixedDelayWithFailover::ShouldRetry(const Status &s, uint64_t retrie
|
||||||
uint64_t failovers,
|
uint64_t failovers,
|
||||||
bool isIdempotentOrAtMostOnce) const {
|
bool isIdempotentOrAtMostOnce) const {
|
||||||
(void)isIdempotentOrAtMostOnce;
|
(void)isIdempotentOrAtMostOnce;
|
||||||
|
(void)max_failover_conn_retries_;
|
||||||
LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
|
LOG_TRACE(kRPC, << "FixedDelayWithFailover::ShouldRetry(retries=" << retries << ", failovers=" << failovers << ")");
|
||||||
|
|
||||||
if(failovers < max_failover_retries_ && (s.code() == ::asio::error::timed_out || s.get_server_exception_type() == Status::kStandbyException) )
|
if(failovers < max_failover_retries_ && (s.code() == ::asio::error::timed_out || s.get_server_exception_type() == Status::kStandbyException) )
|
||||||
|
|
|
@ -52,7 +52,7 @@ void FileHandleImpl::PositionRead(
|
||||||
const std::function<void(const Status &, size_t)> &handler) {
|
const std::function<void(const Status &, size_t)> &handler) {
|
||||||
LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead("
|
LOG_DEBUG(kFileHandle, << "FileHandleImpl::PositionRead("
|
||||||
<< FMT_THIS_ADDR << ", buf=" << buf
|
<< FMT_THIS_ADDR << ", buf=" << buf
|
||||||
<< ", buf_size=" << buf_size << ") called");
|
<< ", buf_size=" << std::to_string(buf_size) << ") called");
|
||||||
|
|
||||||
/* prevent usage after cancelation */
|
/* prevent usage after cancelation */
|
||||||
if(cancel_state_->is_canceled()) {
|
if(cancel_state_->is_canceled()) {
|
||||||
|
@ -78,7 +78,7 @@ void FileHandleImpl::PositionRead(
|
||||||
Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
|
Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
|
||||||
LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
|
LOG_DEBUG(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
|
||||||
<< FMT_THIS_ADDR << ", buf=" << buf
|
<< FMT_THIS_ADDR << ", buf=" << buf
|
||||||
<< ", buf_size=" << buf_size
|
<< ", buf_size=" << std::to_string(buf_size)
|
||||||
<< ", offset=" << offset << ") called");
|
<< ", offset=" << offset << ") called");
|
||||||
|
|
||||||
auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
|
auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
|
||||||
|
@ -106,7 +106,7 @@ Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, si
|
||||||
Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
|
Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
|
||||||
LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read("
|
LOG_DEBUG(kFileHandle, << "FileHandleImpl::Read("
|
||||||
<< FMT_THIS_ADDR << ", buf=" << buf
|
<< FMT_THIS_ADDR << ", buf=" << buf
|
||||||
<< ", buf_size=" << buf_size << ") called");
|
<< ", buf_size=" << std::to_string(buf_size) << ") called");
|
||||||
|
|
||||||
Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
|
Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
|
||||||
if(!stat.ok()) {
|
if(!stat.ok()) {
|
||||||
|
@ -237,7 +237,7 @@ void FileHandleImpl::AsyncPreadSome(
|
||||||
|
|
||||||
LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
|
LOG_DEBUG(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
|
||||||
<< FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
|
<< FMT_THIS_ADDR << "), ...) Datanode hostname=" << dnHostName << ", IP Address=" << dnIpAddr
|
||||||
<< ", file path=\"" << path_ << "\", offset=" << offset << ", read size=" << size_within_block);
|
<< ", file path=\"" << path_ << "\", offset=" << std::to_string(offset) << ", read size=" << size_within_block);
|
||||||
|
|
||||||
// This is where we will put the logic for re-using a DN connection; we can
|
// This is where we will put the logic for re-using a DN connection; we can
|
||||||
// steal the FileHandle's dn and put it back when we're done
|
// steal the FileHandle's dn and put it back when we're done
|
||||||
|
|
|
@ -203,9 +203,9 @@ public:
|
||||||
/* all monitored events will need to lookup handlers */
|
/* all monitored events will need to lookup handlers */
|
||||||
std::shared_ptr<LibhdfsEvents> get_event_handlers();
|
std::shared_ptr<LibhdfsEvents> get_event_handlers();
|
||||||
|
|
||||||
Options get_options();
|
Options get_options() override;
|
||||||
|
|
||||||
std::string get_cluster_name();
|
std::string get_cluster_name() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -41,7 +41,7 @@ namespace hdfs {
|
||||||
|
|
||||||
class NameNodeOperations {
|
class NameNodeOperations {
|
||||||
public:
|
public:
|
||||||
MEMCHECKED_CLASS(NameNodeOperations);
|
MEMCHECKED_CLASS(NameNodeOperations)
|
||||||
NameNodeOperations(::asio::io_service *io_service, const Options &options,
|
NameNodeOperations(::asio::io_service *io_service, const Options &options,
|
||||||
const std::string &client_name, const std::string &user_name,
|
const std::string &client_name, const std::string &user_name,
|
||||||
const char *protocol_name, int protocol_version) :
|
const char *protocol_name, int protocol_version) :
|
||||||
|
|
|
@ -177,6 +177,7 @@ struct BlockReaderImpl::ReadPacketHeader : continuation::Continuation
|
||||||
bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
|
bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart],
|
||||||
header_length());
|
header_length());
|
||||||
assert(v && "Failed to parse the header");
|
assert(v && "Failed to parse the header");
|
||||||
|
(void)v; //avoids unused variable warning
|
||||||
parent_->state_ = kReadChecksum;
|
parent_->state_ = kReadChecksum;
|
||||||
}
|
}
|
||||||
if(parent_->event_handlers_) {
|
if(parent_->event_handlers_) {
|
||||||
|
|
|
@ -31,8 +31,6 @@ namespace pbio = ::google::protobuf::io;
|
||||||
using namespace ::hadoop::common;
|
using namespace ::hadoop::common;
|
||||||
using namespace ::std::placeholders;
|
using namespace ::std::placeholders;
|
||||||
|
|
||||||
static const int kNoRetry = -1;
|
|
||||||
|
|
||||||
static void AddHeadersToPacket(
|
static void AddHeadersToPacket(
|
||||||
std::string *res, std::initializer_list<const pb::MessageLite *> headers,
|
std::string *res, std::initializer_list<const pb::MessageLite *> headers,
|
||||||
const std::string *payload) {
|
const std::string *payload) {
|
||||||
|
|
|
@ -38,14 +38,14 @@ namespace hdfs {
|
||||||
template <class Socket>
|
template <class Socket>
|
||||||
class RpcConnectionImpl : public RpcConnection {
|
class RpcConnectionImpl : public RpcConnection {
|
||||||
public:
|
public:
|
||||||
MEMCHECKED_CLASS(RpcConnectionImpl);
|
MEMCHECKED_CLASS(RpcConnectionImpl)
|
||||||
|
|
||||||
RpcConnectionImpl(RpcEngine *engine);
|
RpcConnectionImpl(RpcEngine *engine);
|
||||||
virtual ~RpcConnectionImpl() override;
|
virtual ~RpcConnectionImpl() override;
|
||||||
|
|
||||||
virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
|
virtual void Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
|
||||||
const AuthInfo & auth_info,
|
const AuthInfo & auth_info,
|
||||||
RpcCallback &handler);
|
RpcCallback &handler) override;
|
||||||
virtual void ConnectAndFlush(
|
virtual void ConnectAndFlush(
|
||||||
const std::vector<::asio::ip::tcp::endpoint> &server) override;
|
const std::vector<::asio::ip::tcp::endpoint> &server) override;
|
||||||
virtual void SendHandshake(RpcCallback &handler) override;
|
virtual void SendHandshake(RpcCallback &handler) override;
|
||||||
|
|
|
@ -68,7 +68,7 @@ void RpcEngine::Connect(const std::string &cluster_name,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Construct retry policy after we determine if config is HA
|
// Construct retry policy after we determine if config is HA
|
||||||
retry_policy_ = std::move(MakeRetryPolicy(options_));
|
retry_policy_ = MakeRetryPolicy(options_);
|
||||||
|
|
||||||
conn_ = InitializeConnection();
|
conn_ = InitializeConnection();
|
||||||
conn_->Connect(last_endpoints_, auth_info_, handler);
|
conn_->Connect(last_endpoints_, auth_info_, handler);
|
||||||
|
@ -133,7 +133,7 @@ std::string RpcEngine::getRandomClientId()
|
||||||
|
|
||||||
void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
|
void RpcEngine::TEST_SetRpcConnection(std::shared_ptr<RpcConnection> conn) {
|
||||||
conn_ = conn;
|
conn_ = conn;
|
||||||
retry_policy_ = std::move(MakeRetryPolicy(options_));
|
retry_policy_ = MakeRetryPolicy(options_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy) {
|
void RpcEngine::TEST_SetRetryPolicy(std::unique_ptr<const RetryPolicy> policy) {
|
||||||
|
@ -188,7 +188,7 @@ void RpcEngine::AsyncRpcCommsError(
|
||||||
const Status &status,
|
const Status &status,
|
||||||
std::shared_ptr<RpcConnection> failedConnection,
|
std::shared_ptr<RpcConnection> failedConnection,
|
||||||
std::vector<std::shared_ptr<Request>> pendingRequests) {
|
std::vector<std::shared_ptr<Request>> pendingRequests) {
|
||||||
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
|
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size()));
|
||||||
|
|
||||||
io_service().post([this, status, failedConnection, pendingRequests]() {
|
io_service().post([this, status, failedConnection, pendingRequests]() {
|
||||||
RpcCommsError(status, failedConnection, pendingRequests);
|
RpcCommsError(status, failedConnection, pendingRequests);
|
||||||
|
@ -199,7 +199,7 @@ void RpcEngine::RpcCommsError(
|
||||||
const Status &status,
|
const Status &status,
|
||||||
std::shared_ptr<RpcConnection> failedConnection,
|
std::shared_ptr<RpcConnection> failedConnection,
|
||||||
std::vector<std::shared_ptr<Request>> pendingRequests) {
|
std::vector<std::shared_ptr<Request>> pendingRequests) {
|
||||||
LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << pendingRequests.size());
|
LOG_WARN(kRPC, << "RpcEngine::RpcCommsError called; status=\"" << status.ToString() << "\" conn=" << failedConnection.get() << " reqs=" << std::to_string(pendingRequests.size()));
|
||||||
|
|
||||||
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
|
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
|
||||||
|
|
||||||
|
@ -252,7 +252,7 @@ void RpcEngine::RpcCommsError(
|
||||||
head_action && head_action->action != RetryAction::FAIL;
|
head_action && head_action->action != RetryAction::FAIL;
|
||||||
|
|
||||||
if (haveRequests) {
|
if (haveRequests) {
|
||||||
LOG_TRACE(kRPC, << "Have " << pendingRequests.size() << " requests to resend");
|
LOG_TRACE(kRPC, << "Have " << std::to_string(pendingRequests.size()) << " requests to resend");
|
||||||
bool needNewConnection = !conn_;
|
bool needNewConnection = !conn_;
|
||||||
if (needNewConnection) {
|
if (needNewConnection) {
|
||||||
LOG_DEBUG(kRPC, << "Creating a new NN conection");
|
LOG_DEBUG(kRPC, << "Creating a new NN conection");
|
||||||
|
|
|
@ -53,7 +53,7 @@ Status SaslEngine::SetPasswordInfo(const std::string &id,
|
||||||
bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) {
|
bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) {
|
||||||
Status status = Status::OK();
|
Status status = Status::OK();
|
||||||
|
|
||||||
if (resp_auths.empty()) return NULL;
|
if (resp_auths.empty()) return false;
|
||||||
|
|
||||||
for (SaslMethod auth: resp_auths) {
|
for (SaslMethod auth: resp_auths) {
|
||||||
if ( auth.mechanism != "GSSAPI") continue; // Hack: only GSSAPI for now
|
if ( auth.mechanism != "GSSAPI") continue; // Hack: only GSSAPI for now
|
||||||
|
@ -74,7 +74,7 @@ bool SaslEngine::ChooseMech(const std::vector<SaslMethod> &resp_auths) {
|
||||||
// Clear out the chosen mech
|
// Clear out the chosen mech
|
||||||
chosen_mech_ = SaslMethod();
|
chosen_mech_ = SaslMethod();
|
||||||
|
|
||||||
return NULL;
|
return false;
|
||||||
} // choose_mech()
|
} // choose_mech()
|
||||||
|
|
||||||
} // namespace hdfs
|
} // namespace hdfs
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#ifndef TESTS_CONFIGURATION_H_
|
#ifndef TESTS_CONFIGURATION_H_
|
||||||
#define TESTS_CONFIGURATION_H_
|
#define TESTS_CONFIGURATION_H_
|
||||||
|
|
||||||
|
#include "hdfspp/config_parser.h"
|
||||||
#include "common/configuration.h"
|
#include "common/configuration.h"
|
||||||
#include "common/configuration_loader.h"
|
#include "common/configuration_loader.h"
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
|
@ -50,6 +51,28 @@ void simpleConfigStream(std::stringstream& out, Args... args) {
|
||||||
out << "</configuration>";
|
out << "</configuration>";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T, typename U>
|
||||||
|
void damagedConfigStreamProperty(std::stringstream& out, T key, U value) {
|
||||||
|
out << "<propertyy>"
|
||||||
|
<< "<name>" << key << "</name>"
|
||||||
|
<< "<value>" << value << "</value>"
|
||||||
|
<< "</property>";
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T, typename U, typename... Args>
|
||||||
|
void damagedConfigStreamProperty(std::stringstream& out, T key, U value,
|
||||||
|
Args... args) {
|
||||||
|
damagedConfigStreamProperty(out, key, value);
|
||||||
|
damagedConfigStreamProperty(out, args...);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
void damagedConfigStream(std::stringstream& out, Args... args) {
|
||||||
|
out << "<configuration>";
|
||||||
|
damagedConfigStreamProperty(out, args...);
|
||||||
|
out << "</configuration>";
|
||||||
|
}
|
||||||
|
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
optional<Configuration> simpleConfig(Args... args) {
|
optional<Configuration> simpleConfig(Args... args) {
|
||||||
std::stringstream stream;
|
std::stringstream stream;
|
||||||
|
@ -72,6 +95,16 @@ void writeSimpleConfig(const std::string& filename, Args... args) {
|
||||||
out << stream.rdbuf();
|
out << stream.rdbuf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename... Args>
|
||||||
|
void writeDamagedConfig(const std::string& filename, Args... args) {
|
||||||
|
std::stringstream stream;
|
||||||
|
damagedConfigStream(stream, args...);
|
||||||
|
|
||||||
|
std::ofstream out;
|
||||||
|
out.open(filename);
|
||||||
|
out << stream.rdbuf();
|
||||||
|
}
|
||||||
|
|
||||||
// TempDir: is deleted on destruction
|
// TempDir: is deleted on destruction
|
||||||
class TempFile {
|
class TempFile {
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "common/hdfs_configuration.h"
|
#include "common/hdfs_configuration.h"
|
||||||
#include "configuration_test.h"
|
#include "configuration_test.h"
|
||||||
#include <gmock/gmock.h>
|
#include <gmock/gmock.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
using ::testing::_;
|
using ::testing::_;
|
||||||
|
|
||||||
|
@ -111,6 +112,53 @@ TEST(HdfsConfigurationTest, TestDefaultConfigs) {
|
||||||
EXPECT_TRUE(config && "Parse streams");
|
EXPECT_TRUE(config && "Parse streams");
|
||||||
EXPECT_EQ("value2", config->GetWithDefault("key2", ""));
|
EXPECT_EQ("value2", config->GetWithDefault("key2", ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(HdfsConfigurationTest, TestConfigParserAPI) {
|
||||||
|
// Config parser API
|
||||||
|
{
|
||||||
|
TempDir tempDir;
|
||||||
|
TempFile coreSite(tempDir.path + "/core-site.xml");
|
||||||
|
writeSimpleConfig(coreSite.filename, "key1", "value1");
|
||||||
|
TempFile hdfsSite(tempDir.path + "/hdfs-site.xml");
|
||||||
|
writeSimpleConfig(hdfsSite.filename, "key2", "value2");
|
||||||
|
|
||||||
|
ConfigParser parser(tempDir.path);
|
||||||
|
|
||||||
|
EXPECT_EQ("value1", parser.get_string_or("key1", ""));
|
||||||
|
EXPECT_EQ("value2", parser.get_string_or("key2", ""));
|
||||||
|
|
||||||
|
auto stats = parser.ValidateResources();
|
||||||
|
|
||||||
|
EXPECT_EQ("core-site.xml", stats[0].first);
|
||||||
|
EXPECT_EQ("OK", stats[0].second.ToString());
|
||||||
|
|
||||||
|
EXPECT_EQ("hdfs-site.xml", stats[1].first);
|
||||||
|
EXPECT_EQ("OK", stats[1].second.ToString());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
TempDir tempDir;
|
||||||
|
TempFile coreSite(tempDir.path + "/core-site.xml");
|
||||||
|
writeSimpleConfig(coreSite.filename, "key1", "value1");
|
||||||
|
TempFile hdfsSite(tempDir.path + "/hdfs-site.xml");
|
||||||
|
writeDamagedConfig(hdfsSite.filename, "key2", "value2");
|
||||||
|
|
||||||
|
ConfigParser parser(tempDir.path);
|
||||||
|
|
||||||
|
EXPECT_EQ("value1", parser.get_string_or("key1", ""));
|
||||||
|
EXPECT_EQ("", parser.get_string_or("key2", ""));
|
||||||
|
|
||||||
|
auto stats = parser.ValidateResources();
|
||||||
|
|
||||||
|
EXPECT_EQ("core-site.xml", stats[0].first);
|
||||||
|
EXPECT_EQ("OK", stats[0].second.ToString());
|
||||||
|
|
||||||
|
EXPECT_EQ("hdfs-site.xml", stats[1].first);
|
||||||
|
EXPECT_EQ("Exception:The configuration file has invalid xml around character 74", stats[1].second.ToString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[])
|
int main(int argc, char *argv[])
|
||||||
|
|
|
@ -22,15 +22,29 @@
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
||||||
std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
|
std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
|
||||||
|
|
||||||
|
//This sets the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
|
||||||
|
//and loads default config files core-site.xml and hdfs-site.xml from the config path
|
||||||
|
hdfs::ConfigParser parser;
|
||||||
|
if(!parser.LoadDefaultResources()){
|
||||||
|
std::cerr << "Could not load default resources. " << std::endl;
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
auto stats = parser.ValidateResources();
|
||||||
|
//validating core-site.xml
|
||||||
|
if(!stats[0].second.ok()){
|
||||||
|
std::cerr << stats[0].first << " is invalid: " << stats[0].second.ToString() << std::endl;
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
//validating hdfs-site.xml
|
||||||
|
if(!stats[1].second.ok()){
|
||||||
|
std::cerr << stats[1].first << " is invalid: " << stats[1].second.ToString() << std::endl;
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
hdfs::Options options;
|
hdfs::Options options;
|
||||||
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
|
if(!parser.get_options(options)){
|
||||||
hdfs::ConfigurationLoader loader;
|
std::cerr << "Could not load Options object. " << std::endl;
|
||||||
//Loading default config files core-site.xml and hdfs-site.xml from the config path
|
exit(EXIT_FAILURE);
|
||||||
hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
|
|
||||||
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
|
|
||||||
if(config){
|
|
||||||
//Loading options from the config
|
|
||||||
options = config->GetOptions();
|
|
||||||
}
|
}
|
||||||
if(max_timeout){
|
if(max_timeout){
|
||||||
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
|
//TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
|
||||||
|
|
|
@ -21,9 +21,6 @@
|
||||||
#define TOOLS_COMMON_H_
|
#define TOOLS_COMMON_H_
|
||||||
|
|
||||||
#include "hdfspp/hdfspp.h"
|
#include "hdfspp/hdfspp.h"
|
||||||
#include "hdfspp/uri.h"
|
|
||||||
#include "common/hdfs_configuration.h"
|
|
||||||
#include "common/configuration_loader.h"
|
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
namespace hdfs {
|
namespace hdfs {
|
||||||
|
|
Loading…
Reference in New Issue