HDFS-11294: libhdfs++: Segfault in HA failover if DNS lookup for both Namenodes fails. Contributed by James Clampffer.
This commit is contained in:
parent
12942f679a
commit
48db24a430
|
@ -37,17 +37,23 @@ static constexpr const char * FS_NN_CONNECT_EVENT = "NN::connect";
|
||||||
static constexpr const char * FS_NN_READ_EVENT = "NN::read";
|
static constexpr const char * FS_NN_READ_EVENT = "NN::read";
|
||||||
static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
|
static constexpr const char * FS_NN_WRITE_EVENT = "NN::write";
|
||||||
|
|
||||||
|
static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
|
||||||
|
static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
|
||||||
|
static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
|
||||||
|
|
||||||
|
|
||||||
// NN failover event due to issues with the current NN; might be standby, might be dead.
|
// NN failover event due to issues with the current NN; might be standby, might be dead.
|
||||||
// Invokes the fs_event_callback using the nameservice name in the cluster string.
|
// Invokes the fs_event_callback using the nameservice name in the cluster string.
|
||||||
// The uint64_t value argument holds an address that can be reinterpreted as a const char *
|
// The uint64_t value argument holds an address that can be reinterpreted as a const char *
|
||||||
// and provides the full URI of the node the failover will attempt to connect to next.
|
// and provides the full URI of the node the failover will attempt to connect to next.
|
||||||
static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover";
|
static constexpr const char * FS_NN_FAILOVER_EVENT = "NN::failover";
|
||||||
|
|
||||||
static constexpr const char * FILE_DN_CONNECT_EVENT = "DN::connect";
|
// Invoked when RpcConnection tries to use an empty set of endpoints to figure out
|
||||||
static constexpr const char * FILE_DN_READ_EVENT = "DN::read";
|
// which NN in a HA cluster to connect to.
|
||||||
static constexpr const char * FILE_DN_WRITE_EVENT = "DN::write";
|
static constexpr const char * FS_NN_EMPTY_ENDPOINTS_EVENT = "NN::bad_failover::no_endpoints";
|
||||||
|
|
||||||
|
|
||||||
|
// Invoked prior to determining if failed NN rpc calls should be retried or discarded.
|
||||||
|
static constexpr const char * FS_NN_PRE_RPC_RETRY_EVENT = "NN::rpc::get_retry_action";
|
||||||
|
|
||||||
class event_response {
|
class event_response {
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -41,6 +41,12 @@ using namespace std::placeholders;
|
||||||
|
|
||||||
static constexpr tPort kDefaultPort = 8020;
|
static constexpr tPort kDefaultPort = 8020;
|
||||||
|
|
||||||
|
/** Annotate what parts of the code below are implementations of API functions
|
||||||
|
* and if they are normal vs. extended API.
|
||||||
|
*/
|
||||||
|
#define LIBHDFS_C_API
|
||||||
|
#define LIBHDFSPP_EXT_API
|
||||||
|
|
||||||
/* Separate the handles used by the C api from the C++ API*/
|
/* Separate the handles used by the C api from the C++ API*/
|
||||||
struct hdfs_internal {
|
struct hdfs_internal {
|
||||||
hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {}
|
hdfs_internal(FileSystem *p) : filesystem_(p), working_directory_("/") {}
|
||||||
|
@ -79,6 +85,7 @@ struct hdfsFile_internal {
|
||||||
thread_local std::string errstr;
|
thread_local std::string errstr;
|
||||||
|
|
||||||
/* Fetch last error that happened in this thread */
|
/* Fetch last error that happened in this thread */
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsGetLastError(char *buf, int len) {
|
int hdfsGetLastError(char *buf, int len) {
|
||||||
//No error message
|
//No error message
|
||||||
if(errstr.empty()){
|
if(errstr.empty()){
|
||||||
|
@ -255,6 +262,7 @@ optional<std::string> getAbsolutePath(hdfsFS fs, const char* path) {
|
||||||
* C API implementations
|
* C API implementations
|
||||||
**/
|
**/
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsFileIsOpenForRead(hdfsFile file) {
|
int hdfsFileIsOpenForRead(hdfsFile file) {
|
||||||
/* files can only be open for reads at the moment, do a quick check */
|
/* files can only be open for reads at the moment, do a quick check */
|
||||||
if (!CheckHandle(file)){
|
if (!CheckHandle(file)){
|
||||||
|
@ -263,6 +271,7 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
|
||||||
return 1; // Update implementation when we get file writing
|
return 1; // Update implementation when we get file writing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsFileIsOpenForWrite(hdfsFile file) {
|
int hdfsFileIsOpenForWrite(hdfsFile file) {
|
||||||
/* files can only be open for reads at the moment, so return false */
|
/* files can only be open for reads at the moment, so return false */
|
||||||
CheckHandle(file);
|
CheckHandle(file);
|
||||||
|
@ -332,6 +341,7 @@ hdfsFS doHdfsConnect(optional<std::string> nn, optional<tPort> port, optional<st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
|
hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
|
||||||
// Same idea as the first half of doHdfsConnect, but return the wrapped FS before
|
// Same idea as the first half of doHdfsConnect, but return the wrapped FS before
|
||||||
// connecting.
|
// connecting.
|
||||||
|
@ -367,6 +377,7 @@ hdfsFS hdfsAllocateFileSystem(struct hdfsBuilder *bld) {
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
|
int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
|
||||||
if(!CheckSystem(fs)) {
|
if(!CheckSystem(fs)) {
|
||||||
return ENODEV;
|
return ENODEV;
|
||||||
|
@ -420,24 +431,29 @@ int hdfsConnectAllocated(hdfsFS fs, struct hdfsBuilder *bld) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFS hdfsConnect(const char *nn, tPort port) {
|
hdfsFS hdfsConnect(const char *nn, tPort port) {
|
||||||
return hdfsConnectAsUser(nn, port, "");
|
return hdfsConnectAsUser(nn, port, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
|
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
|
||||||
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
|
hdfsFS hdfsConnectAsUserNewInstance(const char* nn, tPort port, const char *user ) {
|
||||||
//libhdfspp always returns a new instance
|
//libhdfspp always returns a new instance
|
||||||
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
return doHdfsConnect(std::string(nn), port, std::string(user), Options());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
|
hdfsFS hdfsConnectNewInstance(const char* nn, tPort port) {
|
||||||
//libhdfspp always returns a new instance
|
//libhdfspp always returns a new instance
|
||||||
return hdfsConnectAsUser(nn, port, "");
|
return hdfsConnectAsUser(nn, port, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsCancelPendingConnection(hdfsFS fs) {
|
int hdfsCancelPendingConnection(hdfsFS fs) {
|
||||||
// todo: stick an enum in hdfs_internal to check the connect state
|
// todo: stick an enum in hdfs_internal to check the connect state
|
||||||
if(!CheckSystem(fs)) {
|
if(!CheckSystem(fs)) {
|
||||||
|
@ -458,6 +474,7 @@ int hdfsCancelPendingConnection(hdfsFS fs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsDisconnect(hdfsFS fs) {
|
int hdfsDisconnect(hdfsFS fs) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -476,6 +493,7 @@ int hdfsDisconnect(hdfsFS fs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
|
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
|
||||||
short replication, tSize blocksize) {
|
short replication, tSize blocksize) {
|
||||||
try
|
try
|
||||||
|
@ -512,6 +530,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
|
int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -528,6 +547,7 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
|
char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -556,6 +576,7 @@ char* hdfsGetWorkingDirectory(hdfsFS fs, char *buffer, size_t bufferSize) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
|
int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -582,6 +603,7 @@ int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsAvailable(hdfsFS fs, hdfsFile file) {
|
int hdfsAvailable(hdfsFS fs, hdfsFile file) {
|
||||||
//Since we do not have read ahead implemented, return 0 if fs and file are good;
|
//Since we do not have read ahead implemented, return 0 if fs and file are good;
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -591,6 +613,7 @@ int hdfsAvailable(hdfsFS fs, hdfsFile file) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
|
tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -604,6 +627,7 @@ tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
|
tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -633,6 +657,7 @@ tOffset hdfsGetDefaultBlockSizeAtPath(hdfsFS fs, const char *path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
|
int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -659,6 +684,7 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
|
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -682,6 +708,7 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tOffset hdfsGetCapacity(hdfsFS fs) {
|
tOffset hdfsGetCapacity(hdfsFS fs) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -705,6 +732,7 @@ tOffset hdfsGetCapacity(hdfsFS fs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tOffset hdfsGetUsed(hdfsFS fs) {
|
tOffset hdfsGetUsed(hdfsFS fs) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -777,6 +805,7 @@ void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
|
||||||
file_info->mLastAccess = stat_info.access_time;
|
file_info->mLastAccess = stat_info.access_time;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsExists(hdfsFS fs, const char *path) {
|
int hdfsExists(hdfsFS fs, const char *path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -800,6 +829,7 @@ int hdfsExists(hdfsFS fs, const char *path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
|
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -828,6 +858,7 @@ hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
|
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -868,6 +899,7 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
|
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -880,6 +912,7 @@ void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
|
||||||
delete[] hdfsFileInfo;
|
delete[] hdfsFileInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -904,6 +937,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -927,6 +961,7 @@ int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -951,6 +986,7 @@ int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsChmod(hdfsFS fs, const char* path, short mode){
|
int hdfsChmod(hdfsFS fs, const char* path, short mode){
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -977,6 +1013,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){
|
int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group){
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1003,6 +1040,7 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){
|
hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1041,6 +1079,7 @@ hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1068,6 +1107,7 @@ int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
|
int hdfsDeleteSnapshot(hdfsFS fs, const char* path, const char* name) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1125,6 +1165,7 @@ int hdfsRenameSnapshot(hdfsFS fs, const char* path, const char* old_name, const
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
|
int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1148,6 +1189,7 @@ int hdfsAllowSnapshot(hdfsFS fs, const char* path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
|
int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
|
||||||
try {
|
try {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1171,6 +1213,7 @@ int hdfsDisallowSnapshot(hdfsFS fs, const char* path) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
|
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
|
||||||
tSize length) {
|
tSize length) {
|
||||||
try
|
try
|
||||||
|
@ -1193,6 +1236,7 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
|
tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1215,12 +1259,14 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsUnbufferFile(hdfsFile file) {
|
int hdfsUnbufferFile(hdfsFile file) {
|
||||||
//Currently we are not doing any buffering
|
//Currently we are not doing any buffering
|
||||||
CheckHandle(file);
|
CheckHandle(file);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
|
int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1239,6 +1285,7 @@ int hdfsFileGetReadStatistics(hdfsFile file, struct hdfsReadStatistics **stats)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsFileClearReadStatistics(hdfsFile file) {
|
int hdfsFileClearReadStatistics(hdfsFile file) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1255,16 +1302,19 @@ int hdfsFileClearReadStatistics(hdfsFile file) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
|
int64_t hdfsReadStatisticsGetRemoteBytesRead(const struct hdfsReadStatistics *stats) {
|
||||||
return stats->totalBytesRead - stats->totalLocalBytesRead;
|
return stats->totalBytesRead - stats->totalLocalBytesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
|
void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
delete stats;
|
delete stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 0 on success, -1 on error*/
|
/* 0 on success, -1 on error*/
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
|
int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1287,6 +1337,7 @@ int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
|
tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1326,7 +1377,7 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
|
int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1402,6 +1453,7 @@ int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
|
int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if (blockLocations == nullptr)
|
if (blockLocations == nullptr)
|
||||||
|
@ -1422,6 +1474,7 @@ int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
|
char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
@ -1462,6 +1515,7 @@ char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsFreeHosts(char ***blockHosts) {
|
void hdfsFreeHosts(char ***blockHosts) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if (blockHosts == nullptr)
|
if (blockHosts == nullptr)
|
||||||
|
@ -1526,6 +1580,7 @@ event_response file_callback_glue(libhdfspp_file_event_callback handler,
|
||||||
return event_response::make_ok();
|
return event_response::make_ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
|
int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
|
||||||
{
|
{
|
||||||
fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
|
fs_event_callback callback = std::bind(fs_callback_glue, handler, cookie, _1, _2, _3);
|
||||||
|
@ -1533,7 +1588,7 @@ int hdfsPreAttachFSMonitor(libhdfspp_fs_event_callback handler, int64_t cookie)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
|
int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie)
|
||||||
{
|
{
|
||||||
file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
|
file_event_callback callback = std::bind(file_callback_glue, handler, cookie, _1, _2, _3, _4);
|
||||||
|
@ -1572,6 +1627,7 @@ hdfsBuilder::hdfsBuilder(const char * directory) :
|
||||||
config = LoadDefault(loader);
|
config = LoadDefault(loader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
struct hdfsBuilder *hdfsNewBuilder(void)
|
struct hdfsBuilder *hdfsNewBuilder(void)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1587,18 +1643,21 @@ struct hdfsBuilder *hdfsNewBuilder(void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
|
void hdfsBuilderSetNameNode(struct hdfsBuilder *bld, const char *nn)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
bld->overrideHost = std::string(nn);
|
bld->overrideHost = std::string(nn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
void hdfsBuilderSetNameNodePort(struct hdfsBuilder *bld, tPort port)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
bld->overridePort = port;
|
bld->overridePort = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
|
@ -1607,12 +1666,14 @@ void hdfsBuilderSetUserName(struct hdfsBuilder *bld, const char *userName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
|
void hdfsBuilderSetForceNewInstance(struct hdfsBuilder *bld) {
|
||||||
//libhdfspp always returns a new instance, so nothing to do
|
//libhdfspp always returns a new instance, so nothing to do
|
||||||
(void)bld;
|
(void)bld;
|
||||||
errno = 0;
|
errno = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1626,6 +1687,7 @@ void hdfsFreeBuilder(struct hdfsBuilder *bld)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
||||||
const char *val)
|
const char *val)
|
||||||
{
|
{
|
||||||
|
@ -1650,16 +1712,22 @@ int hdfsBuilderConfSetStr(struct hdfsBuilder *bld, const char *key,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
void hdfsConfStrFree(char *val)
|
void hdfsConfStrFree(char *val)
|
||||||
{
|
{
|
||||||
errno = 0;
|
errno = 0;
|
||||||
free(val);
|
free(val);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
hdfsFS hdfsBuilderConnect(struct hdfsBuilder *bld) {
|
||||||
return doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
|
hdfsFS fs = doHdfsConnect(bld->overrideHost, bld->overridePort, bld->user, bld->config.GetOptions());
|
||||||
|
// Always free the builder
|
||||||
|
hdfsFreeBuilder(bld);
|
||||||
|
return fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsConfGetStr(const char *key, char **val)
|
int hdfsConfGetStr(const char *key, char **val)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1674,6 +1742,7 @@ int hdfsConfGetStr(const char *key, char **val)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFS_C_API
|
||||||
int hdfsConfGetInt(const char *key, int32_t *val)
|
int hdfsConfGetInt(const char *key, int32_t *val)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1706,6 +1775,7 @@ struct hdfsBuilder *hdfsNewBuilderFromDirectory(const char * configDirectory)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
|
int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key,
|
||||||
char **val)
|
char **val)
|
||||||
{
|
{
|
||||||
|
@ -1739,6 +1809,7 @@ bool isValidInt(int64_t value)
|
||||||
value <= std::numeric_limits<int>::max());
|
value <= std::numeric_limits<int>::max());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
|
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1765,6 +1836,7 @@ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val)
|
int hdfsBuilderConfGetLong(struct hdfsBuilder *bld, const char *key, int64_t *val)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -1859,15 +1931,17 @@ void CForwardingLogger::FreeLogData(LogData *data) {
|
||||||
free(data);
|
free(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
LogData *hdfsCopyLogData(LogData *data) {
|
LogData *hdfsCopyLogData(LogData *data) {
|
||||||
return CForwardingLogger::CopyLogData(data);
|
return CForwardingLogger::CopyLogData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
void hdfsFreeLogData(LogData *data) {
|
void hdfsFreeLogData(LogData *data) {
|
||||||
CForwardingLogger::FreeLogData(data);
|
CForwardingLogger::FreeLogData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
void hdfsSetLogFunction(void (*callback)(LogData*)) {
|
void hdfsSetLogFunction(void (*callback)(LogData*)) {
|
||||||
CForwardingLogger *logger = new CForwardingLogger();
|
CForwardingLogger *logger = new CForwardingLogger();
|
||||||
logger->SetCallback(callback);
|
logger->SetCallback(callback);
|
||||||
|
@ -1900,6 +1974,7 @@ static bool IsComponentValid(int component) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsEnableLoggingForComponent(int component) {
|
int hdfsEnableLoggingForComponent(int component) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if(!IsComponentValid(component))
|
if(!IsComponentValid(component))
|
||||||
|
@ -1908,6 +1983,7 @@ int hdfsEnableLoggingForComponent(int component) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsDisableLoggingForComponent(int component) {
|
int hdfsDisableLoggingForComponent(int component) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if(!IsComponentValid(component))
|
if(!IsComponentValid(component))
|
||||||
|
@ -1916,6 +1992,7 @@ int hdfsDisableLoggingForComponent(int component) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LIBHDFSPP_EXT_API
|
||||||
int hdfsSetLoggingLevel(int level) {
|
int hdfsSetLoggingLevel(int level) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
if(!IsLevelValid(level))
|
if(!IsLevelValid(level))
|
||||||
|
@ -1923,3 +2000,8 @@ int hdfsSetLoggingLevel(int level) {
|
||||||
LogManager::SetLogLevel(static_cast<LogLevel>(level));
|
LogManager::SetLogLevel(static_cast<LogLevel>(level));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#undef LIBHDFS_C_API
|
||||||
|
#undef LIBHDFSPP_EXT_API
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -52,8 +52,10 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se
|
||||||
|
|
||||||
active_info_ = servers[0];
|
active_info_ = servers[0];
|
||||||
standby_info_ = servers[1];
|
standby_info_ = servers[1];
|
||||||
LOG_INFO(kRPC, << "Active namenode url = " << active_info_.uri.str());
|
LOG_INFO(kRPC, << "HA enabled. Using the following namenodes from the configuration."
|
||||||
LOG_INFO(kRPC, << "Standby namenode url = " << standby_info_.uri.str());
|
<< "\nNote: Active namenode cannot be determined until a connection has been made.")
|
||||||
|
LOG_INFO(kRPC, << "First namenode url = " << active_info_.uri.str());
|
||||||
|
LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str());
|
||||||
|
|
||||||
enabled_ = true;
|
enabled_ = true;
|
||||||
if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
|
if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
|
||||||
|
@ -64,51 +66,57 @@ HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &se
|
||||||
|
|
||||||
HANamenodeTracker::~HANamenodeTracker() {}
|
HANamenodeTracker::~HANamenodeTracker() {}
|
||||||
|
|
||||||
// Pass in endpoint from current connection, this will do a reverse lookup
|
bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
|
||||||
// and return the info for the standby node. It will also swap its state internally.
|
ResolvedNamenodeInfo& out)
|
||||||
ResolvedNamenodeInfo HANamenodeTracker::GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint) {
|
{
|
||||||
LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoint);
|
|
||||||
mutex_guard swap_lock(swap_lock_);
|
mutex_guard swap_lock(swap_lock_);
|
||||||
|
|
||||||
ResolvedNamenodeInfo failover_node;
|
// Cannot look up without a key.
|
||||||
|
if(current_endpoints.size() == 0) {
|
||||||
|
event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(),
|
||||||
|
0 /*Not much to say about context without endpoints*/);
|
||||||
|
LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// Connected to standby, switch standby to active
|
LOG_TRACE(kRPC, << "Swapping from endpoint " << current_endpoints[0]);
|
||||||
if(IsCurrentActive_locked(current_endpoint)) {
|
|
||||||
|
if(IsCurrentActive_locked(current_endpoints[0])) {
|
||||||
std::swap(active_info_, standby_info_);
|
std::swap(active_info_, standby_info_);
|
||||||
if(event_handlers_)
|
if(event_handlers_)
|
||||||
event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
|
event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
|
||||||
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
|
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
|
||||||
failover_node = active_info_;
|
out = active_info_;
|
||||||
} else if(IsCurrentStandby_locked(current_endpoint)) {
|
} else if(IsCurrentStandby_locked(current_endpoints[0])) {
|
||||||
// Connected to standby
|
// Connected to standby
|
||||||
if(event_handlers_)
|
if(event_handlers_)
|
||||||
event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
|
event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
|
||||||
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
|
reinterpret_cast<int64_t>(active_info_.uri.str().c_str()));
|
||||||
failover_node = active_info_;
|
out = active_info_;
|
||||||
} else {
|
} else {
|
||||||
// Invalid state, throw for testing
|
// Invalid state (or a NIC was added that didn't show up during DNS)
|
||||||
std::string ep1 = format_endpoints(active_info_.endpoints);
|
std::stringstream errorMsg; // asio specializes endpoint operator<< for stringstream
|
||||||
std::string ep2 = format_endpoints(standby_info_.endpoints);
|
errorMsg << "Unable to find RPC connection in config. Looked for " << current_endpoints[0] << " in\n"
|
||||||
|
<< format_endpoints(active_info_.endpoints) << " and\n"
|
||||||
std::stringstream msg;
|
<< format_endpoints(standby_info_.endpoints) << std::endl;
|
||||||
msg << "Looked for " << current_endpoint << " in\n";
|
LOG_ERROR(kRPC, << errorMsg.str());
|
||||||
msg << ep1 << " and\n";
|
return false;
|
||||||
msg << ep2 << std::endl;
|
|
||||||
|
|
||||||
LOG_ERROR(kRPC, << "Unable to find RPC connection in config " << msg.str() << ". Bailing out.");
|
|
||||||
throw std::runtime_error(msg.str());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(failover_node.endpoints.empty()) {
|
// Extra DNS on swapped node to try and get EPs if it didn't already have them
|
||||||
LOG_WARN(kRPC, << "No endpoints for node " << failover_node.uri.str() << " attempting to resolve again");
|
if(out.endpoints.empty()) {
|
||||||
if(!ResolveInPlace(ioservice_, failover_node)) {
|
LOG_WARN(kRPC, << "No endpoints for node " << out.uri.str() << " attempting to resolve again");
|
||||||
LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << failover_node.uri.str()
|
if(!ResolveInPlace(ioservice_, out)) {
|
||||||
<< "failed. Please make sure your configuration is up to date.");
|
// Stuck retrying against the same NN that was able to be resolved in this case
|
||||||
|
LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << out.uri.str()
|
||||||
|
<< " failed. Please make sure your configuration is up to date.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return failover_node;
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
|
bool HANamenodeTracker::IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const {
|
||||||
for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
|
for(unsigned int i=0;i<active_info_.endpoints.size();i++) {
|
||||||
if(ep.address() == active_info_.endpoints[i].address()) {
|
if(ep.address() == active_info_.endpoints[i].address()) {
|
||||||
|
|
|
@ -48,15 +48,18 @@ class HANamenodeTracker {
|
||||||
bool is_enabled() const { return enabled_; }
|
bool is_enabled() const { return enabled_; }
|
||||||
bool is_resolved() const { return resolved_; }
|
bool is_resolved() const { return resolved_; }
|
||||||
|
|
||||||
// Get node opposite of the current one if possible (swaps active/standby)
|
// Pass in vector of endpoints held by RpcConnection, use endpoints to infer node
|
||||||
|
// currently being used. Swap internal state and set out to other node.
|
||||||
// Note: This will always mutate internal state. Use IsCurrentActive/Standby to
|
// Note: This will always mutate internal state. Use IsCurrentActive/Standby to
|
||||||
// get info without changing state
|
// get info without changing state
|
||||||
ResolvedNamenodeInfo GetFailoverAndUpdate(::asio::ip::tcp::endpoint current_endpoint);
|
bool GetFailoverAndUpdate(const std::vector<::asio::ip::tcp::endpoint>& current_endpoints,
|
||||||
|
ResolvedNamenodeInfo& out);
|
||||||
|
|
||||||
|
private:
|
||||||
|
// See if endpoint ep is part of the list of endpoints for the active or standby NN
|
||||||
bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
|
bool IsCurrentActive_locked(const ::asio::ip::tcp::endpoint &ep) const;
|
||||||
bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
|
bool IsCurrentStandby_locked(const ::asio::ip::tcp::endpoint &ep) const;
|
||||||
|
|
||||||
private:
|
|
||||||
// If HA should be enabled, according to our options and runtime info like # nodes provided
|
// If HA should be enabled, according to our options and runtime info like # nodes provided
|
||||||
bool enabled_;
|
bool enabled_;
|
||||||
// If we were able to resolve at least 1 HA namenode
|
// If we were able to resolve at least 1 HA namenode
|
||||||
|
|
|
@ -213,6 +213,11 @@ void RpcEngine::RpcCommsError(
|
||||||
optional<RetryAction> head_action = optional<RetryAction>();
|
optional<RetryAction> head_action = optional<RetryAction>();
|
||||||
|
|
||||||
// Filter out anything with too many retries already
|
// Filter out anything with too many retries already
|
||||||
|
if(event_handlers_) {
|
||||||
|
event_handlers_->call(FS_NN_PRE_RPC_RETRY_EVENT, "RpcCommsError",
|
||||||
|
reinterpret_cast<int64_t>(this));
|
||||||
|
}
|
||||||
|
|
||||||
for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
|
for (auto it = pendingRequests.begin(); it < pendingRequests.end();) {
|
||||||
auto req = *it;
|
auto req = *it;
|
||||||
|
|
||||||
|
@ -261,15 +266,34 @@ void RpcEngine::RpcCommsError(
|
||||||
// If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active)
|
// If HA is enabled and we have valid HA info then fail over to the standby (hopefully now active)
|
||||||
if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) {
|
if(head_action->action == RetryAction::FAILOVER_AND_RETRY && ha_persisted_info_) {
|
||||||
|
|
||||||
for(unsigned int i=0; i<pendingRequests.size();i++)
|
for(unsigned int i=0; i<pendingRequests.size();i++) {
|
||||||
pendingRequests[i]->IncrementFailoverCount();
|
pendingRequests[i]->IncrementFailoverCount();
|
||||||
|
}
|
||||||
|
|
||||||
ResolvedNamenodeInfo new_active_nn_info =
|
ResolvedNamenodeInfo new_active_nn_info;
|
||||||
ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_[0]/*reverse lookup*/);
|
bool failoverInfoFound = ha_persisted_info_->GetFailoverAndUpdate(last_endpoints_, new_active_nn_info);
|
||||||
|
if(!failoverInfoFound) {
|
||||||
|
// This shouldn't be a common case; the set of endpoints was empty, likely due to DNS issues.
|
||||||
|
// Another possibility is a network device has been added or removed due to a VM starting or stopping.
|
||||||
|
|
||||||
LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str());
|
LOG_ERROR(kRPC, << "Failed to find endpoints for the alternate namenode."
|
||||||
|
<< "Make sure Namenode hostnames can be found with a DNS lookup.");
|
||||||
|
// Kill all pending RPC requests since there's nowhere for this to go
|
||||||
|
Status badEndpointStatus = Status::Error("No endpoints found for namenode");
|
||||||
|
|
||||||
|
for(unsigned int i=0; i<pendingRequests.size(); i++) {
|
||||||
|
std::shared_ptr<Request> sharedCurrentRequest = pendingRequests[i];
|
||||||
|
io_service().post([sharedCurrentRequest, badEndpointStatus]() {
|
||||||
|
sharedCurrentRequest->OnResponseArrived(nullptr, badEndpointStatus); // Never call back while holding a lock
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear request vector. This isn't a recoverable error.
|
||||||
|
pendingRequests.clear();
|
||||||
|
}
|
||||||
|
|
||||||
if(ha_persisted_info_->is_resolved()) {
|
if(ha_persisted_info_->is_resolved()) {
|
||||||
|
LOG_INFO(kRPC, << "Going to try connecting to alternate Namenode: " << new_active_nn_info.uri.str());
|
||||||
last_endpoints_ = new_active_nn_info.endpoints;
|
last_endpoints_ = new_active_nn_info.endpoints;
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail over. has info="
|
LOG_WARN(kRPC, << "It looks HA is turned on, but unable to fail over. has info="
|
||||||
|
|
|
@ -120,6 +120,12 @@ add_executable(user_lock_test user_lock_test.cc)
|
||||||
target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
|
target_link_libraries(user_lock_test fs gmock_main common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
|
||||||
add_memcheck_test(user_lock user_lock_test)
|
add_memcheck_test(user_lock user_lock_test)
|
||||||
|
|
||||||
|
add_executable(hdfs_config_connect_bugs_test hdfs_config_connect_bugs.cc)
|
||||||
|
target_link_libraries(hdfs_config_connect_bugs_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
|
||||||
|
add_memcheck_test(hdfs_config_connect_bugs hdfs_config_connect_bugs_test)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS
|
# INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS
|
||||||
|
@ -136,7 +142,7 @@ include_directories (
|
||||||
|
|
||||||
add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
|
add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
|
||||||
|
|
||||||
# TODO: get all of the mini dfs library bits here in one plase
|
# TODO: get all of the mini dfs library bits here in one place
|
||||||
# add_library(hdfspp_mini_cluster native_mini_dfs ${JAVA_JVM_LIBRARY} )
|
# add_library(hdfspp_mini_cluster native_mini_dfs ${JAVA_JVM_LIBRARY} )
|
||||||
|
|
||||||
#TODO: Link against full library rather than just parts
|
#TODO: Link against full library rather than just parts
|
||||||
|
@ -157,4 +163,8 @@ build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hd
|
||||||
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
||||||
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
|
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
|
||||||
|
|
||||||
|
#build_libhdfs_test(hdfs_config_connect_bugs hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_config_connect_bugs.cc)
|
||||||
|
#link_libhdfs_test (hdfs_config_connect_bugs hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
|
||||||
|
#add_libhdfs_test (hdfs_config_connect_bugs hdfspp_test_shim_static)
|
||||||
|
|
||||||
endif(HADOOP_BUILD)
|
endif(HADOOP_BUILD)
|
||||||
|
|
|
@ -0,0 +1,136 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "hdfspp/hdfs_ext.h"
|
||||||
|
|
||||||
|
#include "configuration_test.h"
|
||||||
|
|
||||||
|
#include <google/protobuf/stubs/common.h>
|
||||||
|
|
||||||
|
#include <cstring>
|
||||||
|
#include <chrono>
|
||||||
|
#include <exception>
|
||||||
|
|
||||||
|
|
||||||
|
static const char *hdfs_11294_core_site_txt =
|
||||||
|
"<configuration>\n"
|
||||||
|
" <property name=\"fs.defaultFS\" value=\"hdfs://NAMESERVICE1\"/>\n"
|
||||||
|
" <property name=\"hadoop.security.authentication\" value=\"simple\"/>\n"
|
||||||
|
" <property name=\"ipc.client.connect.retry.interval\" value=\"1\">\n"
|
||||||
|
"</configuration>\n";
|
||||||
|
|
||||||
|
static const char *hdfs_11294_hdfs_site_txt =
|
||||||
|
"<configuration>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.nameservices</name>\n"
|
||||||
|
" <value>NAMESERVICE1</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.ha.namenodes.NAMESERVICE1</name>\n"
|
||||||
|
" <value>nn1, nn2</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.rpc-address.NAMESERVICE1.nn1</name>\n"
|
||||||
|
" <value>nonesuch1.apache.org:8020</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn1</name>\n"
|
||||||
|
" <value>nonesuch1.apache.org:8040</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.http-address.NAMESERVICE1.nn1</name>\n"
|
||||||
|
" <value>nonesuch1.apache.org:50070</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.rpc-address.NAMESERVICE1.nn2</name>\n"
|
||||||
|
" <value>nonesuch2.apache.org:8020</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.servicerpc-address.NAMESERVICE1.nn2</name>\n"
|
||||||
|
" <value>nonesuch2.apache.org:8040</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
" <property>\n"
|
||||||
|
" <name>dfs.namenode.http-address.NAMESERVICE1.nn2</name>\n"
|
||||||
|
" <value>nonesuch2.apache.org:50070</value>\n"
|
||||||
|
" </property>\n"
|
||||||
|
"</configuration>\n";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
namespace hdfs {
|
||||||
|
|
||||||
|
// Make sure connecting to an HA nameservice whose namenode hostnames cannot be resolved fails cleanly instead of crashing
|
||||||
|
TEST(ConfigConnectBugs, Test_HDFS_11294) {
|
||||||
|
// Directory for hdfs config
|
||||||
|
TempDir td;
|
||||||
|
|
||||||
|
const std::string& tempDirPath = td.path;
|
||||||
|
const std::string coreSitePath = tempDirPath + "/core-site.xml";
|
||||||
|
const std::string hdfsSitePath = tempDirPath + "/hdfs-site.xml";
|
||||||
|
|
||||||
|
// Write configs
|
||||||
|
FILE *coreSite = fopen(coreSitePath.c_str(), "w");
|
||||||
|
EXPECT_NE(coreSite, nullptr);
|
||||||
|
int coreSiteLength = strlen(hdfs_11294_core_site_txt);
|
||||||
|
size_t res = fwrite(hdfs_11294_core_site_txt, 1, coreSiteLength, coreSite);
|
||||||
|
EXPECT_EQ(res, coreSiteLength);
|
||||||
|
EXPECT_EQ(fclose(coreSite), 0);
|
||||||
|
|
||||||
|
FILE *hdfsSite = fopen(hdfsSitePath.c_str(), "w");
|
||||||
|
EXPECT_NE(hdfsSite, nullptr);
|
||||||
|
int hdfsSiteLength = strlen(hdfs_11294_hdfs_site_txt);
|
||||||
|
res = fwrite(hdfs_11294_hdfs_site_txt, 1, hdfsSiteLength, hdfsSite);
|
||||||
|
EXPECT_EQ(res, hdfsSiteLength);
|
||||||
|
EXPECT_EQ(fclose(hdfsSite), 0);
|
||||||
|
|
||||||
|
// Load configs with new FS
|
||||||
|
hdfsBuilder *bld = hdfsNewBuilderFromDirectory(tempDirPath.c_str());
|
||||||
|
hdfsBuilderSetNameNode(bld, "NAMESERVICE1");
|
||||||
|
|
||||||
|
// In HDFS-11294 connecting would crash because DNS couldn't resolve
|
||||||
|
// endpoints but the RpcEngine would attempt to dereference a nonexistent
|
||||||
|
// element in a std::vector and crash. Test passes if connect doesn't crash.
|
||||||
|
hdfsFS fileSystem = hdfsBuilderConnect(bld);
|
||||||
|
|
||||||
|
// FS shouldn't be created if it can't connect.
|
||||||
|
EXPECT_EQ(fileSystem, nullptr);
|
||||||
|
|
||||||
|
// Verify it got to endpoint check
|
||||||
|
char errMsgBuf[100];
|
||||||
|
memset(errMsgBuf, 0, 100);
|
||||||
|
EXPECT_EQ( hdfsGetLastError(errMsgBuf, 100), 0);
|
||||||
|
EXPECT_STREQ(errMsgBuf, "Exception:No endpoints found for namenode");
|
||||||
|
|
||||||
|
|
||||||
|
// remove config files
|
||||||
|
EXPECT_EQ(remove(coreSitePath.c_str()), 0);
|
||||||
|
EXPECT_EQ(remove(hdfsSitePath.c_str()), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end namespace hdfs
|
||||||
|
|
||||||
|
int main(int argc, char *argv[]) {
|
||||||
|
// The following line must be executed to initialize Google Mock
|
||||||
|
// (and Google Test) before running the tests.
|
||||||
|
::testing::InitGoogleMock(&argc, argv);
|
||||||
|
int exit_code = RUN_ALL_TESTS();
|
||||||
|
google::protobuf::ShutdownProtobufLibrary();
|
||||||
|
|
||||||
|
return exit_code;
|
||||||
|
}
|
|
@ -398,14 +398,20 @@ TEST(RpcEngineTest, TestEventCallbacks)
|
||||||
io_service.stop();
|
io_service.stop();
|
||||||
ASSERT_TRUE(stat.ok());
|
ASSERT_TRUE(stat.ok());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// If you're adding event hooks you'll most likely need to update this.
|
||||||
|
// It's a brittle test but makes it hard to miss control flow changes in RPC retry.
|
||||||
|
for(const auto& m : callbacks)
|
||||||
|
std::cerr << m << std::endl;
|
||||||
io_service.run();
|
io_service.run();
|
||||||
ASSERT_TRUE(complete);
|
ASSERT_TRUE(complete);
|
||||||
ASSERT_EQ(8, callbacks.size());
|
ASSERT_EQ(9, callbacks.size());
|
||||||
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
|
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[0]); // error
|
||||||
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[1]); // reconnect
|
ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[1]); // figure out retry decision
|
||||||
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[2]); // makes an error
|
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[2]); // reconnect
|
||||||
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[3]); // reconnect
|
ASSERT_EQ(FS_NN_PRE_RPC_RETRY_EVENT, callbacks[3]); // makes an error
|
||||||
for (int i=4; i < 7; i++)
|
ASSERT_EQ(FS_NN_CONNECT_EVENT, callbacks[4]); // reconnect
|
||||||
|
for (int i=5; i < 8; i++)
|
||||||
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
|
ASSERT_EQ(FS_NN_READ_EVENT, callbacks[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue