HDFS-11028: libhdfs++: FileSystem needs to be able to cancel pending connections. Contributed by James Clampffer

This commit is contained in:
James 2017-01-23 18:19:13 -05:00 committed by James Clampffer
parent 58de2df860
commit 8783461e2e
20 changed files with 729 additions and 143 deletions

View File

@ -17,3 +17,4 @@
#
add_subdirectory(cat)
add_subdirectory(connect_cancel)

View File

@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default LIBHDFSPP_DIR to the default install location. You can override
# it by adding -DLIBHDFSPP_DIR=... to your cmake invocation
set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )
add_executable(connect_cancel_c connect_cancel.c)
target_link_libraries(connect_cancel_c hdfspp uriparser2)
# Several examples in different languages need to produce executables with
# same names. To allow executables with same names we keep their CMake
# names different, but specify their executable names as follows:
set_target_properties( connect_cancel_c
PROPERTIES
OUTPUT_NAME "connect_cancel_c"
)

View File

@ -0,0 +1,107 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
/*
Attempt to connect to a cluster and use Control-C to bail out if it takes a while.
Valid config must be in environment variable $HADOOP_CONF_DIR
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include "hdfspp/hdfs_ext.h"
#include "common/util_c.h"
#define ERROR_BUFFER_SIZE 1024
// Global so signal handler can get at it
hdfsFS fs = NULL;
const char *catch_enter = "In signal handler, going to try and cancel.\n";
const char *catch_cancel = "hdfsCancelPendingConnect has been canceled in the signal handler.\n";
const char *catch_exit = "Exiting the signal handler.\n";
// Print to stdout without calling malloc or otherwise indirectly modifying userspace state.
// Write calls to stdout may still interleave with stuff coming from elsewhere.
static void sighandler_direct_stdout(const char *msg) {
if(!msg)
return;
ssize_t res = write(1 /*posix stdout fd*/, msg, strlen(msg));
(void)res;
}
static void sig_catch(int val) {
// Beware of calling things that aren't reentrant, e.g. malloc, while in a signal handler.
sighandler_direct_stdout(catch_enter);
if(fs) {
hdfsCancelPendingConnection(fs);
sighandler_direct_stdout(catch_cancel);
}
sighandler_direct_stdout(catch_exit);
}
int main(int argc, char** argv) {
hdfsSetLoggingLevel(HDFSPP_LOG_LEVEL_INFO);
signal(SIGINT, sig_catch);
char error_text[ERROR_BUFFER_SIZE];
if (argc != 1) {
fprintf(stderr, "usage: ./connect_cancel_c\n");
ShutdownProtobufLibrary_C();
exit(EXIT_FAILURE);
}
const char *hdfsconfdir = getenv("HADOOP_CONF_DIR");
if(!hdfsconfdir) {
fprintf(stderr, "$HADOOP_CONF_DIR must be set\n");
ShutdownProtobufLibrary_C();
exit(EXIT_FAILURE);
}
struct hdfsBuilder* builder = hdfsNewBuilderFromDirectory(hdfsconfdir);
fs = hdfsAllocateFileSystem(builder);
if (fs == NULL) {
hdfsGetLastError(error_text, ERROR_BUFFER_SIZE);
fprintf(stderr, "hdfsAllocateFileSystem returned null.\n%s\n", error_text);
hdfsFreeBuilder(builder);
ShutdownProtobufLibrary_C();
exit(EXIT_FAILURE);
}
int connected = hdfsConnectAllocated(fs, builder);
if (connected != 0) {
hdfsGetLastError(error_text, ERROR_BUFFER_SIZE);
fprintf(stderr, "hdfsConnectAllocated errored.\n%s\n", error_text);
hdfsFreeBuilder(builder);
ShutdownProtobufLibrary_C();
exit(EXIT_FAILURE);
}
hdfsDisconnect(fs);
hdfsFreeBuilder(builder);
// Clean up static data and prevent valgrind memory leaks
ShutdownProtobufLibrary_C();
return 0;
}
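The example above drives the cancel from a SIGINT handler. The same C entry points can also be driven from a watchdog thread; a minimal sketch follows (not part of this commit, and the five second timeout is an arbitrary illustration):

// Hypothetical variant: cancel a pending connect from a second thread instead
// of a signal handler. Uses the same hdfs_ext.h entry points as the example above.
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <thread>
#include "hdfspp/hdfs_ext.h"

int main() {
  const char *conf_dir = getenv("HADOOP_CONF_DIR");
  if (!conf_dir) return 1;

  struct hdfsBuilder *builder = hdfsNewBuilderFromDirectory(conf_dir);
  hdfsFS fs = hdfsAllocateFileSystem(builder);
  if (!fs) return 1;

  // Watchdog: wait five seconds (arbitrary), then cancel whatever is still pending.
  std::thread watchdog([fs]() {
    std::this_thread::sleep_for(std::chrono::seconds(5));
    hdfsCancelPendingConnection(fs);  // returns nonzero if there was nothing left to cancel
  });

  // Returns 0 once connected, or an errno value if the watchdog canceled first.
  int rc = hdfsConnectAllocated(fs, builder);
  watchdog.join();

  if (rc == 0) hdfsDisconnect(fs);
  hdfsFreeBuilder(builder);
  return rc == 0 ? 0 : 1;
}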

View File

@ -19,3 +19,4 @@
add_subdirectory(cat)
add_subdirectory(gendirs)
add_subdirectory(find)
add_subdirectory(connect_cancel)

View File

@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default LIBHDFSPP_DIR to the default install location. You can override
# it by adding -DLIBHDFSPP_DIR=... to your cmake invocation
set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )
add_executable(connect_cancel connect_cancel.cc)
target_link_libraries(connect_cancel hdfspp)

View File

@ -0,0 +1,158 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include "hdfspp/hdfspp.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include <google/protobuf/stubs/common.h>
#include <signal.h>
#include <unistd.h>
#include <thread>
#include <iostream>
// Simple example of how to cancel an async connect call.
// Here Control-C (SIGINT) is caught in order to invoke the FS level cancel and
// properly tear down the process. Valgrind should show no leaked memory on exit
// when cancel has been called. URI parsing code is omitted and defaultFs from
// /etc/hadoop/conf or $HADOOP_CONF_DIR is always used.
// Scoped globally to make it simple to reference from the signal handler.
std::shared_ptr<hdfs::FileSystem> fs;
const std::string catch_enter("In signal handler, going to try and cancel FileSystem::Connect.\n");
const std::string catch_cancel("FileSystem::Cancel has been canceled in the signal handler.\n");
const std::string catch_exit("Exiting the signal handler.\n");
// Avoid IO reentrancy issues; see comments in the signal handler below.
// It's possible that the write interleaves with another write call,
// but it won't corrupt the stack or heap.
static void sighandler_direct_stdout(const std::string &msg) {
ssize_t res = ::write(1 /*posix stdout FD*/, msg.data(), msg.size());
// In production you'd want to check res, but error handling code will
// need to be fairly application specific if it's going to properly
// avoid reentrant calls to malloc.
(void)res;
}
// Signal handler to make a SIGINT call cancel rather than exit().
static void sig_catch(int val) {
(void)val;
// This is avoiding the tricky bits of signal handling, notably that the
// underlying string manipulation and IO functions used by the logger
// are unlikely to be reentrant.
//
// Production code could mask out all logging on handler entry and enable
// it again on exit; here we just assume it's "good enough" and some
// (possibly broken) log messages are better than none.
sighandler_direct_stdout(catch_enter);
if(fs) {
// This will invoke the callback immediately with an OperationCanceled status
fs->CancelPendingConnect();
sighandler_direct_stdout(catch_cancel);
}
sighandler_direct_stdout(catch_exit);
}
int main(int arg_token_count, const char **args) {
(void)args;
if(arg_token_count != 1) {
std::cerr << "usage: ./connect_cancel";
google::protobuf::ShutdownProtobufLibrary();
exit(EXIT_FAILURE);
}
// Register signal handle to asynchronously invoke cancel from outside the main thread.
signal(SIGINT, sig_catch);
// Generic setup/config code much like the other examples.
hdfs::Options options;
//Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
hdfs::ConfigurationLoader loader;
//Loading default config files core-site.xml and hdfs-site.xml from the config path
hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
//TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
if(config){
//Loading options from the config
options = config->GetOptions();
}
// Start an IoService and some worker threads
std::shared_ptr<hdfs::IoService> service = hdfs::IoService::MakeShared();
if(nullptr == service) {
std::cerr << "Unable to create IoService" << std::endl;
fs.reset();
// Nasty hack to clean up for valgrind since we don't have the C++17 optional<T>::reset method
config = decltype(config)();
google::protobuf::ShutdownProtobufLibrary();
exit(EXIT_FAILURE);
}
unsigned int worker_count = service->InitDefaultWorkers();
if(worker_count < 1) {
std::cerr << "Unable to create IoService worker threads";
fs.reset();
service->Stop();
config = decltype(config)();
google::protobuf::ShutdownProtobufLibrary();
exit(EXIT_FAILURE);
}
// Set up and connect to the FileSystem
fs.reset(hdfs::FileSystem::New(service, "", options));
if(nullptr == fs) {
std::cerr << "Unable to create FileSystem" << std::endl;
fs.reset();
service->Stop();
config = decltype(config)();
google::protobuf::ShutdownProtobufLibrary();
exit(EXIT_FAILURE);
}
hdfs::Status status = fs->ConnectToDefaultFs();
if (!status.ok()) {
if(!options.defaultFS.get_host().empty()){
std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
} else {
std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
}
fs.reset();
service->Stop();
config = decltype(config)();
google::protobuf::ShutdownProtobufLibrary();
exit(EXIT_FAILURE);
}
fs.reset();
service->Stop();
config = decltype(config)();
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -299,6 +299,7 @@ int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cook
* objects; NULL on error or empty result.
* errno is set to non-zero on error or zero on success.
**/
LIBHDFS_EXTERNAL
hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries);
@ -314,6 +315,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t
* @param name Name to be given to the created snapshot (may be NULL)
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name);
/**
@ -324,6 +326,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name);
* @param name Name of the snapshot to be deleted (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name);
/**
@ -333,6 +336,7 @@ int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name);
* @param path Path to the directory to be made snapshottable (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsAllowSnapshot(hdfsFS fs, const char* path);
/**
@ -342,8 +346,35 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path);
* @param path Path to the directory to be made non-snapshottable (must be non-blank)
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsDisallowSnapshot(hdfsFS fs, const char* path);
/**
* Create a FileSystem based on the builder but don't connect
* @param bld Used to populate config options in the same manner as hdfsBuilderConnect.
* Does not free builder.
**/
LIBHDFS_EXTERNAL
hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld);
/**
* Connect a FileSystem created with hdfsAllocateFileSystem
* @param fs A disconnected FS created with hdfsAllocateFileSystem
* @param bld The same builder used for Allocate, or an exact copy of it; a few fields are still needed.
* Does not free builder.
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld);
/**
* Cancel a pending connection on a FileSystem
* @param fs A fs in the process of connecting using hdfsConnectAllocated in another thread.
* @return 0 on success, corresponding errno on failure
**/
LIBHDFS_EXTERNAL
int hdfsCancelPendingConnection(hdfsFS fs);
#ifdef __cplusplus
} /* end extern "C" */
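A short, illustrative sketch (not part of this commit) of how the return value of hdfsCancelPendingConnection can be interpreted, based on the hdfs.cc implementation added later in this change: 0 when a pending connect was actually canceled, an errno value such as EINTR when there was nothing left to cancel, ENODEV for a bad handle.

// Hypothetical helper showing the return-code convention of the new cancel call.
#include <cstdio>
#include "hdfspp/hdfs_ext.h"

void try_cancel(hdfsFS fs) {
  int rc = hdfsCancelPendingConnection(fs);
  if (rc == 0) {
    // The pending connect's callback will be invoked with a canceled status.
    printf("pending connect canceled\n");
  } else {
    // Typically the connect already completed (or was never started).
    printf("nothing to cancel, rc=%d\n", rc);
  }
}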

View File

@ -216,6 +216,12 @@ class FileSystem {
const std::function<void(const Status &, FileSystem *)> &handler) = 0;
virtual Status ConnectToDefaultFs() = 0;
/**
* Cancels any attempts to connect to the HDFS cluster.
* FileSystem is expected to be destroyed after invoking this.
*/
virtual bool CancelPendingConnect() = 0;
/**
* Open a file on HDFS. The call issues an RPC to the NameNode to
* gather the locations of all blocks in the file and to return a

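A minimal sketch (not part of this commit) of how CancelPendingConnect is meant to be driven from the C++ API, assuming the asynchronous Connect(server, service, handler) overload declared above; the host, port, and timing are placeholders:

#include "hdfspp/hdfspp.h"
#include <iostream>
#include <memory>

void connect_then_cancel() {
  std::shared_ptr<hdfs::IoService> service = hdfs::IoService::MakeShared();
  service->InitDefaultWorkers();

  hdfs::Options options;
  std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(service, "", options));

  fs->Connect("namenode.example.com", "8020",
              [](const hdfs::Status &status, hdfs::FileSystem *) {
                // The handler must check the status before using the FileSystem;
                // after a cancel it is invoked with a non-ok (canceled) status.
                std::cerr << "Connect finished: " << status.ToString() << std::endl;
              });

  // Some other code path (another thread, a signal handler wrapper, a timeout)
  // decides to give up on the connection:
  fs->CancelPendingConnect();

  // Per the comment in the header, tear the FileSystem down afterwards.
  fs.reset();
  service->Stop();
}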
View File

@ -332,6 +332,94 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st
}
}
hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
// Same idea as the first half of doHdfsConnect, but returns the wrapped FS before
// connecting.
try {
errno = 0;
std::shared_ptr<IoService> io_service = IoService::MakeShared();
int io_thread_count = bld->config.GetOptions().io_threads_;
if(io_thread_count < 1) {
io_service->InitDefaultWorkers();
} else {
io_service->InitWorkers(io_thread_count);
}
FileSystem *fs = FileSystem::New(io_service, bld->user.value_or(""), bld->config.GetOptions());
if (!fs) {
ReportError(ENODEV, "Could not create FileSystem object");
return nullptr;
}
if (fsEventCallback) {
fs->SetFsEventCallback(fsEventCallback.value());
}
return new hdfs_internal(fs);
} catch (const std::exception &e) {
ReportException(e);
return nullptr;
} catch (...) {
ReportCaughtNonException();
return nullptr;
}
return nullptr;
}
int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
if(!CheckSystem(fs)) {
return ENODEV;
}
if(!bld) {
ReportError(ENODEV, "No hdfsBuilder object supplied");
return ENODEV;
}
// Get C++ FS to do connect
FileSystem *fsImpl = fs->get_impl();
if(!fsImpl) {
ReportError(ENODEV, "Null FileSystem implementation");
return ENODEV;
}
// Unpack the required bits of the hdfsBuilder
optional<std::string> nn = bld->overrideHost;
optional<tPort> port = bld->overridePort;
optional<std::string> user = bld->user;
// try-catch in case some of the third-party stuff throws
try {
Status status;
if (nn || port) {
if (!port) {
port = kDefaultPort;
}
std::string port_as_string = std::to_string(*port);
status = fsImpl->Connect(nn.value_or(""), port_as_string);
} else {
status = fsImpl->ConnectToDefaultFs();
}
if (!status.ok()) {
Error(status);
return ENODEV;
}
// 0 to indicate a good connection
return 0;
} catch (const std::exception & e) {
ReportException(e);
return ENODEV;
} catch (...) {
ReportCaughtNonException();
return ENODEV;
}
return 0;
}
hdfsFS hdfsConnect(const char *nn, tPort port) {
return hdfsConnectAsUser(nn, port, "");
}
@ -350,6 +438,26 @@ hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
return hdfsConnectAsUser(nn, port, "");
}
int hdfsCancelPendingConnection(hdfsFS fs) {
// todo: stick an enum in hdfs_internal to check the connect state
if(!CheckSystem(fs)) {
return ENODEV;
}
FileSystem *fsImpl = fs->get_impl();
if(!fsImpl) {
ReportError(ENODEV, "Null FileSystem implementation");
return ENODEV;
}
bool canceled = fsImpl->CancelPendingConnect();
if(canceled) {
return 0;
} else {
return EINTR;
}
}
int hdfsDisconnect(hdfsFS fs) {
try
{

View File

@ -34,24 +34,6 @@ namespace asio_continuation {
using namespace continuation;
template <class Stream, class MutableBufferSequence>
class ReadContinuation : public Continuation {
public:
ReadContinuation(std::shared_ptr<Stream>& stream, const MutableBufferSequence &buffer)
: stream_(stream), buffer_(buffer) {}
virtual void Run(const Next &next) override {
auto handler =
[next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); };
asio::async_read(*stream_, buffer_, handler);
}
private:
// prevent construction from raw ptr
ReadContinuation(Stream *stream, MutableBufferSequence &buffer);
std::shared_ptr<Stream> stream_;
MutableBufferSequence buffer_;
};
template <class Stream, class ConstBufferSequence>
class WriteContinuation : public Continuation {
public:
@ -71,80 +53,12 @@ private:
ConstBufferSequence buffer_;
};
template <class Socket, class Iterator>
class ConnectContinuation : public Continuation {
public:
ConnectContinuation(Socket *socket, Iterator begin, Iterator end,
Iterator *connected_endpoint)
: socket_(socket), begin_(begin), end_(end),
connected_endpoint_(connected_endpoint) {}
virtual void Run(const Next &next) override {
auto handler = [this, next](const asio::error_code &ec, Iterator it) {
if (connected_endpoint_) {
*connected_endpoint_ = it;
}
next(ToStatus(ec));
};
asio::async_connect(*socket_, begin_, end_, handler);
}
private:
Socket *socket_;
Iterator begin_;
Iterator end_;
Iterator *connected_endpoint_;
};
template <class OutputIterator>
class ResolveContinuation : public Continuation {
public:
ResolveContinuation(::asio::io_service *io_service, const std::string &server,
const std::string &service, OutputIterator result)
: resolver_(*io_service), query_(server, service), result_(result) {}
virtual void Run(const Next &next) override {
using resolver = ::asio::ip::tcp::resolver;
auto handler =
[this, next](const asio::error_code &ec, resolver::iterator it) {
if (!ec) {
std::copy(it, resolver::iterator(), result_);
}
next(ToStatus(ec));
};
resolver_.async_resolve(query_, handler);
}
private:
::asio::ip::tcp::resolver resolver_;
::asio::ip::tcp::resolver::query query_;
OutputIterator result_;
};
template <class Stream, class ConstBufferSequence>
static inline Continuation *Write(std::shared_ptr<Stream> stream,
const ConstBufferSequence &buffer) {
return new WriteContinuation<Stream, ConstBufferSequence>(stream, buffer);
}
template <class Stream, class MutableBufferSequence>
static inline Continuation *Read(std::shared_ptr<Stream> stream,
const MutableBufferSequence &buffer) {
return new ReadContinuation<Stream, MutableBufferSequence>(stream, buffer);
}
template <class Socket, class Iterator>
static inline Continuation *Connect(Socket *socket, Iterator begin,
Iterator end) {
return new ConnectContinuation<Socket, Iterator>(socket, begin, end, nullptr);
}
template <class OutputIterator>
static inline Continuation *
Resolve(::asio::io_service *io_service, const std::string &server,
const std::string &service, OutputIterator result) {
return new ResolveContinuation<OutputIterator>(io_service, server, service, result);
}
}
}

View File

@ -131,18 +131,6 @@ template <class State> inline void Pipeline<State>::Run(UserHandler &&handler) {
Schedule(Status::OK());
}
template <class Handler> class BindContinuation : public Continuation {
public:
BindContinuation(const Handler &handler) : handler_(handler) {}
virtual void Run(const Next &next) override { handler_(next); }
private:
Handler handler_;
};
template <class Handler> static inline Continuation *Bind(const Handler &handler) {
return new BindContinuation<Handler>(handler);
}
}
}

View File

@ -71,6 +71,17 @@ std::vector<std::string> SplitOnComma(const std::string &s, bool include_empty_s
return res;
}
std::string RemoveSpaces(const std::string &str) {
std::string res;
for(unsigned int i=0; i<str.size(); i++) {
char curr = str[i];
if(curr != ' ') {
res += curr;
}
}
return res;
}
// Prepend hdfs:// to string if there isn't already a scheme
// Converts unset optional into empty string
std::string PrependHdfsScheme(optional<std::string> str) {
@ -92,6 +103,8 @@ struct ha_parse_error : public std::exception {
};
std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string &nameservice) {
LOG_TRACE(kRPC, << "HDFSConfiguration@" << this << "::LookupNameService( nameservice=" << nameservice<< " ) called");
std::vector<NamenodeInfo> namenodes;
try {
// Find namenodes that belong to nameservice
@ -104,8 +117,10 @@ std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string
else
throw ha_parse_error("unable to find " + service_nodes);
for(unsigned int i=0; i<namenode_ids.size(); i++) {
namenode_ids[i] = RemoveSpaces(namenode_ids[i]);
LOG_INFO(kRPC, << "Namenode: " << namenode_ids[i]);
}
}
// should this error if we only find 1 NN?
@ -123,7 +138,11 @@ std::vector<NamenodeInfo> HdfsConfiguration::LookupNameService(const std::string
}
URI uri = node_uri.value();
if(uri.str() == "") {
LOG_WARN(kRPC, << "Attempted to read info for nameservice " << nameservice << " node " << dom_node_name << " but didn't find anything.")
} else {
LOG_INFO(kRPC, << "Read the following HA Namenode URI from config" << uri.GetDebugString());
}
NamenodeInfo node(nameservice, *node_id, uri);
namenodes.push_back(node);
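The RemoveSpaces pass above exists because a dfs.ha.namenodes.&lt;nameservice&gt; value is commonly written with spaces after the commas ("nn1, nn2"); without trimming, the stray space would become part of the namenode id used in later config lookups. A small, self-contained sketch of the behavior (the helpers below are simplified stand-ins for SplitOnComma and RemoveSpaces, not the committed code):

#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for SplitOnComma in hdfs_configuration.cc.
static std::vector<std::string> split_on_comma(const std::string &s) {
  std::vector<std::string> out;
  std::string cur;
  for (char c : s) {
    if (c == ',') { out.push_back(cur); cur.clear(); } else { cur += c; }
  }
  out.push_back(cur);
  return out;
}

// Same logic as the RemoveSpaces helper added above.
static std::string remove_spaces(const std::string &str) {
  std::string res;
  for (char c : str) if (c != ' ') res += c;
  return res;
}

int main() {
  for (const std::string &id : split_on_comma("nn1, nn2")) {
    std::cout << "'" << remove_spaces(id) << "'" << std::endl;  // prints 'nn1' then 'nn2'
  }
}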

View File

@ -18,7 +18,7 @@
#include "namenode_info.h" #include "namenode_info.h"
#include "common/continuation/asio.h" #include "common/util.h"
#include "common/logging.h" #include "common/logging.h"
#include <sstream> #include <sstream>
@ -71,62 +71,107 @@ bool ResolveInPlace(::asio::io_service *ioservice, ResolvedNamenodeInfo &info) {
return true;
}
typedef std::vector<asio::ip::tcp::endpoint> endpoint_vector;
// RAII wrapper
class ScopedResolver {
private:
::asio::io_service *io_service_;
std::string host_;
std::string port_;
::asio::ip::tcp::resolver::query query_;
::asio::ip::tcp::resolver resolver_;
endpoint_vector endpoints_;
// Caller blocks on access if resolution isn't finished
std::shared_ptr<std::promise<Status>> result_status_;
public:
ScopedResolver(::asio::io_service *service, const std::string &host, const std::string &port) :
io_service_(service), host_(host), port_(port), query_(host, port), resolver_(*io_service_)
{
if(!io_service_)
LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << " passed nullptr to io_service");
}
~ScopedResolver() {
resolver_.cancel();
}
bool BeginAsyncResolve() {
// result_status_ would only exist if this was previously called. Invalid state.
if(result_status_) {
LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: may only be called once per instance");
return false;
} else if(!io_service_) {
LOG_ERROR(kAsyncRuntime, << "ScopedResolver@" << this << "::BeginAsyncResolve invalid call: null io_service");
return false;
}
// Now set up the promise, set it in async_resolve's callback
result_status_ = std::make_shared<std::promise<Status>>();
// Callback to pull a copy of endpoints out of resolver and set promise
auto callback = [this](const asio::error_code &ec, ::asio::ip::tcp::resolver::iterator out) {
if(!ec) {
std::copy(out, ::asio::ip::tcp::resolver::iterator(), std::back_inserter(endpoints_));
}
result_status_->set_value( ToStatus(ec) );
};
resolver_.async_resolve(query_, callback);
return true;
}
Status Join() {
if(!result_status_) {
std::ostringstream errmsg;
errmsg << "ScopedResolver@" << this << "Join invalid call: promise never set";
return Status::InvalidArgument(errmsg.str().c_str());
}
std::future<Status> future_result = result_status_->get_future();
Status res = future_result.get();
return res;
}
endpoint_vector GetEndpoints() {
// Explicitly return by value to decouple lifecycles.
return endpoints_;
}
};
std::vector<ResolvedNamenodeInfo> BulkResolve(::asio::io_service *ioservice, const std::vector<NamenodeInfo> &nodes) {
std::vector< std::unique_ptr<ScopedResolver> > resolvers;
resolvers.reserve(nodes.size());
std::vector<ResolvedNamenodeInfo> resolved_info;
resolved_info.reserve(nodes.size());
for(unsigned int i=0; i<nodes.size(); i++) {
std::string host = nodes[i].get_host();
std::string port = nodes[i].get_port();
resolvers.emplace_back(new ScopedResolver(ioservice, host, port));
resolvers[i]->BeginAsyncResolve();
}
// Join all async operations
for(unsigned int i=0; i < resolvers.size(); i++) {
Status asyncReturnStatus = resolvers[i]->Join();
ResolvedNamenodeInfo info;
info = nodes[i];
if(asyncReturnStatus.ok()) {
// Copy out endpoints if things went well
info.endpoints = resolvers[i]->GetEndpoints();
} else {
LOG_ERROR(kAsyncRuntime, << "Unable to resolve endpoints for host: " << nodes[i].get_host()
<< " port: " << nodes[i].get_port());
}
resolved_info.push_back(info);
}
return resolved_info;
}
}
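ScopedResolver above hands each asio resolution its own std::promise, and the new BulkResolve joins them in a second pass. A stripped-down sketch of that start-everything-then-join idiom (not part of the commit; the asio resolver is replaced with a dummy worker thread):

#include <future>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

int main() {
  std::vector<std::shared_ptr<std::promise<int>>> results;
  std::vector<std::thread> workers;

  // Phase 1: kick off every async operation; each one fulfills its own promise.
  for (int i = 0; i < 4; i++) {
    auto p = std::make_shared<std::promise<int>>();
    results.push_back(p);
    workers.emplace_back([p, i]() { p->set_value(i * i); });
  }

  // Phase 2: block on each result in order, like the Join() loop in BulkResolve.
  for (size_t i = 0; i < results.size(); i++) {
    std::cout << "result " << i << " = " << results[i]->get_future().get() << std::endl;
  }

  for (std::thread &t : workers) t.join();
  return 0;
}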

View File

@ -19,6 +19,7 @@
#define LIB_COMMON_UTIL_H_
#include "hdfspp/status.h"
#include "common/logging.h"
#include <sstream>
#include <mutex>
@ -112,6 +113,73 @@ inline asio::ip::tcp::socket *get_asio_socket_ptr<asio::ip::tcp::socket>
//Check if the high bit is set
bool IsHighBitSet(uint64_t num);
// Provide a way to do an atomic swap on a callback.
// SetCallback, AtomicSwapCallback, and GetCallback can only be called once each.
// AtomicSwapCallback and GetCallback must only be called after SetCallback.
//
// We can't throw on error, and since the callback is templated it's tricky to
// generate generic dummy callbacks. Complain loudly in the log and get good
// test coverage. It shouldn't be too hard to avoid invalid states.
template <typename CallbackType>
class SwappableCallbackHolder {
private:
std::mutex state_lock_;
CallbackType callback_;
bool callback_set_ = false;
bool callback_swapped_ = false;
bool callback_accessed_ = false;
public:
bool IsCallbackSet() {
mutex_guard swap_lock(state_lock_);
return callback_set_;
}
bool IsCallbackAccessed() {
mutex_guard swap_lock(state_lock_);
return callback_accessed_;
}
bool SetCallback(const CallbackType& callback) {
mutex_guard swap_lock(state_lock_);
if(callback_set_ || callback_swapped_ || callback_accessed_) {
LOG_ERROR(kAsyncRuntime, << "SetCallback violates access invariants.")
return false;
}
callback_ = callback;
callback_set_ = true;
return true;
}
CallbackType AtomicSwapCallback(const CallbackType& replacement, bool& swapped) {
mutex_guard swap_lock(state_lock_);
if(!callback_set_ || callback_swapped_) {
LOG_ERROR(kAsyncRuntime, << "AtomicSwapCallback violates access invariants.")
swapped = false;
} else if (callback_accessed_) {
// Common case where callback has been invoked but caller may not know
LOG_DEBUG(kAsyncRuntime, << "AtomicSwapCallback called after callback has been accessed");
swapped = false;
return callback_;
}
CallbackType old = callback_;
callback_ = replacement;
callback_swapped_ = true;
swapped = true;
return old;
}
CallbackType GetCallback() {
mutex_guard swap_lock(state_lock_);
if(!callback_set_ || callback_accessed_) {
LOG_ERROR(kAsyncRuntime, << "GetCallback violates access invariants.")
}
callback_accessed_ = true;
return callback_;
}
};
}
#endif
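A sketch (not part of the commit) of the set/swap/get protocol described in the comment above, assuming the holder lives in namespace hdfs like the rest of util.h, with CallbackType instantiated as std::function<void(int)> and an int standing in for Status:

#include <functional>
#include <iostream>
#include "common/util.h"

void holder_protocol_sketch() {
  hdfs::SwappableCallbackHolder<std::function<void(int)>> holder;

  // 1. Connect() stores the user's handler exactly once.
  holder.SetCallback([](int status) { std::cout << "user handler: " << status << std::endl; });

  // 2. A cancel path swaps in a no-op so the real completion will be ignored...
  bool swapped = false;
  auto original = holder.AtomicSwapCallback([](int) { /* ignore late completion */ }, swapped);

  // 3. ...and, if the swap won the race, invokes the original handler itself with a
  //    "canceled" status (-1 here stands in for Status::Canceled()).
  if (swapped) {
    original(-1);
  }

  // 4. Whatever eventually completes the connect fetches the current callback,
  //    which is now the no-op.
  holder.GetCallback()(0);
}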

View File

@ -202,6 +202,7 @@ void FileSystemImpl::Connect(const std::string &server,
LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service="
<< service << ") called");
connect_callback_.SetCallback(handler);
/* IoService::New can return nullptr */
if (!io_service_) {
@ -236,8 +237,8 @@ void FileSystemImpl::Connect(const std::string &server,
}
nn_.Connect(cluster_name_, /*server, service*/ resolved_namenodes, [this](const Status & s) {
connect_callback_.GetCallback()(s, this);
});
}
@ -286,6 +287,43 @@ int FileSystemImpl::WorkerThreadCount() {
}
}
bool FileSystemImpl::CancelPendingConnect() {
if(!connect_callback_.IsCallbackSet()) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called before Connect started");
return false;
}
if(connect_callback_.IsCallbackAccessed()) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl@" << this << "::CancelPendingConnect called after Connect completed");
return false;
}
// First invoke callback, then do proper teardown in RpcEngine and RpcConnection
ConnectCallback noop_callback = [](const Status &stat, FileSystem *fs) {
LOG_DEBUG(kFileSystem, << "Dummy callback invoked for canceled FileSystem@" << fs << "::Connect with status: " << stat.ToString());
};
bool callback_swapped = false;
ConnectCallback original_callback = connect_callback_.AtomicSwapCallback(noop_callback, callback_swapped);
if(callback_swapped) {
// Take original callback and invoke it as if it was canceled.
LOG_DEBUG(kFileSystem, << "Swapped in dummy callback. Invoking connect callback with canceled status.");
std::function<void(void)> wrapped_callback = [original_callback, this](){
// handling code is expected to check the status before dereferencing 'this'
original_callback(Status::Canceled(), this);
};
io_service_->PostTask(wrapped_callback);
} else {
LOG_INFO(kFileSystem, << "Unable to cancel FileSystem::Connect. It hasn't been invoked yet or may have already completed.")
return false;
}
// Now push cancel down to clean up where possible and make sure the RpcEngine
// won't try to do retries in the background. The rest of the memory cleanup
// happens when this FileSystem is deleted by the user.
return nn_.CancelPendingConnect();
}
void FileSystemImpl::Open(
const std::string &path,
const std::function<void(const Status &, FileHandle *)> &handler) {

View File

@ -46,6 +46,8 @@ namespace hdfs {
class FileSystemImpl : public FileSystem {
public:
MEMCHECKED_CLASS(FileSystemImpl)
typedef std::function<void(const Status &, FileSystem *)> ConnectCallback;
explicit FileSystemImpl(IoService *&io_service, const std::string& user_name, const Options &options);
explicit FileSystemImpl(std::shared_ptr<IoService>, const std::string& user_name, const Options &options);
~FileSystemImpl() override;
@ -61,6 +63,9 @@ public:
const std::function<void(const Status &, FileSystem *)> &handler) override;
virtual Status ConnectToDefaultFs() override;
/* Cancel connection if FS is in the middle of one */
virtual bool CancelPendingConnect() override;
virtual void Open(const std::string &path,
const std::function<void(const Status &, FileHandle *)>
&handler) override;
@ -197,6 +202,9 @@ private:
NameNodeOperations nn_;
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
// Keep connect callback around in case it needs to be canceled
SwappableCallbackHolder<ConnectCallback> connect_callback_;
/**
* Runtime event monitoring handlers.
* Note: This is really handy to have for advanced usage but

View File

@ -45,6 +45,10 @@ void NameNodeOperations::Connect(const std::string &cluster_name,
engine_.Connect(cluster_name, servers, handler);
}
bool NameNodeOperations::CancelPendingConnect() {
return engine_.CancelPendingConnect();
}
void NameNodeOperations::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler)
{

View File

@ -52,6 +52,8 @@ public:
const std::vector<ResolvedNamenodeInfo> &servers,
std::function<void(const Status &)> &&handler);
bool CancelPendingConnect();
void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);

View File

@ -150,7 +150,8 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
protocol_version_(protocol_version),
call_id_(0),
retry_timer(*io_service),
event_handlers_(std::make_shared<LibhdfsEvents>()),
connect_canceled_(false)
{
LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
@ -182,6 +183,16 @@ void RpcEngine::Connect(const std::string &cluster_name,
conn_->Connect(last_endpoints_, auth_info_, handler);
}
bool RpcEngine::CancelPendingConnect() {
if(connect_canceled_) {
LOG_DEBUG(kRPC, << "RpcEngine@" << this << "::CancelPendingConnect called more than once");
return false;
}
connect_canceled_ = true;
return true;
}
void RpcEngine::Shutdown() {
LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
io_service_->post([this]() {
@ -250,6 +261,14 @@ void RpcEngine::AsyncRpc(
LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
// In case user-side code isn't checking the status of Connect before doing RPC
if(connect_canceled_) {
io_service_->post(
[handler](){ handler(Status::Canceled()); }
);
return;
}
if (!conn_) {
conn_ = InitializeConnection();
conn_->ConnectAndFlush(last_endpoints_);

View File

@ -353,6 +353,8 @@ class RpcEngine : public LockFreeRpcEngine {
const std::vector<ResolvedNamenodeInfo> servers,
RpcCallback &handler);
bool CancelPendingConnect();
void AsyncRpc(const std::string &method_name,
const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp,
@ -418,6 +420,9 @@ private:
std::mutex engine_state_lock_;
// Once Connect has been canceled there is no going back
bool connect_canceled_;
// Keep endpoint info for all HA connections, a non-null ptr indicates
// that HA info was found in the configuration.
std::unique_ptr<HANamenodeTracker> ha_persisted_info_;