HDFS-9118: libhdfs++ Add logging system. Contributed by James Clampffer

This commit is contained in:
James 2016-03-23 18:00:16 -04:00 committed by James Clampffer
parent 8f4a66ab8f
commit 0f1a278dd5
15 changed files with 1177 additions and 44 deletions

View File

@ -18,6 +18,8 @@
#ifndef LIBHDFSPP_HDFS_HDFSEXT
#define LIBHDFSPP_HDFS_HDFSEXT
#include <hdfspp/log.h>
/* get typdefs and #defines from libhdfs' hdfs.h to stay consistent */
#include <hdfs/hdfs.h>
@ -95,21 +97,72 @@ struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory);
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
LIBHDFS_EXTERNAL
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
char **val);
/**
* Get a configuration integer from the settings currently read into the builder.
*
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
/**
* Get a configuration integer from the settings currently read into the builder.
*
* @param key The key to find
* @param val (out param) The value. This will NOT be changed if the
* key isn't found.
*
* @return 0 on success; nonzero error code otherwise.
* Failure to find the key is not an error.
*/
LIBHDFS_EXTERNAL
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
/**
* Client can supply a C style function pointer to be invoked any time something
* is logged. Unlike the C++ logger this will not filter by level or component,
* it is up to the consumer to throw away messages they don't want.
*
* Note: The callback provided must be reentrant, the library does not guarentee
* that there won't be concurrent calls.
* Note: Callback does not own the LogData struct. If the client would like to
* keep one around use hdfsCopyLogData/hdfsFreeLogData.
**/
LIBHDFS_EXTERNAL
void hdfsSetLogFunction(void (*hook)(LogData*));
/**
* Create a copy of the LogData object passed in and return a pointer to it.
* Returns null if it was unable to copy/
**/
LIBHDFS_EXTERNAL
LogData *hdfsCopyLogData(const LogData*);
/**
* Client must call this to dispose of the LogData created by hdfsCopyLogData.
**/
LIBHDFS_EXTERNAL
void hdfsFreeLogData(LogData*);
/**
* Enable loggind functionality for a component.
* Return 1 on failure, 0 otherwise.
**/
LIBHDFS_EXTERNAL
int hdfsEnableLoggingForComponent(int component);
/**
* Disable logging functionality for a component.
* Return 1 on failure, 0 otherwise.
**/
LIBHDFS_EXTERNAL
int hdfsDisableLoggingForComponent(int component);
/**
* Set level between trace and error.
* Return 1 on failure, 0 otherwise.
**/
LIBHDFS_EXTERNAL
int hdfsSetLoggingLevel(int component);
#ifdef __cplusplus
} /* end extern "C" */
#endif

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_HDFS_LOG
#define LIBHDFSPP_HDFS_LOG
#ifdef __cplusplus
extern "C" {
#endif
/**
* Things that are part of the public API but are specific to logging live here.
* Added to avoid including the whole public API into the implementation of the logger.
**/
/* logging levels, compatible with enum in lib/common/logging.cc */
#define HDFSPP_LOG_LEVEL_TRACE 0
#define HDFSPP_LOG_LEVEL_DEBUG 1
#define HDFSPP_LOG_LEVEL_INFO 2
#define HDFSPP_LOG_LEVEL_WARN 3
#define HDFSPP_LOG_LEVEL_ERROR 4
/* components emitting messages, compatible with enum lib/common/logging.cc */
#define HDFSPP_LOG_COMPONENT_UNKNOWN 1 << 0
#define HDFSPP_LOG_COMPONENT_RPC 1 << 1
#define HDFSPP_LOG_COMPONENT_BLOCKREADER 1 << 2
#define HDFSPP_LOG_COMPONENT_FILEHANDLE 1 << 3
#define HDFSPP_LOG_COMPONENT_FILESYSTEM 1 << 4
/**
* POD struct for C to consume (C++ interface gets to take advantage of RAII)
**/
typedef struct {
const char *msg;
int level;
int component;
const char *file_name;
int file_line;
} LogData;
#ifdef __cplusplus
} // end extern C
#endif
#endif

View File

@ -21,6 +21,7 @@
#include "fs/filesystem.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include "common/logging.h"
#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
@ -601,3 +602,137 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val
return ReportCaughtNonException();
}
}
/**
* Logging functions
**/
class CForwardingLogger : public LoggerInterface {
public:
CForwardingLogger() : callback_(nullptr) {};
// Converts LogMessage into LogData, a POD type,
// and invokes callback_ if it's not null.
void Write(const LogMessage& msg);
// pass in NULL to clear the hook
void SetCallback(void (*callback)(LogData*));
//return a copy, or null on failure.
static LogData *CopyLogData(const LogData*);
//free LogData allocated with CopyLogData
static void FreeLogData(LogData*);
private:
void (*callback_)(LogData*);
};
/**
* Plugin to forward message to a C function pointer
**/
void CForwardingLogger::Write(const LogMessage& msg) {
if(!callback_)
return;
const std::string text = msg.MsgString();
LogData data;
data.level = msg.level();
data.component = msg.component();
data.msg = text.c_str();
data.file_name = msg.file_name();
data.file_line = msg.file_line();
callback_(&data);
}
void CForwardingLogger::SetCallback(void (*callback)(LogData*)) {
callback_ = callback;
}
LogData *CForwardingLogger::CopyLogData(const LogData *orig) {
if(!orig)
return nullptr;
LogData *copy = (LogData*)malloc(sizeof(LogData));
if(!copy)
return nullptr;
copy->level = orig->level;
copy->component = orig->component;
if(orig->msg)
copy->msg = strdup(orig->msg);
copy->file_name = orig->file_name;
copy->file_line = orig->file_line;
return copy;
}
void CForwardingLogger::FreeLogData(LogData *data) {
if(!data)
return;
if(data->msg)
free((void*)data->msg);
// Inexpensive way to help catch use-after-free
memset(data, 0, sizeof(LogData));
free(data);
}
LogData *hdfsCopyLogData(LogData *data) {
return CForwardingLogger::CopyLogData(data);
}
void hdfsFreeLogData(LogData *data) {
CForwardingLogger::FreeLogData(data);
}
void hdfsSetLogFunction(void (*callback)(LogData*)) {
CForwardingLogger *logger = new CForwardingLogger();
logger->SetCallback(callback);
LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger));
}
static bool IsLevelValid(int component) {
if(component < HDFSPP_LOG_LEVEL_TRACE || component > HDFSPP_LOG_LEVEL_ERROR)
return false;
return true;
}
// should use __builtin_popcnt as optimization on some platforms
static int popcnt(int val) {
int bits = sizeof(val) * 8;
int count = 0;
for(int i=0; i<bits; i++) {
if((val >> i) & 0x1)
count++;
}
return count;
}
static bool IsComponentValid(int component) {
if(component < HDFSPP_LOG_COMPONENT_UNKNOWN || component > HDFSPP_LOG_COMPONENT_FILESYSTEM)
return false;
if(popcnt(component) != 1)
return false;
return true;
}
int hdfsEnableLoggingForComponent(int component) {
if(!IsComponentValid(component))
return 1;
LogManager::EnableLogForComponent(static_cast<LogSourceComponent>(component));
return 0;
}
int hdfsDisableLoggingForComponent(int component) {
if(!IsComponentValid(component))
return 1;
LogManager::DisableLogForComponent(static_cast<LogSourceComponent>(component));
return 0;
}
int hdfsSetLoggingLevel(int level) {
if(!IsLevelValid(level))
return 1;
LogManager::SetLogLevel(static_cast<LogLevel>(level));
return 0;
}

View File

@ -19,6 +19,6 @@ if(NEED_LINK_DL)
set(LIB_DL dl)
endif()
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc)
add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc uri.cc util.cc retry_policy.cc cancel_tracker.cc logging.cc)
add_library(common $<TARGET_OBJECTS:common_obj> $<TARGET_OBJECTS:uriparser2_obj>)
target_link_libraries(common ${LIB_DL})

View File

@ -36,9 +36,9 @@ void IoServiceImpl::Run() {
io_service_.run();
break;
} catch (const std::exception & e) {
LOG_WARN() << "Unexpected exception in libhdfspp worker thread: " << e.what();
LOG_WARN(kFileSystem, << "Unexpected exception in libhdfspp worker thread: " << e.what());
} catch (...) {
LOG_WARN() << "Unexpected value not derived from std::exception in libhdfspp worker thread";
LOG_WARN(kFileSystem, << "Unexpected value not derived from std::exception in libhdfspp worker thread");
}
}
}

View File

@ -0,0 +1,216 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "logging.h"
#include <ctime>
#include <cstring>
#include <thread>
#include <iostream>
#include <sstream>
namespace hdfs
{
LogManager::LogManager() {}
std::unique_ptr<LoggerInterface> LogManager::logger_impl_(new StderrLogger());
std::mutex LogManager::impl_lock_;
uint32_t LogManager::component_mask_ = 0xFFFFFFFF;
uint32_t LogManager::level_threshold_ = kWarning;
void LogManager::DisableLogForComponent(LogSourceComponent c) {
// AND with all bits other than one we want to unset
std::lock_guard<std::mutex> impl_lock(impl_lock_);
component_mask_ &= ~c;
}
void LogManager::EnableLogForComponent(LogSourceComponent c) {
// OR with bit to set
std::lock_guard<std::mutex> impl_lock(impl_lock_);
component_mask_ |= c;
}
void LogManager::SetLogLevel(LogLevel level) {
std::lock_guard<std::mutex> impl_lock(impl_lock_);
level_threshold_ = level;
}
void LogManager::Write(const LogMessage& msg) {
std::lock_guard<std::mutex> impl_lock(impl_lock_);
if(logger_impl_)
logger_impl_->Write(msg);
}
void LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl) {
std::lock_guard<std::mutex> impl_lock(impl_lock_);
logger_impl_.reset(impl.release());
}
/**
* Simple plugin to dump logs to stderr
**/
void StderrLogger::Write(const LogMessage& msg) {
std::stringstream formatted;
if(show_level_)
formatted << msg.level_string();
if(show_component_)
formatted << msg.component_string();
if(show_timestamp_) {
time_t current_time = std::time(nullptr);
char timestr[128];
memset(timestr, 0, 128);
int res = std::strftime(timestr, 128, "%a %b %e %H:%M:%S %Y", std::localtime(&current_time));
if(res > 0) {
formatted << '[' << (const char*)timestr << ']';
} else {
formatted << "[Error formatting timestamp]";
}
}
if(show_component_) {
formatted << "[Thread id = " << std::this_thread::get_id() << ']';
}
if(show_file_) {
// __FILE__ contains absolute path, which is giant if doing a build inside the
// Hadoop tree. Trim down to relative to libhdfspp/
std::string abs_path(msg.file_name());
size_t rel_path_idx = abs_path.find("libhdfspp/");
// Default to whole string if library is being built in an odd way
if(rel_path_idx == std::string::npos)
rel_path_idx = 0;
formatted << '[' << (const char*)&abs_path[rel_path_idx] << ":" << msg.file_line() << ']';
}
std::cerr << formatted.str() << " " << msg.MsgString() << std::endl;
}
void StderrLogger::set_show_timestamp(bool show) {
show_timestamp_ = show;
}
void StderrLogger::set_show_level(bool show) {
show_level_ = show;
}
void StderrLogger::set_show_thread(bool show) {
show_thread_ = show;
}
void StderrLogger::set_show_component(bool show) {
show_component_ = show;
}
LogMessage::~LogMessage() {
LogManager::Write(*this);
}
LogMessage& LogMessage::operator<<(const std::string *str) {
if(str)
msg_buffer_ << str;
else
msg_buffer_ << "<nullptr>";
return *this;
}
LogMessage& LogMessage::operator<<(const std::string& str) {
msg_buffer_ << str;
return *this;
}
LogMessage& LogMessage::operator<<(const char *str) {
if(str)
msg_buffer_ << str;
else
msg_buffer_ << "<nullptr>";
return *this;
}
LogMessage& LogMessage::operator<<(bool val) {
if(val)
msg_buffer_ << "true";
else
msg_buffer_ << "false";
return *this;
}
LogMessage& LogMessage::operator<<(int32_t val) {
msg_buffer_ << val;
return *this;
}
LogMessage& LogMessage::operator<<(uint32_t val) {
msg_buffer_ << val;
return *this;
}
LogMessage& LogMessage::operator<<(int64_t val) {
msg_buffer_ << val;
return *this;
}
LogMessage& LogMessage::operator<<(uint64_t val) {
msg_buffer_ << val;
return *this;
}
LogMessage& LogMessage::operator<<(void *ptr) {
msg_buffer_ << ptr;
return *this;
}
std::string LogMessage::MsgString() const {
return msg_buffer_.str();
}
const char * kLevelStrings[5] = {
"[TRACE ]",
"[DEBUG ]",
"[INFO ]",
"[WARN ]",
"[ERROR ]"
};
const char * LogMessage::level_string() const {
return kLevelStrings[level_];
}
const char * kComponentStrings[5] = {
"[Unknown ]",
"[RPC ]",
"[BlockReader ]",
"[FileHandle ]",
"[FileSystem ]"
};
const char * LogMessage::component_string() const {
switch(component_) {
case kRPC: return kComponentStrings[1];
case kBlockReader: return kComponentStrings[2];
case kFileHandle: return kComponentStrings[3];
case kFileSystem: return kComponentStrings[4];
default: return kComponentStrings[0];
}
}
}

