HDFS-11106: libhdfs++: Some refactoring to better organize files. Contributed by James Clampffer.

This commit is contained in:
James 2016-11-29 18:09:53 -05:00 committed by James Clampffer
parent 2524afbc20
commit dd7837c429
12 changed files with 629 additions and 572 deletions

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc)
add_library(common_obj OBJECT status.cc sasl_digest_md5.cc hdfs_ioservice.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc libhdfs_events_impl.cc auth_info.cc namenode_info.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL})

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "util.h"
#include <array>
#include <functional>
#include <algorithm>
namespace hdfs {
std::string Base64Encode(const std::string &src) {
//encoded size is (sizeof(buf) + 2) / 3 * 4
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
std::string ret;
int i = 0;
int j = 0;
unsigned char char_array_3[3];
unsigned char char_array_4[4];
unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
unsigned int in_len = src.size();
while (in_len--) {
char_array_3[i++] = *(bytes_to_encode++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for(i = 0; (i <4) ; i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}
if (i) {
for(j = i; j < 3; j++)
char_array_3[j] = '\0';
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];
while((i++ < 3))
ret += '=';
}
return ret;
}
}

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "hdfs_public_api.h"
#include "hdfs_ioservice.h"
#include "common/logging.h"
@ -27,6 +27,10 @@ IoService::~IoService() {}
IoService *IoService::New() { return new IoServiceImpl(); }
void IoServiceImpl::Run() {
// The IoService executes callbacks provided by library users in the context of worker threads,
// there is no way of preventing those callbacks from throwing but we can at least prevent them
// from escaping this library and crashing the process.
// As recommended in http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
asio::io_service::work work(io_service_);
for(;;)

View File

@ -16,8 +16,8 @@
* limitations under the License.
*/
#ifndef COMMON_HDFS_PUBLIC_API_H_
#define COMMON_HDFS_PUBLIC_API_H_
#ifndef COMMON_HDFS_IOSERVICE_H_
#define COMMON_HDFS_IOSERVICE_H_
#include "hdfspp/hdfspp.h"
@ -25,6 +25,13 @@
namespace hdfs {
/*
* A thin wrapper over the asio::io_service.
* -In the future this could own the worker threads that execute io tasks which
* makes it easier to share IoServices between FileSystems. See HDFS-10796 for
* rationale.
*/
class IoServiceImpl : public IoService {
public:
virtual void Run() override;

View File

@ -20,12 +20,14 @@
#include "common/util_c.h"
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <exception>
#include <sstream>
#include <iostream>
#include <iomanip>
#include <thread>
namespace hdfs {
bool ReadDelimitedPBMessage(::google::protobuf::io::CodedInputStream *in,
@ -73,6 +75,53 @@ std::string GetRandomClientName() {
return oss.str();
}
std::string Base64Encode(const std::string &src) {
//encoded size is (sizeof(buf) + 2) / 3 * 4
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";
std::string ret;
int i = 0;
int j = 0;
unsigned char char_array_3[3];
unsigned char char_array_4[4];
unsigned const char *bytes_to_encode = reinterpret_cast<unsigned const char *>(&src[i]);
unsigned int in_len = src.size();
while (in_len--) {
char_array_3[i++] = *(bytes_to_encode++);
if (i == 3) {
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for(i = 0; (i <4) ; i++)
ret += base64_chars[char_array_4[i]];
i = 0;
}
}
if (i) {
for(j = i; j < 3; j++)
char_array_3[j] = '\0';
char_array_4[0] = (char_array_3[0] & 0xfc) >> 2;
char_array_4[1] = ((char_array_3[0] & 0x03) << 4) + ((char_array_3[1] & 0xf0) >> 4);
char_array_4[2] = ((char_array_3[1] & 0x0f) << 2) + ((char_array_3[2] & 0xc0) >> 6);
char_array_4[3] = char_array_3[2] & 0x3f;
for (j = 0; (j < i + 1); j++)
ret += base64_chars[char_array_4[j]];
while((i++ < 3))
ret += '=';
}
return ret;
}
std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
std::string err;
if(sock && sock->is_open()) {
@ -117,3 +166,5 @@ bool IsHighBitSet(uint64_t num) {
void ShutdownProtobufLibrary_C() {
google::protobuf::ShutdownProtobufLibrary();
}

View File

@ -18,7 +18,7 @@
#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_
#include "common/hdfs_public_api.h"
#include "common/hdfs_ioservice.h"
#include "common/async_stream.h"
#include "ClientNamenodeProtocol.pb.h"
#include "common/libhdfs_events_impl.h"

View File

@ -16,6 +16,6 @@
# limitations under the License.
#
add_library(fs_obj OBJECT filesystem.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
add_library(fs_obj OBJECT filesystem.cc filesystem_sync.cc filehandle.cc bad_datanode_tracker.cc namenode_operations.cc)
add_dependencies(fs_obj proto)
add_library(fs $<TARGET_OBJECTS:fs_obj>)

View File

@ -18,7 +18,7 @@
#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_
#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_
#include "common/hdfs_public_api.h"
#include "common/hdfs_ioservice.h"
#include "common/async_stream.h"
#include "common/cancel_tracker.h"
#include "common/libhdfs_events_impl.h"

View File

@ -32,8 +32,7 @@
namespace hdfs {
static const char kNamenodeProtocol[] =
"org.apache.hadoop.hdfs.protocol.ClientProtocol";
static const char kNamenodeProtocol[] = "org.apache.hadoop.hdfs.protocol.ClientProtocol";
static const int kNamenodeProtocolVersion = 1;
using ::asio::ip::tcp;
@ -203,26 +202,6 @@ void FileSystemImpl::Connect(const std::string &server,
});
}
Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service=" << service << ") called");
/* synchronized */
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callback = [stat](const Status &s, FileSystem *fs) {
(void)fs;
stat->set_value(s);
};
Connect(server, service, callback);
/* block until promise is set */
auto s = future.get();
return s;
}
void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &, FileSystem *)> &handler) {
std::string scheme = options_.defaultFS.get_scheme();
@ -248,25 +227,6 @@ void FileSystemImpl::ConnectToDefaultFs(const std::function<void(const Status &,
Connect(host, port_as_string, handler);
}
Status FileSystemImpl::ConnectToDefaultFs() {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callback = [stat](const Status &s, FileSystem *fs) {
(void)fs;
stat->set_value(s);
};
ConnectToDefaultFs(callback);
/* block until promise is set */
auto s = future.get();
return s;
}
int FileSystemImpl::AddWorkerThread() {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
<< FMT_THIS_ADDR << ") called."
@ -297,38 +257,6 @@ void FileSystemImpl::Open(
});
}
Status FileSystemImpl::Open(const std::string &path,
FileHandle **handle) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
/* wrap async FileSystem::Open with promise to make it a blocking call */
auto h = [callstate](const Status &s, FileHandle *is) {
callstate->set_value(std::make_tuple(s, is));
};
Open(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FileHandle *file_handle = std::get<1>(returnstate);
if (!stat.ok()) {
delete file_handle;
return stat;
}
if (!file_handle) {
return stat;
}
*handle = file_handle;
return stat;
}
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
{
@ -411,39 +339,6 @@ void FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset
nn_.GetBlockLocations(path, offset, length, conversion);
}
Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
if (!fileBlockLocations)
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
/* wrap async call with promise/future to make it blocking */
auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
callstate->set_value(std::make_tuple(s,blockInfo));
};
GetBlockLocations(path, offset, length, callback);
/* wait for async to finish */
auto returnstate = future.get();
auto stat = std::get<0>(returnstate);
if (!stat.ok()) {
return stat;
}
*fileBlockLocations = std::get<1>(returnstate);
return stat;
}
void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
const std::function<void(const Status &, const uint64_t &)> &handler) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetPreferredBlockSize("
@ -453,33 +348,6 @@ void FileSystemImpl::GetPreferredBlockSize(const std::string &path,
nn_.GetPreferredBlockSize(path, handler);
}
Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>();
std::future<std::tuple<Status, uint64_t>> future(callstate->get_future());
/* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */
auto h = [callstate](const Status &s, const uint64_t & bsize) {
callstate->set_value(std::make_tuple(s, bsize));
};
GetPreferredBlockSize(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
uint64_t size = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
block_size = size;
return stat;
}
void FileSystemImpl::SetReplication(const std::string & path, int16_t replication, std::function<void(const Status &)> handler) {
LOG_DEBUG(kFileSystem,
@ -499,27 +367,6 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio
nn_.SetReplication(path, replication, handler);
}
Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
", replication=" << replication << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetReplication with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetReplication(path, replication, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime,
std::function<void(const Status &)> handler) {
@ -535,27 +382,6 @@ void FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t
nn_.SetTimes(path, mtime, atime, handler);
}
Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
", mtime=" << mtime << ", atime=" << atime << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetTimes with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetTimes(path, mtime, atime, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::GetFileInfo(
const std::string &path,
@ -567,34 +393,6 @@ void FileSystemImpl::GetFileInfo(
nn_.GetFileInfo(path, handler);
}
Status FileSystemImpl::GetFileInfo(const std::string &path,
StatInfo & stat_info) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
/* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
auto h = [callstate](const Status &s, const StatInfo &si) {
callstate->set_value(std::make_tuple(s, si));
};
GetFileInfo(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
StatInfo info = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
stat_info = info;
return stat;
}
void FileSystemImpl::GetFsStats(
const std::function<void(const Status &, const FsInfo &)> &handler) {
@ -604,32 +402,6 @@ void FileSystemImpl::GetFsStats(
nn_.GetFsStats(handler);
}
Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
/* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
auto h = [callstate](const Status &s, const FsInfo &si) {
callstate->set_value(std::make_tuple(s, si));
};
GetFsStats(h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FsInfo info = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
fs_info = info;
return stat;
}
/**
* Helper function for recursive GetListing calls.
@ -666,43 +438,6 @@ void FileSystemImpl::GetListing(
nn_.GetListing(path, callback);
}
Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
}
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::GetListing with promise to make it a blocking call.
*
Keep requesting more until we get the entire listing, and don't set the promise
* until we have the entire listing.
*/
auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
bool done = !s.ok() || !has_more;
if (done) {
callstate->set_value(s);
return false;
}
return true;
};
GetListing(path, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent,
std::function<void(const Status &)> handler) {
@ -724,27 +459,6 @@ void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool
nn_.Mkdirs(path, permissions, createparent, handler);
}
Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
", permissions=" << permissions << ", createparent=" << createparent << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Mkdirs(path, permissions, createparent, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::Delete(const std::string &path, bool recursive,
const std::function<void(const Status &)> &handler) {
@ -759,26 +473,6 @@ void FileSystemImpl::Delete(const std::string &path, bool recursive,
nn_.Delete(path, recursive, handler);
}
Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Delete with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Delete(path, recursive, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath,
const std::function<void(const Status &)> &handler) {
@ -798,26 +492,6 @@ void FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPa
nn_.Rename(oldPath, newPath, handler);
}
Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Rename with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Rename(oldPath, newPath, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::SetPermission(const std::string & path,
uint16_t permissions, const std::function<void(const Status &)> &handler) {
@ -837,25 +511,6 @@ void FileSystemImpl::SetPermission(const std::string & path,
nn_.SetPermission(path, permissions, handler);
}
Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetPermission with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetPermission(path, permissions, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
void FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
const std::string & groupname, const std::function<void(const Status &)> &handler) {
@ -870,25 +525,6 @@ void FileSystemImpl::SetOwner(const std::string & path, const std::string & user
nn_.SetOwner(path, username, groupname, handler);
}
Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
const std::string & groupname) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetOwner with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetOwner(path, username, groupname, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
/**
* Helper function for recursive Find calls.
@ -1016,50 +652,6 @@ void FileSystemImpl::Find(
nn_.GetListing("/", callback);
}
Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find("
<< FMT_THIS_ADDR << ", path="
<< path << ", name="
<< name << ") called");
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
}
// In this case, we're going to have the async code populate stat_infos.
std::promise<void> promise = std::promise<void>();
std::future<void> future(promise.get_future());
Status status = Status::OK();
/**
* Keep requesting more until we get the entire listing. Set the promise
* when we have the entire listing to stop.
*
* Find guarantees that the handler will only be called once at a time,
* so we do not need any locking here
*/
auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
if (!s.ok() && status.ok()){
//We make sure we set 'status' only on the first error.
status = s;
}
if (!has_more_results) {
promise.set_value();
return false;
}
return true;
};
Find(path, name, maxdepth, h);
/* block until promise is set */
future.get();
return status;
}
void FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name,
@ -1075,27 +667,6 @@ void FileSystemImpl::CreateSnapshot(const std::string &path,
nn_.CreateSnapshot(path, name, handler);
}
Status FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
CreateSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::DeleteSnapshot(const std::string &path,
const std::string &name,
@ -1115,27 +686,6 @@ void FileSystemImpl::DeleteSnapshot(const std::string &path,
nn_.DeleteSnapshot(path, name, handler);
}
Status FileSystemImpl::DeleteSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
DeleteSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::AllowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) {
@ -1150,26 +700,6 @@ void FileSystemImpl::AllowSnapshot(const std::string &path,
nn_.AllowSnapshot(path, handler);
}
Status FileSystemImpl::AllowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
AllowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::DisallowSnapshot(const std::string &path,
const std::function<void(const Status &)> &handler) {
@ -1184,26 +714,6 @@ void FileSystemImpl::DisallowSnapshot(const std::string &path,
nn_.DisallowSnapshot(path, handler);
}
Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
DisallowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
// It is far too easy to destroy the filesystem (and thus the threadpool)

View File

@ -0,0 +1,555 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "filesystem.h"
#include <future>
#include <tuple>
#define FMT_THIS_ADDR "this=" << (void*)this
// Note: This is just a place to hold boilerplate async to sync shim code,
// place actual filesystem logic in filesystem.cc
//
//
// Shim pattern pseudocode
//
// Status MySynchronizedMethod(method_args):
// let stat = a promise<Status> wrapped in a shared_ptr
//
// Create a lambda that captures stat and any other variables that need to
// be set based on the async operation. When invoked set variables with the
// arguments passed (possibly do some translation), then set stat to indicate
// the return status of the async call.
//
// invoke MyAsyncMethod(method_args, handler_lambda)
//
// block until stat value has been set while async work takes place
//
// return stat
namespace hdfs {
Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service=" << service << ") called");
/* synchronized */
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callback = [stat](const Status &s, FileSystem *fs) {
(void)fs;
stat->set_value(s);
};
Connect(server, service, callback);
/* block until promise is set */
auto s = future.get();
return s;
}
Status FileSystemImpl::ConnectToDefaultFs() {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callback = [stat](const Status &s, FileSystem *fs) {
(void)fs;
stat->set_value(s);
};
ConnectToDefaultFs(callback);
/* block until promise is set */
auto s = future.get();
return s;
}
Status FileSystemImpl::Open(const std::string &path,
FileHandle **handle) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Open("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
/* wrap async FileSystem::Open with promise to make it a blocking call */
auto h = [callstate](const Status &s, FileHandle *is) {
callstate->set_value(std::make_tuple(s, is));
};
Open(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FileHandle *file_handle = std::get<1>(returnstate);
if (!stat.ok()) {
delete file_handle;
return stat;
}
if (!file_handle) {
return stat;
}
*handle = file_handle;
return stat;
}
Status FileSystemImpl::GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length,
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
if (!fileBlockLocations)
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
/* wrap async call with promise/future to make it blocking */
auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
callstate->set_value(std::make_tuple(s,blockInfo));
};
GetBlockLocations(path, offset, length, callback);
/* wait for async to finish */
auto returnstate = future.get();
auto stat = std::get<0>(returnstate);
if (!stat.ok()) {
return stat;
}
*fileBlockLocations = std::get<1>(returnstate);
return stat;
}
Status FileSystemImpl::GetPreferredBlockSize(const std::string &path, uint64_t & block_size) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetPreferredBlockSize("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, uint64_t>>>();
std::future<std::tuple<Status, uint64_t>> future(callstate->get_future());
/* wrap async FileSystem::GetPreferredBlockSize with promise to make it a blocking call */
auto h = [callstate](const Status &s, const uint64_t & bsize) {
callstate->set_value(std::make_tuple(s, bsize));
};
GetPreferredBlockSize(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
uint64_t size = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
block_size = size;
return stat;
}
Status FileSystemImpl::SetReplication(const std::string & path, int16_t replication) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetReplication(" << FMT_THIS_ADDR << ", path=" << path <<
", replication=" << replication << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetReplication with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetReplication(path, replication, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::SetTimes(const std::string & path, uint64_t mtime, uint64_t atime) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetTimes(" << FMT_THIS_ADDR << ", path=" << path <<
", mtime=" << mtime << ", atime=" << atime << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetTimes with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetTimes(path, mtime, atime, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::GetFileInfo(const std::string &path,
StatInfo & stat_info) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());
/* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
auto h = [callstate](const Status &s, const StatInfo &si) {
callstate->set_value(std::make_tuple(s, si));
};
GetFileInfo(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
StatInfo info = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
stat_info = info;
return stat;
}
Status FileSystemImpl::GetFsStats(FsInfo & fs_info) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]GetFsStats(" << FMT_THIS_ADDR << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FsInfo>>>();
std::future<std::tuple<Status, FsInfo>> future(callstate->get_future());
/* wrap async FileSystem::GetFsStats with promise to make it a blocking call */
auto h = [callstate](const Status &s, const FsInfo &si) {
callstate->set_value(std::make_tuple(s, si));
};
GetFsStats(h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FsInfo info = std::get<1>(returnstate);
if (!stat.ok()) {
return stat;
}
fs_info = info;
return stat;
}
Status FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetListing("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL");
}
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::GetListing with promise to make it a blocking call.
*
Keep requesting more until we get the entire listing, and don't set the promise
* until we have the entire listing.
*/
auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
bool done = !s.ok() || !has_more;
if (done) {
callstate->set_value(s);
return false;
}
return true;
};
GetListing(path, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
Status FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool createparent) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Mkdirs(" << FMT_THIS_ADDR << ", path=" << path <<
", permissions=" << permissions << ", createparent=" << createparent << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Mkdirs with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Mkdirs(path, permissions, createparent, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::Delete(const std::string &path, bool recursive) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Delete(" << FMT_THIS_ADDR << ", path=" << path << ", recursive=" << recursive << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Delete with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Delete(path, recursive, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::Rename(const std::string &oldPath, const std::string &newPath) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]Rename(" << FMT_THIS_ADDR << ", oldPath=" << oldPath << ", newPath=" << newPath << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::Rename with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
Rename(oldPath, newPath, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::SetPermission(const std::string & path, uint16_t permissions) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetPermission(" << FMT_THIS_ADDR << ", path=" << path << ", permissions=" << permissions << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetPermission with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetPermission(path, permissions, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username,
const std::string & groupname) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::SetOwner with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
SetOwner(path, username, groupname, h);
/* block until promise is set */
Status stat = future.get();
return stat;
}
Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]Find("
<< FMT_THIS_ADDR << ", path="
<< path << ", name="
<< name << ") called");
if (!stat_infos) {
return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL");
}
// In this case, we're going to have the async code populate stat_infos.
std::promise<void> promise = std::promise<void>();
std::future<void> future(promise.get_future());
Status status = Status::OK();
/**
* Keep requesting more until we get the entire listing. Set the promise
* when we have the entire listing to stop.
*
* Find guarantees that the handler will only be called once at a time,
* so we do not need any locking here
*/
auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool {
if (!si.empty()) {
stat_infos->insert(stat_infos->end(), si.begin(), si.end());
}
if (!s.ok() && status.ok()){
//We make sure we set 'status' only on the first error.
status = s;
}
if (!has_more_results) {
promise.set_value();
return false;
}
return true;
};
Find(path, name, maxdepth, h);
/* block until promise is set */
future.get();
return status;
}
Status FileSystemImpl::CreateSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]CreateSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::CreateSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
CreateSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::DeleteSnapshot(const std::string &path,
const std::string &name) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DeleteSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ", name=" << name << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::DeleteSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
DeleteSnapshot(path, name, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::AllowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]AllowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::AllowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
AllowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
Status FileSystemImpl::DisallowSnapshot(const std::string &path) {
LOG_DEBUG(kFileSystem,
<< "FileSystemImpl::[sync]DisallowSnapshot(" << FMT_THIS_ADDR << ", path=" << path << ") called");
auto callstate = std::make_shared<std::promise<Status>>();
std::future<Status> future(callstate->get_future());
/* wrap async FileSystem::DisallowSnapshot with promise to make it a blocking call */
auto h = [callstate](const Status &s) {
callstate->set_value(s);
};
DisallowSnapshot(path, h);
/* block until promise is set */
auto returnstate = future.get();
Status stat = returnstate;
return stat;
}
}

View File

@ -72,7 +72,7 @@ target_link_libraries(remote_block_reader_test test_common reader proto common c
add_memcheck_test(remote_block_reader remote_block_reader_test)
add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(sasl_digest_md5_test common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(sasl_digest_md5 sasl_digest_md5_test)
add_executable(retry_policy_test retry_policy_test.cc)

View File

@ -18,6 +18,7 @@
#include "common/sasl_authenticator.h"
#include <gtest/gtest.h>
#include <google/protobuf/stubs/common.h>
namespace hdfs {
@ -40,5 +41,7 @@ TEST(DigestMD5AuthenticatorTest, TestResponse) {
ASSERT_TRUE(status.ok());
ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") !=
std::string::npos);
google::protobuf::ShutdownProtobufLibrary();
}
}