View File

@ -19,41 +19,184 @@
#ifndef LIB_COMMON_LOGGING_H_
#define LIB_COMMON_LOGGING_H_
#include "hdfspp/log.h"
#include <iostream>
#include <sstream>
#include <mutex>
#include <memory>
namespace hdfs {
/**
* Logging mechanism to provide lightweight logging to stderr as well as
* as a callback mechanism to allow C clients and larger third party libs
* to be used to handle logging. When adding a new log message to the
* library use the macros defined below (LOG_TRACE..LOG_ERROR) rather than
* using the LogMessage and LogManager objects directly.
**/
enum LogLevel {
kDebug,
kInfo,
kWarning,
kError,
kTrace = 0,
kDebug = 1,
kInfo = 2,
kWarning = 3,
kError = 4,
};
#define LOG_DEBUG() LogMessage(kDebug)
#define LOG_INFO() LogMessage(kInfo)
#define LOG_WARN() LogMessage(kWarning)
#define LOG_ERROR() LogMessage(kError)
enum LogSourceComponent {
kUnknown = 1 << 0,
kRPC = 1 << 1,
kBlockReader = 1 << 2,
kFileHandle = 1 << 3,
kFileSystem = 1 << 4,
};
class LogMessage {
#define LOG_TRACE(C, MSG) do { \
if(LogManager::ShouldLog(kTrace,C)) { \
LogMessage(kTrace, __FILE__, __LINE__, C) MSG; \
}} while (0);
#define LOG_DEBUG(C, MSG) do { \
if(LogManager::ShouldLog(kDebug,C)) { \
LogMessage(kDebug, __FILE__, __LINE__, C) MSG; \
}} while (0);
#define LOG_INFO(C, MSG) do { \
if(LogManager::ShouldLog(kInfo,C)) { \
LogMessage(kInfo, __FILE__, __LINE__, C) MSG; \
}} while (0);
#define LOG_WARN(C, MSG) do { \
if(LogManager::ShouldLog(kWarning,C)) { \
LogMessage(kWarning, __FILE__, __LINE__, C) MSG; \
}} while (0);
#define LOG_ERROR(C, MSG) do { \
if(LogManager::ShouldLog(kError,C)) { \
LogMessage(kError, __FILE__, __LINE__, C) MSG; \
}} while (0);
class LogMessage;
class LoggerInterface {
public:
LogMessage(const LogLevel &l) {
static constexpr const char * kLogLevelMessage[] = {"DEBUG", "INFO", "WARN", "ERROR"};
::std::cerr << "[" << kLogLevelMessage[(size_t)l] << "] ";
}
LoggerInterface() {};
virtual ~LoggerInterface() {};
~LogMessage() {
::std::cerr << std::endl;
}
/**
* User defined handling messages, common case would be printing somewhere.
**/
virtual void Write(const LogMessage& msg) = 0;
};
LogMessage& operator<<(const std::string& msg) {
::std::cerr << msg;
return *this;
}
LogMessage& operator<<(int x) {
::std::cerr << x;
return *this;
/**
* StderrLogger unsuprisingly dumps messages to stderr.
* This is the default logger if nothing else is explicitly set.
**/
class StderrLogger : public LoggerInterface {
public:
StderrLogger() : show_timestamp_(true), show_level_(true),
show_thread_(true), show_component_(true),
show_file_(true) {}
void Write(const LogMessage& msg);
void set_show_timestamp(bool show);
void set_show_level(bool show);
void set_show_thread(bool show);
void set_show_component(bool show);
private:
bool show_timestamp_;
bool show_level_;
bool show_thread_;
bool show_component_;
bool show_file_;
};
/**
* LogManager provides a thread safe static interface to the underlying
* logger implementation.
**/
class LogManager {
friend class LogMessage;
public:
// allow easy inlining
static bool ShouldLog(LogLevel level, LogSourceComponent source) {
std::lock_guard<std::mutex> impl_lock(impl_lock_);
if(level < level_threshold_)
return false;
if(!(source & component_mask_))
return false;
return true;
}
static void Write(const LogMessage & msg);
static void EnableLogForComponent(LogSourceComponent c);
static void DisableLogForComponent(LogSourceComponent c);
static void SetLogLevel(LogLevel level);
static void SetLoggerImplementation(std::unique_ptr<LoggerInterface> impl);
private:
// don't create instances of this
LogManager();
// synchronize all unsafe plugin calls
static std::mutex impl_lock_;
static std::unique_ptr<LoggerInterface> logger_impl_;
// component and level masking
static uint32_t component_mask_;
static uint32_t level_threshold_;
};
/**
* LogMessage contains message text, along with other metadata about the message.
* Note: For performance reasons a set of macros (see top of file) is used to
* create these inside of an if block. Do not instantiate these directly, doing
* so will cause the message to be uncontitionally logged. This minor inconvinience
* gives us a ~20% performance increase in the (common) case where few messages
* are worth logging; std::stringstream is expensive to construct.
**/
class LogMessage {
friend class LogManager;
public:
LogMessage(const LogLevel &l, const char *file, int line,
LogSourceComponent component = kUnknown) :
level_(l), component_(component), origin_file_(file), origin_line_(line){}
~LogMessage();
const char *level_string() const;
const char *component_string() const;
LogLevel level() const {return level_; }
LogSourceComponent component() const {return component_; }
int file_line() const {return origin_line_; }
const char * file_name() const {return origin_file_; }
//print as-is, indicates when a nullptr was passed in
LogMessage& operator<<(const char *);
LogMessage& operator<<(const std::string*);
LogMessage& operator<<(const std::string&);
//convert to a string "true"/"false"
LogMessage& operator<<(bool);
LogMessage& operator<<(int32_t);
LogMessage& operator<<(uint32_t);
LogMessage& operator<<(int64_t);
LogMessage& operator<<(uint64_t);
//print address as hex
LogMessage& operator<<(void *);
std::string MsgString() const;
private:
LogLevel level_;
LogSourceComponent component_;
const char *origin_file_;
const int origin_line_;
std::stringstream msg_buffer_;
};
}

View File

@ -18,12 +18,15 @@
#include "filehandle.h"
#include "common/continuation/continuation.h"
#include "common/logging.h"
#include "connection/datanodeconnection.h"
#include "reader/block_reader.h"
#include <future>
#include <tuple>
#define FMT_THIS_ADDR "this=" << (void*)this
namespace hdfs {
using ::hadoop::hdfs::LocatedBlocksProto;
@ -35,11 +38,17 @@ FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string
std::shared_ptr<BadDataNodeTracker> bad_data_nodes)
: io_service_(io_service), client_name_(client_name), file_info_(file_info),
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::FileHandleImpl("
<< FMT_THIS_ADDR << ", ...) called");
}
void FileHandleImpl::PositionRead(
void *buf, size_t nbyte, uint64_t offset,
const std::function<void(const Status &, size_t)> &handler) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::PositionRead("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << nbyte << ") called");
/* prevent usage after cancelation */
if(cancel_state_->is_canceled()) {
handler(Status::Canceled(), 0);
@ -61,6 +70,10 @@ void FileHandleImpl::PositionRead(
}
Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << *nbyte << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
std::future<std::tuple<Status, size_t>> future(callstate->get_future());
@ -84,6 +97,10 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
}
Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::Read("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << *nbyte << ") called");
Status stat = PositionRead(buf, nbyte, offset_);
if(!stat.ok()) {
return stat;
@ -94,6 +111,9 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
}
Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::Seek("
<< ", offset=" << *offset << ", ...) called");
if(cancel_state_->is_canceled()) {
return Status::Canceled();
}
@ -146,6 +166,9 @@ void FileHandleImpl::AsyncPreadSome(
using ::hadoop::hdfs::DatanodeInfoProto;
using ::hadoop::hdfs::LocatedBlockProto;
LOG_TRACE(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << ", ...) called");
if(cancel_state_->is_canceled()) {
handler(Status::Canceled(), "", 0);
return;
@ -161,6 +184,8 @@ void FileHandleImpl::AsyncPreadSome(
});
if (block == file_info_->blocks_.end()) {
LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome(" << FMT_THIS_ADDR
<< ", ...) Cannot find corresponding blocks");
handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0);
return;
}
@ -179,6 +204,9 @@ void FileHandleImpl::AsyncPreadSome(
});
if (it == datanodes.end()) {
LOG_WARN(kFileHandle, << "FileHandleImpl::AsyncPreadSome("
<< FMT_THIS_ADDR << ", ...) No datanodes available");
handler(Status::ResourceUnavailable("No datanodes available"), "", 0);
return;
}
@ -224,6 +252,11 @@ std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReader
std::shared_ptr<DataNodeConnection> dn)
{
std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_);
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateBlockReader(" << FMT_THIS_ADDR
<< ", ..., dnconn=" << dn.get()
<< ") called. New BlockReader = " << reader.get());
readers_.AddReader(reader);
return reader;
}
@ -232,10 +265,15 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection(
::asio::io_service * io_service,
const ::hadoop::hdfs::DatanodeInfoProto & dn,
const hadoop::common::TokenProto * token) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::CreateDataNodeConnection("
<< FMT_THIS_ADDR << ", ...) called");
return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token);
}
void FileHandleImpl::CancelOperations() {
LOG_INFO(kFileHandle, << "FileHandleImpl::CancelOperations("
<< FMT_THIS_ADDR << ") called");
cancel_state_->set_canceled();
/* Push update to BlockReaders that may be hung in an asio call */

View File

@ -19,6 +19,7 @@
#include "filesystem.h"
#include "common/continuation/asio.h"
#include "common/util.h"
#include "common/logging.h"
#include <asio/ip/tcp.hpp>
@ -29,6 +30,8 @@
#include <iostream>
#include <pwd.h>
#define FMT_THIS_ADDR "this=" << (void*)this
namespace hdfs {
static const char kNamenodeProtocol[] =
@ -66,6 +69,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
using ::hadoop::hdfs::GetBlockLocationsRequestProto;
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
struct State {
GetBlockLocationsRequestProto req;
std::shared_ptr<GetBlockLocationsResponseProto> resp;
@ -158,6 +164,9 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
kNamenodeProtocolVersion), client_name_(GetRandomClientName()),
bad_node_tracker_(std::make_shared<BadDataNodeTracker>())
{
LOG_TRACE(kFileSystem, << "FileSystemImpl::FileSystemImpl("
<< FMT_THIS_ADDR << ") called");
// Poor man's move
io_service = nullptr;
@ -169,6 +178,9 @@ FileSystemImpl::FileSystemImpl(IoService *&io_service, const std::string &user_n
}
FileSystemImpl::~FileSystemImpl() {
LOG_TRACE(kFileSystem, << "FileSystemImpl::~FileSystemImpl("
<< FMT_THIS_ADDR << ") called");
/**
* Note: IoService must be stopped before getting rid of worker threads.
* Once worker threads are joined and deleted the service can be deleted.
@ -180,6 +192,10 @@ FileSystemImpl::~FileSystemImpl() {
void FileSystemImpl::Connect(const std::string &server,
const std::string &service,
const std::function<void(const Status &, FileSystem * fs)> &handler) {
LOG_INFO(kFileSystem, << "FileSystemImpl::Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service="
<< service << ") called");
/* IoService::New can return nullptr */
if (!io_service_) {
handler (Status::Error("Null IoService"), this);
@ -191,6 +207,9 @@ void FileSystemImpl::Connect(const std::string &server,
}
Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Connect(" << FMT_THIS_ADDR
<< ", server=" << server << ", service=" << service << ") called");
/* synchronized */
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
@ -252,6 +271,10 @@ Status FileSystemImpl::ConnectToDefaultFs() {
int FileSystemImpl::AddWorkerThread() {
LOG_DEBUG(kFileSystem, << "FileSystemImpl::AddWorkerThread("
<< FMT_THIS_ADDR << ") called."
<< " Existing thread count = " << worker_threads_.size());
auto service_task = [](IoService *service) { service->Run(); };
worker_threads_.push_back(
WorkerPtr(new std::thread(service_task, io_service_.get())));
@ -261,6 +284,9 @@ int FileSystemImpl::AddWorkerThread() {
void FileSystemImpl::Open(
const std::string &path,
const std::function<void(const Status &, FileHandle *)> &handler) {
LOG_INFO(kFileSystem, << "FileSystemImpl::Open("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) {
handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_)
@ -270,6 +296,10 @@ void FileSystemImpl::Open(
Status FileSystemImpl::Open(const std::string &path,
FileHandle **handle) {
LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Open("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>();
std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future());
@ -302,9 +332,9 @@ void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
// from within one of the worker threads, leading to a deadlock. Let's
// provide some explicit protection.
if(t->get_id() == std::this_thread::get_id()) {
//TODO: When we get good logging support, add it in here
std::cerr << "FATAL: Attempted to destroy a thread pool from within a "
"callback of the thread pool.\n";
LOG_ERROR(kFileSystem, << "FileSystemImpl::WorkerDeleter::operator(treadptr="
<< t << ") : FATAL: Attempted to destroy a thread pool"
"from within a callback of the thread pool!");
}
t->join();
delete t;

View File

@ -19,12 +19,17 @@
#include "reader/datatransfer.h"
#include "common/continuation/continuation.h"
#include "common/continuation/asio.h"
#include "common/logging.h"
#include <future>
namespace hdfs {
#define FMT_CONT_AND_PARENT_ADDR "this=" << (void*)this << ", parent=" << (void*)parent_
#define FMT_CONT_AND_READER_ADDR "this=" << (void*)this << ", reader=" << (void*)reader_
#define FMT_THIS_ADDR "this=" << (void*)this
hadoop::hdfs::OpReadBlockProto
ReadBlockProto(const std::string &client_name, bool verify_checksum,
const hadoop::common::TokenProto *token,
@ -54,6 +59,10 @@ void BlockReaderImpl::AsyncRequestBlock(
const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
uint64_t offset, const std::function<void(Status)> &handler) {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncRequestBlock("
<< FMT_THIS_ADDR << ", ..., length="
<< length << ", offset=" << offset << ", ...) called");
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
@ -103,6 +112,10 @@ Status BlockReaderImpl::RequestBlock(
const std::string &client_name,
const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length,
uint64_t offset) {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlock("
<< FMT_THIS_ADDR <<"..., length="
<< length << ", offset=" << offset << ") called");
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
AsyncRequestBlock(client_name, block, length, offset,
@ -121,6 +134,9 @@ struct BlockReaderImpl::ReadPacketHeader
ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacketHeader::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
parent_->packet_data_read_bytes_ = 0;
parent_->packet_len_ = 0;
auto handler = [next, this](const asio::error_code &ec, size_t) {
@ -178,6 +194,9 @@ struct BlockReaderImpl::ReadChecksum : continuation::Continuation {
ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadChecksum::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
auto parent = parent_;
if (parent->state_ != kReadChecksum) {
next(Status::OK());
@ -216,6 +235,9 @@ struct BlockReaderImpl::ReadData : continuation::Continuation {
}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadData::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
auto handler =
[next, this](const asio::error_code &ec, size_t transferred) {
Status status;
@ -251,6 +273,9 @@ struct BlockReaderImpl::ReadPadding : continuation::Continuation {
parent, bytes_transferred_, asio::buffer(padding_))) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPadding::Run("
<< FMT_CONT_AND_PARENT_ADDR << ") called");
if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) {
next(Status::OK());
return;
@ -282,6 +307,8 @@ struct BlockReaderImpl::AckRead : continuation::Continuation {
AckRead(BlockReaderImpl *parent) : parent_(parent) {}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AckRead::Run(" << FMT_CONT_AND_PARENT_ADDR << ") called");
if (parent_->bytes_to_read_ > 0) {
next(Status::OK());
return;
@ -314,6 +341,8 @@ void BlockReaderImpl::AsyncReadPacket(
const std::function<void(const Status &, size_t bytes_transferred)> &handler) {
assert(state_ != kOpen && "Not connected");
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadPacket called");
struct State {
std::shared_ptr<size_t> bytes_transferred;
};
@ -337,6 +366,8 @@ void BlockReaderImpl::AsyncReadPacket(
size_t
BlockReaderImpl::ReadPacket(const MutableBuffers &buffers,
Status *status) {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadPacket called");
size_t transferred = 0;
auto done = std::make_shared<std::promise<void>>();
auto future = done->get_future();
@ -361,6 +392,9 @@ struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation {
}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::RequestBlockContinuation::Run("
<< FMT_CONT_AND_READER_ADDR << ") called");
reader_->AsyncRequestBlock(client_name_, &block_, length_,
offset_, next);
}
@ -381,6 +415,8 @@ struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation {
}
virtual void Run(const Next &next) override {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::ReadBlockContinuation::Run("
<< FMT_CONT_AND_READER_ADDR << ") called");
*transferred_ = 0;
next_ = next;
OnReadData(Status::OK(), 0);
@ -415,6 +451,8 @@ void BlockReaderImpl::AsyncReadBlock(
size_t offset,
const MutableBuffers &buffers,
const std::function<void(const Status &, size_t)> handler) {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::AsyncReadBlock("
<< FMT_THIS_ADDR << ") called");
auto m = continuation::Pipeline<size_t>::Create(cancel_state_);
size_t * bytesTransferred = &m->state();
@ -432,6 +470,8 @@ void BlockReaderImpl::AsyncReadBlock(
}
void BlockReaderImpl::CancelOperation() {
LOG_TRACE(kBlockReader, << "BlockReaderImpl::CancelOperation("
<< FMT_THIS_ADDR << ") called");
/* just forward cancel to DNConnection */
dn_->Cancel();
}

View File

@ -191,7 +191,7 @@ void RpcConnection::HandleRpcResponse(std::shared_ptr<Response> response) {
auto req = RemoveFromRunningQueue(h.callid());
if (!req) {
LOG_WARN() << "RPC response with Unknown call id " << h.callid();
LOG_WARN(kRPC, << "RPC response with Unknown call id " << h.callid());
return;
}

View File

@ -63,11 +63,14 @@ template <class NextLayer>
RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine)
: RpcConnection(engine),
options_(engine->options()),
next_layer_(engine->io_service()) {}
next_layer_(engine->io_service()) {
LOG_TRACE(kRPC, << "RpcConnectionImpl::RpcConnectionImpl called");
}
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Connect(
const std::vector<::asio::ip::tcp::endpoint> &server, RpcCallback &handler) {
LOG_TRACE(kRPC, << "RpcConnectionImpl::Connect called");
auto connectionSuccessfulReq = std::make_shared<Request>(
engine_, [handler](::google::protobuf::io::CodedInputStream *is,
const Status &status) {
@ -105,6 +108,8 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
auto shared_this = RpcConnectionImpl<NextLayer>::shared_from_this();
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnectionImpl::ConnectComplete called");
Status status = ToStatus(ec);
if (status.ok()) {
StartReading();
@ -131,6 +136,9 @@ void RpcConnectionImpl<NextLayer>::ConnectComplete(const ::asio::error_code &ec)
template <class NextLayer>
void RpcConnectionImpl<NextLayer>::HandshakeComplete(const Status &s) {
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnectionImpl::HandshakeComplete called");
if (s.ok()) {
FlushPendingRequests();
} else {
@ -143,6 +151,8 @@ template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Handshake(RpcCallback &handler) {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
LOG_TRACE(kRPC, << "RpcConnectionImpl::Handshake called");
auto shared_this = shared_from_this();
auto handshake_packet = PrepareHandshakePacket();
::asio::async_write(next_layer_, asio::buffer(*handshake_packet),
@ -163,9 +173,11 @@ void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec,
using std::placeholders::_2;
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnSendCompleted called");
request_over_the_wire_.reset();
if (ec) {
LOG_WARN() << "Network error during RPC write: " << ec.message();
LOG_WARN(kRPC, << "Network error during RPC write: " << ec.message());
CommsError(ToStatus(ec));
return;
}
@ -180,6 +192,8 @@ void RpcConnectionImpl<NextLayer>::FlushPendingRequests() {
// Lock should be held
assert(lock_held(connection_state_lock_));
LOG_TRACE(kRPC, << "RpcConnectionImpl::FlushPendingRequests called");
if (pending_requests_.empty()) {
return;
}
@ -233,6 +247,8 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
using std::placeholders::_2;
std::lock_guard<std::mutex> state_lock(connection_state_lock_);
LOG_TRACE(kRPC, << "RpcConnectionImpl::OnRecvCompleted called");
std::shared_ptr<RpcConnection> shared_this = shared_from_this();
switch (ec.value()) {
@ -243,7 +259,7 @@ void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec,
// The event loop has been shut down. Ignore the error.
return;
default:
LOG_WARN() << "Network error during RPC read: " << ec.message();
LOG_WARN(kRPC, << "Network error during RPC read: " << ec.message());
CommsError(ToStatus(ec));
return;
}
@ -281,6 +297,8 @@ template <class NextLayer>
void RpcConnectionImpl<NextLayer>::Disconnect() {
assert(lock_held(connection_state_lock_)); // Must be holding lock before calling
LOG_INFO(kRPC, << "RpcConnectionImpl::Disconnect called");
request_over_the_wire_.reset();
if (connected_) {
next_layer_.cancel();

View File

@ -18,6 +18,7 @@
#include "rpc_engine.h"
#include "rpc_connection.h"
#include "common/util.h"
#include "common/logging.h"
#include "optional.hpp"
#include <future>
@ -38,11 +39,15 @@ RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options,
protocol_version_(protocol_version),
retry_policy_(std::move(MakeRetryPolicy(options))),
call_id_(0),
retry_timer(*io_service) {}
retry_timer(*io_service) {
LOG_DEBUG(kRPC, << "RpcEngine::RpcEngine called");
}
void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
RpcCallback &handler) {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
LOG_DEBUG(kRPC, << "RpcEngine::Connect called");
last_endpoints_ = server;
conn_ = NewConnection();
@ -50,6 +55,7 @@ void RpcEngine::Connect(const std::vector<::asio::ip::tcp::endpoint> &server,
}
void RpcEngine::Shutdown() {
LOG_DEBUG(kRPC, << "RpcEngine::Shutdown called");
io_service_->post([this]() {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
conn_->Disconnect();
@ -58,6 +64,7 @@ void RpcEngine::Shutdown() {
}
std::unique_ptr<const RetryPolicy> RpcEngine::MakeRetryPolicy(const Options &options) {
LOG_DEBUG(kRPC, << "RpcEngine::MakeRetryPolicy called");
if (options.max_rpc_retries > 0) {
return std::unique_ptr<RetryPolicy>(new FixedDelayRetryPolicy(options.rpc_retry_delay_ms, options.max_rpc_retries));
} else {
@ -74,6 +81,9 @@ void RpcEngine::AsyncRpc(
const std::shared_ptr<::google::protobuf::MessageLite> &resp,
const std::function<void(const Status &)> &handler) {
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
LOG_TRACE(kRPC, << "RpcEngine::AsyncRpc called");
if (!conn_) {
conn_ = NewConnection();
conn_->ConnectAndFlush(last_endpoints_);
@ -84,6 +94,9 @@ void RpcEngine::AsyncRpc(
Status RpcEngine::Rpc(
const std::string &method_name, const ::google::protobuf::MessageLite *req,
const std::shared_ptr<::google::protobuf::MessageLite> &resp) {
LOG_TRACE(kRPC, << "RpcEngine::Rpc called");
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
AsyncRpc(method_name, req, resp,
@ -93,12 +106,16 @@ Status RpcEngine::Rpc(
std::shared_ptr<RpcConnection> RpcEngine::NewConnection()
{
LOG_DEBUG(kRPC, << "RpcEngine::NewConnection called");
return std::make_shared<RpcConnectionImpl<::asio::ip::tcp::socket>>(this);
}
Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
std::shared_ptr<std::string> resp) {
LOG_TRACE(kRPC, << "RpcEngine::RawRpc called");
std::shared_ptr<RpcConnection> conn;
{
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
@ -119,6 +136,8 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req,
void RpcEngine::AsyncRpcCommsError(
const Status &status,
std::vector<std::shared_ptr<Request>> pendingRequests) {
LOG_ERROR(kRPC, << "RpcEngine::AsyncRpcCommsError called");
io_service().post([this, status, pendingRequests]() {
RpcCommsError(status, pendingRequests);
});
@ -129,6 +148,8 @@ void RpcEngine::RpcCommsError(
std::vector<std::shared_ptr<Request>> pendingRequests) {
(void)status;
LOG_ERROR(kRPC, << "RpcEngine::RpcCommsError called");
std::lock_guard<std::mutex> state_lock(engine_state_lock_);
auto head_action = optional<RetryAction>();

View File

@ -103,8 +103,13 @@ add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfs
build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)
endif(HADOOP_BUILD)
add_executable(hdfs_builder_test hdfs_builder_test.cc)
target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_builder_test hdfs_builder_test)
add_executable(logging_test logging_test.cc)
target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(logging_test logging_test)

View File

@ -0,0 +1,374 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <common/logging.h>
#include <bindings/c/hdfs.cc>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <iostream>
using namespace hdfs;
struct log_state {
int trace_count;
int debug_count;
int info_count;
int warning_count;
int error_count;
int origin_unknown;
int origin_rpc;
int origin_blockreader;
int origin_filehandle;
int origin_filesystem;
std::string msg;
log_state() {
reset();
}
void reset() {
trace_count = 0;
debug_count = 0;
info_count = 0;
warning_count = 0;
error_count = 0;
origin_unknown = 0;
origin_rpc = 0;
origin_blockreader = 0;
origin_filehandle = 0;
origin_filesystem = 0;
msg = "";
}
};
log_state log_state_instance;
void process_log_msg(LogData *data) {
if(data->msg)
log_state_instance.msg = data->msg;
switch(data->level) {
case HDFSPP_LOG_LEVEL_TRACE:
log_state_instance.trace_count++;
break;
case HDFSPP_LOG_LEVEL_DEBUG:
log_state_instance.debug_count++;
break;
case HDFSPP_LOG_LEVEL_INFO:
log_state_instance.info_count++;
break;
case HDFSPP_LOG_LEVEL_WARN:
log_state_instance.warning_count++;
break;
case HDFSPP_LOG_LEVEL_ERROR:
log_state_instance.error_count++;
break;
default:
//should never happen
std::cout << "foo" << std::endl;
ASSERT_FALSE(true);
}
switch(data->component) {
case HDFSPP_LOG_COMPONENT_UNKNOWN:
log_state_instance.origin_unknown++;
break;
case HDFSPP_LOG_COMPONENT_RPC:
log_state_instance.origin_rpc++;
break;
case HDFSPP_LOG_COMPONENT_BLOCKREADER:
log_state_instance.origin_blockreader++;
break;
case HDFSPP_LOG_COMPONENT_FILEHANDLE:
log_state_instance.origin_filehandle++;
break;
case HDFSPP_LOG_COMPONENT_FILESYSTEM:
log_state_instance.origin_filesystem++;
break;
default:
std::cout << "bar" << std::endl;
ASSERT_FALSE(true);
}
}
void reset_log_counters() {
log_state_instance.reset();
}
void assert_nothing_logged() {
if(log_state_instance.trace_count || log_state_instance.debug_count ||
log_state_instance.info_count || log_state_instance.warning_count ||
log_state_instance.error_count) {
ASSERT_FALSE(true);
}
}
void assert_trace_logged() { ASSERT_TRUE(log_state_instance.trace_count > 0); }
void assert_debug_logged() { ASSERT_TRUE(log_state_instance.debug_count > 0); }
void assert_info_logged() { ASSERT_TRUE(log_state_instance.info_count > 0); }
void assert_warning_logged() { ASSERT_TRUE(log_state_instance.warning_count > 0); }
void assert_error_logged() { ASSERT_TRUE(log_state_instance.error_count > 0); }
void assert_no_trace_logged() { ASSERT_EQ(log_state_instance.trace_count, 0); }
void assert_no_debug_logged() { ASSERT_EQ(log_state_instance.debug_count, 0); }
void assert_no_info_logged() { ASSERT_EQ(log_state_instance.info_count, 0); }
void assert_no_warning_logged() { ASSERT_EQ(log_state_instance.warning_count, 0); }
void assert_no_error_logged() { ASSERT_EQ(log_state_instance.error_count, 0); }
void assert_unknown_logged() { ASSERT_TRUE(log_state_instance.origin_unknown > 0); }
void assert_rpc_logged() { ASSERT_TRUE(log_state_instance.origin_rpc > 0); }
void assert_blockreader_logged() { ASSERT_TRUE(log_state_instance.origin_blockreader > 0); }
void assert_filehandle_logged() { ASSERT_TRUE(log_state_instance.origin_filehandle > 0); }
void assert_filesystem_logged() { ASSERT_TRUE(log_state_instance.origin_filesystem > 0); }
void assert_no_unknown_logged() { ASSERT_EQ(log_state_instance.origin_unknown, 0); }
void assert_no_rpc_logged() { ASSERT_EQ(log_state_instance.origin_rpc, 0); }
void assert_no_blockreader_logged() { ASSERT_EQ(log_state_instance.origin_blockreader, 0); }
void assert_no_filehandle_logged() { ASSERT_EQ(log_state_instance.origin_filehandle, 0); }
void assert_no_filesystem_logged() { ASSERT_EQ(log_state_instance.origin_filesystem, 0); }
void log_all_components_at_level(LogLevel lvl) {
if(lvl == kTrace) {
LOG_TRACE(kUnknown, << 'a');
LOG_TRACE(kRPC, << 'b');
LOG_TRACE(kBlockReader, << 'c');
LOG_TRACE(kFileHandle, << 'd');
LOG_TRACE(kFileSystem, << 'e');
} else if (lvl == kDebug) {
LOG_DEBUG(kUnknown, << 'a');
LOG_DEBUG(kRPC, << 'b');
LOG_DEBUG(kBlockReader, << 'c');
LOG_DEBUG(kFileHandle, << 'd');
LOG_DEBUG(kFileSystem, << 'e');
} else if (lvl == kInfo) {
LOG_INFO(kUnknown, << 'a');
LOG_INFO(kRPC, << 'b');
LOG_INFO(kBlockReader, << 'c');
LOG_INFO(kFileHandle, << 'd');
LOG_INFO(kFileSystem, << 'e');
} else if (lvl == kWarning) {
LOG_WARN(kUnknown, << 'a');
LOG_WARN(kRPC, << 'b');
LOG_WARN(kBlockReader, << 'c');
LOG_WARN(kFileHandle, << 'd');
LOG_WARN(kFileSystem, << 'e');
} else if (lvl == kError) {
LOG_ERROR(kUnknown, << 'a');
LOG_ERROR(kRPC, << 'b');
LOG_ERROR(kBlockReader, << 'c');
LOG_ERROR(kFileHandle, << 'd');
LOG_ERROR(kFileSystem, << 'e');
} else {
// A level was added and not accounted for here
ASSERT_TRUE(false);
}
}
// make sure everything can be masked
TEST(LoggingTest, MaskAll) {
LogManager::DisableLogForComponent(kUnknown);
LogManager::DisableLogForComponent(kRPC);
LogManager::DisableLogForComponent(kBlockReader);
LogManager::DisableLogForComponent(kFileHandle);
LogManager::DisableLogForComponent(kFileSystem);
// use trace so anything that isn't masked should come through
LogManager::SetLogLevel(kTrace);
log_state_instance.reset();
log_all_components_at_level(kError);
assert_nothing_logged();
log_state_instance.reset();
}
// make sure components can be masked individually
TEST(LoggingTest, MaskOne) {
LogManager::DisableLogForComponent(kUnknown);
LogManager::DisableLogForComponent(kRPC);
LogManager::DisableLogForComponent(kBlockReader);
LogManager::DisableLogForComponent(kFileHandle);
LogManager::DisableLogForComponent(kFileSystem);
LogManager::SetLogLevel(kTrace);
// Unknown - aka component not provided
LogManager::EnableLogForComponent(kUnknown);
log_all_components_at_level(kError);
assert_unknown_logged();
assert_error_logged();
assert_no_rpc_logged();
assert_no_blockreader_logged();
assert_no_filehandle_logged();
assert_no_filesystem_logged();
log_state_instance.reset();
LogManager::DisableLogForComponent(kUnknown);
// RPC
LogManager::EnableLogForComponent(kRPC);
log_all_components_at_level(kError);
assert_rpc_logged();
assert_error_logged();
assert_no_unknown_logged();
assert_no_blockreader_logged();
assert_no_filehandle_logged();
assert_no_filesystem_logged();
log_state_instance.reset();
LogManager::DisableLogForComponent(kRPC);
// BlockReader
LogManager::EnableLogForComponent(kBlockReader);
log_all_components_at_level(kError);
assert_blockreader_logged();
assert_error_logged();
assert_no_unknown_logged();
assert_no_rpc_logged();
assert_no_filehandle_logged();
assert_no_filesystem_logged();
log_state_instance.reset();
LogManager::DisableLogForComponent(kBlockReader);
// FileHandle
LogManager::EnableLogForComponent(kFileHandle);
log_all_components_at_level(kError);
assert_filehandle_logged();
assert_error_logged();
assert_no_unknown_logged();
assert_no_rpc_logged();
assert_no_blockreader_logged();
assert_no_filesystem_logged();
log_state_instance.reset();
LogManager::DisableLogForComponent(kFileHandle);
// FileSystem
LogManager::EnableLogForComponent(kFileSystem);
log_all_components_at_level(kError);
assert_filesystem_logged();
assert_error_logged();
assert_no_unknown_logged();
assert_no_rpc_logged();
assert_no_blockreader_logged();
assert_no_filehandle_logged();
log_state_instance.reset();
LogManager::DisableLogForComponent(kFileSystem);
}
TEST(LoggingTest, Levels) {
// should be safe to focus on one component if MaskOne passes
LogManager::EnableLogForComponent(kUnknown);
LogManager::SetLogLevel(kError);
LOG_TRACE(kUnknown, << "a");
LOG_DEBUG(kUnknown, << "b");
LOG_INFO(kUnknown,<< "c");
LOG_WARN(kUnknown, << "d");
assert_nothing_logged();
LOG_ERROR(kUnknown, << "e");
assert_error_logged();
assert_unknown_logged();
log_state_instance.reset();
// anything >= warning
LogManager::SetLogLevel(kWarning);
LOG_TRACE(kUnknown, << "a");
LOG_DEBUG(kUnknown, << "b");
LOG_INFO(kUnknown, << "c");
assert_nothing_logged();
LOG_WARN(kUnknown, << "d");
assert_warning_logged();
LOG_ERROR(kUnknown, << "e");
assert_error_logged();
log_state_instance.reset();
// anything >= info
LogManager::SetLogLevel(kInfo);
LOG_TRACE(kUnknown, << "a");
LOG_DEBUG(kUnknown, << "b");
assert_nothing_logged();
LOG_INFO(kUnknown, << "c");
assert_info_logged();
LOG_WARN(kUnknown, << "d");
assert_warning_logged();
LOG_ERROR(kUnknown, << "e");
assert_error_logged();
log_state_instance.reset();
// anything >= debug
LogManager::SetLogLevel(kDebug);
LOG_TRACE(kUnknown, << "a");
assert_nothing_logged();
LOG_DEBUG(kUnknown, << "b");
assert_debug_logged();
assert_no_info_logged();
assert_no_warning_logged();
assert_no_error_logged();
LOG_INFO(kUnknown, << "c");
assert_info_logged();
assert_no_warning_logged();
assert_no_error_logged();
LOG_WARN(kUnknown, << "d");
assert_warning_logged();
assert_no_error_logged();
LOG_ERROR(kUnknown, << "e");
assert_error_logged();
log_state_instance.reset();
// anything
LogManager::SetLogLevel(kTrace);
assert_nothing_logged();
LOG_TRACE(kUnknown, << "a");
assert_trace_logged();
log_state_instance.reset();
LOG_DEBUG(kUnknown, << "b");
assert_debug_logged();
log_state_instance.reset();
LOG_INFO(kUnknown, << "c");
assert_info_logged();
log_state_instance.reset();
LOG_WARN(kUnknown, << "d");
assert_warning_logged();
log_state_instance.reset();
LOG_ERROR(kUnknown, << "e");
assert_error_logged();
}
TEST(LoggingTest, Text) {
LogManager::EnableLogForComponent(kRPC);
std::string text;
LOG_ERROR(kRPC, << text);
ASSERT_EQ(text, log_state_instance.msg);
}
int main(int argc, char *argv[]) {
CForwardingLogger *logger = new CForwardingLogger();
logger->SetCallback(process_log_msg);
LogManager::SetLoggerImplementation(std::unique_ptr<LoggerInterface>(logger));
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.
::testing::InitGoogleMock(&argc, argv);
int res = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
return res;
}