HDFS-10464: libhdfs++: Implement GetPathInfo and ListDirectory. Contributed by Anatoli Shein.

This commit is contained in:
Bob Hansen 2016-06-02 10:38:05 -04:00 committed by James Clampffer
parent 18f4d2f42e
commit 0496bc5464
9 changed files with 543 additions and 30 deletions

View File

@ -30,6 +30,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
@ -56,7 +57,7 @@ static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs
tPort port;
hdfsFS hdfs;
struct hdfsBuilder *bld;
port = (tPort)nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort "
@ -109,7 +110,7 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path)
return ret;
} else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) {
fprintf(stderr, "hdfsGetDefaultBlockSizeAtPath(%s) got "
"%"PRId64", but we expected %d\n",
"%"PRId64", but we expected %d\n",
path, blockSize, TLH_DEFAULT_BLOCK_SIZE);
return EIO;
}
@ -157,6 +158,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix));
/* There is no such directory.
* Check that errno is set to ENOENT
*/
char invalid_path[] = "/some_invalid/path";
EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, invalid_path, &numEntries), ENOENT);
/* There should be no entry in the directory. */
errno = EACCES; // see if errno is set to 0 on success
EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, paths->prefix, &numEntries), 0);
@ -198,11 +205,30 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
EXPECT_ZERO(hdfsCloseFile(fs, file));
/* There should be 1 entry in the directory. */
EXPECT_NONNULL(hdfsListDirectory(fs, paths->prefix, &numEntries));
hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
EXPECT_NONNULL(dirList);
if (numEntries != 1) {
fprintf(stderr, "hdfsListDirectory set numEntries to "
"%d on directory containing 1 file.", numEntries);
}
hdfsFreeFileInfo(dirList, numEntries);
/* Create many files for ListDirectory to page through */
int nFile;
for (nFile = 0; nFile < 10000; nFile++) {
char filename[PATH_MAX];
snprintf(filename, PATH_MAX, "%s/many_files_%d", paths->prefix, nFile);
file = hdfsOpenFile(fs, filename, O_WRONLY, 0, 0, 0);
EXPECT_NONNULL(file);
EXPECT_ZERO(hdfsCloseFile(fs, file));
}
dirList = hdfsListDirectory(fs, paths->prefix, &numEntries);
EXPECT_NONNULL(dirList);
if (numEntries != 10002) {
fprintf(stderr, "hdfsListDirectory set numEntries to "
"%d on directory containing 10002 files.", numEntries);
}
hdfsFreeFileInfo(dirList, numEntries);
/* Let's re-open the file for reading */
file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0);
@ -274,7 +300,25 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs,
snprintf(tmp, sizeof(tmp), "%s/nonexistent-file-name", paths->prefix);
EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT);
return 0;
//Test case: File does not exist
EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, invalid_path), ENOENT);
// Test case: No permission to access parent directory
// Trying to set permissions of the parent directory to 0
// by a super user, and then connecting as SomeGuy. Should
// receive permission denied, but receives fileInfo.
// EXPECT_ZERO(hdfsChmod(fs, paths->prefix, 0));
// EXPECT_ZERO(hdfsChmod(fs, paths->file2, 0));
// EXPECT_ZERO(hdfsDisconnect(fs));
// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "SomeGuy"));
// EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, paths->file2), EACCES);
// EXPECT_ZERO(hdfsDisconnect(fs));
// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
// if (!fs) {
// return 1;
// }
return 0;
}
static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
@ -329,7 +373,7 @@ static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads)
for (i = 0; i < tlhNumThreads; i++) {
if (ti[i].success != 0) {
fprintf(stderr, "%s%d", sep, i);
sep = ", ";
sep = ", ";
}
}
fprintf(stderr, "]. FAILURE.\n");

View File

@ -22,6 +22,8 @@
#include "hdfspp/status.h"
#include "hdfspp/events.h"
#include "hdfspp/block_location.h"
#include "hdfspp/statinfo.h"
#include <functional>
#include <memory>
@ -169,6 +171,33 @@ class FileSystem {
const std::function<void(const Status &, FileHandle *)> &handler) = 0;
virtual Status Open(const std::string &path, FileHandle **handle) = 0;
/**
* Returns metadata about the file if the file/directory exists.
**/
virtual void
GetFileInfo(const std::string &path,
const std::function<void(const Status &, const StatInfo &)> &handler) = 0;
virtual Status GetFileInfo(const std::string &path, StatInfo & stat_info) = 0;
/**
* Retrieves the files contained in a directory and returns the metadata
* for each of them.
*
* The asynchronous method will return batches of files; the consumer must
* return true if they want more files to be delivered. The final bool
* parameter in the callback will be set to true if this is the final
* batch of files.
*
* The synchronous method will return all files in the directory.
*
* Path must be an absolute path in the hdfs filesystem (e.g. /tmp/foo/bar)
**/
virtual void
GetListing(const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) = 0;
virtual Status GetListing(const std::string &path,
std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0;
/**
* Returns the locations of all known blocks for the indicated file, or an error
if the information could not be found
@ -178,7 +207,6 @@ class FileSystem {
virtual Status GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * locations) = 0;
/**
* Note that it is an error to destroy the filesystem from within a filesystem
* callback. It will lead to a deadlock and the termination of the process.

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HDFSPP_STATINFO_H_
#define HDFSPP_STATINFO_H_

#include <string>

namespace hdfs {

/**
 * Information that is assumed to be unchanging about a file for the duration of
 * the operations.
 */
struct StatInfo {
  enum FileType {
    IS_DIR = 1,
    IS_FILE = 2,
    IS_SYMLINK = 3
  };

  int file_type;                      // one of FileType; 0 = unset
  ::std::string path;                 // full path of the file/directory
  unsigned long int length;           // file length in bytes
  unsigned long int permissions;      // Octal number as in POSIX permissions; e.g. 0777
  ::std::string owner;
  ::std::string group;
  unsigned long int modification_time;
  unsigned long int access_time;
  ::std::string symlink;              // link target; meaningful when file_type == IS_SYMLINK
  unsigned int block_replication;
  unsigned long int blocksize;
  unsigned long int fileid;
  unsigned long int children_num;

  // Zero-initialize all scalar fields; strings default-construct empty.
  StatInfo()
      : file_type(0),
        length(0),
        permissions(0),
        modification_time(0),
        access_time(0),
        block_replication(0),
        blocksize(0),
        fileid(0),
        children_num(0) {
  }
};

}

#endif

View File

@ -38,7 +38,8 @@ class Status {
static Status Exception(const char *expception_class_name, const char *error_message);
static Status Error(const char *error_message);
static Status AuthenticationFailed();
static Status Canceled();
static Status Canceled();
static Status PathNotFound(const char *msg);
// success
bool ok() const { return code_ == 0; }
@ -56,6 +57,7 @@ class Status {
kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported),
kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled),
kPermissionDenied = static_cast<unsigned>(std::errc::permission_denied),
kPathNotFound = static_cast<unsigned>(std::errc::no_such_file_or_directory),
kException = 256,
kAuthenticationFailed = 257,
};

View File

@ -26,6 +26,9 @@
#include <hdfs/hdfs.h>
#include <hdfspp/hdfs_ext.h>
#include <libgen.h>
#include "limits.h"
#include <string>
#include <cstring>
#include <iostream>
@ -146,6 +149,10 @@ static int Error(const Status &stat) {
errnum = EACCES;
default_message = "Permission denied";
break;
case Status::Code::kPathNotFound:
errnum = ENOENT;
default_message = "No such file or directory";
break;
default:
errnum = ENOSYS;
default_message = "Error: unrecognised code";
@ -316,6 +323,127 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
}
}
/**
 * Convert an hdfs::StatInfo into the C-API hdfsFileInfo struct.
 *
 * mName/mOwner/mGroup are allocated with new[]; callers are expected to
 * release them through hdfsFreeFileInfo.
 */
void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info,
                            const hdfs::StatInfo & stat_info) {
  /* file or directory */
  if (stat_info.file_type == StatInfo::IS_DIR) {
    file_info->mKind = kObjectKindDirectory;
  } else if (stat_info.file_type == StatInfo::IS_FILE) {
    file_info->mKind = kObjectKindFile;
  } else {
    /* The C API has no symlink kind; report symlinks as files. */
    file_info->mKind = kObjectKindFile;
    LOG_WARN(kFileSystem, << "Symlink is not supported! Reporting as a file: ");
  }

  /* the name of the file: basename() may modify its argument, so work on a
   * bounded copy of the path. */
  char copyOfPath[PATH_MAX];
  strncpy(copyOfPath, stat_info.path.c_str(), PATH_MAX);
  copyOfPath[PATH_MAX - 1] = '\0'; // in case strncpy ran out of space

  char * mName = basename(copyOfPath);
  size_t mName_size = strlen(mName);
  file_info->mName = new char[mName_size + 1];
  /* Reuse the first basename() result: basename() may have modified
   * copyOfPath, so calling it again would be redundant at best. */
  strncpy(file_info->mName, mName, mName_size + 1);

  /* the last modification time for the file in seconds */
  file_info->mLastMod = (tTime) stat_info.modification_time;

  /* the size of the file in bytes */
  file_info->mSize = (tOffset) stat_info.length;

  /* the count of replicas */
  file_info->mReplication = (short) stat_info.block_replication;

  /* the block size for the file */
  file_info->mBlockSize = (tOffset) stat_info.blocksize;

  /* the owner of the file */
  file_info->mOwner = new char[stat_info.owner.size() + 1];
  strncpy(file_info->mOwner, stat_info.owner.c_str(), stat_info.owner.size() + 1);

  /* the group associated with the file */
  file_info->mGroup = new char[stat_info.group.size() + 1];
  strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1);

  /* the permissions associated with the file */
  file_info->mPermissions = (short) stat_info.permissions;

  /* the last access time for the file in seconds */
  file_info->mLastAccess = stat_info.access_time;
}
/**
 * Stat a single path through the C API.
 *
 * Returns a heap-allocated, single-element hdfsFileInfo array on success
 * (free with hdfsFreeFileInfo); returns nullptr on failure after reporting
 * the error (which sets errno).
 */
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
  try {
    if (!CheckSystem(fs)) {
      return nullptr;
    }

    hdfs::StatInfo stat_info;
    const Status status = fs->get_impl()->GetFileInfo(path, stat_info);
    if (!status.ok()) {
      Error(status);
      return nullptr;
    }

    // The existing API contract hands back an array even for one entry.
    hdfsFileInfo *result = new hdfsFileInfo[1];
    StatInfoToHdfsFileInfo(result, stat_info);
    return result;
  } catch (const std::exception & e) {
    ReportException(e);
  } catch (...) {
    ReportCaughtNonException();
  }
  return nullptr;
}
/**
 * List the contents of directory `path` through the C API.
 *
 * On success returns a heap-allocated hdfsFileInfo array (free with
 * hdfsFreeFileInfo) and stores the element count in *numEntries.
 * Returns nullptr with *numEntries == 0 both on error (after Error() sets
 * errno) and for an empty directory, matching the existing libhdfs contract.
 */
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) {
  try {
    if (!CheckSystem(fs)) {
      *numEntries = 0;
      return nullptr;
    }
    std::shared_ptr<std::vector<StatInfo>> stat_infos;
    Status stat = fs->get_impl()->GetListing(path, stat_infos);
    if (!stat.ok()) {
      Error(stat);
      *numEntries = 0;
      return nullptr;
    }
    //Existing API expects nullptr if size is 0
    if(!stat_infos || stat_infos->size()==0){
      *numEntries = 0;
      return nullptr;
    }
    *numEntries = stat_infos->size();
    // Convert each StatInfo into the C struct; allocations inside each
    // element are owned by the caller once this returns.
    hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()];
    for(std::vector<StatInfo>::size_type i = 0; i < stat_infos->size(); i++) {
      StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i));
    }
    return file_infos;
  } catch (const std::exception & e) {
    ReportException(e);
    *numEntries = 0;
    return nullptr;
  } catch (...) {
    ReportCaughtNonException();
    *numEntries = 0;
    return nullptr;
  }
}
/**
 * Free an hdfsFileInfo array (and the per-entry strings inside it)
 * previously returned by hdfsGetPathInfo or hdfsListDirectory.
 *
 * Tolerates a nullptr array: both producers legitimately return nullptr
 * (on error or an empty directory), so freeing that result must be safe.
 */
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries)
{
    if (!hdfsFileInfo) {
        return;
    }
    int i;
    for (i = 0; i < numEntries; ++i) {
        delete[] hdfsFileInfo[i].mName;
        delete[] hdfsFileInfo[i].mOwner;
        delete[] hdfsFileInfo[i].mGroup;
    }
    delete[] hdfsFileInfo;
}
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) {
try

View File

@ -26,6 +26,8 @@ namespace hdfs {
const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException";
const char * kStatusSaslException = "javax.security.sasl.SaslException";
const char * kPathNotFoundException = "org.apache.hadoop.fs.InvalidPathException";
const char * kPathNotFoundException2 = "java.io.FileNotFoundException";
Status::Status(int code, const char *msg1) : code_(code) {
if(msg1) {
@ -53,6 +55,10 @@ Status Status::InvalidArgument(const char *msg) {
return Status(kInvalidArgument, msg);
}
// Builds a path-not-found status (kPathNotFound), which the C API layer
// translates to ENOENT.
Status Status::PathNotFound(const char *msg){
  return Status(kPathNotFound, msg);
}
Status Status::ResourceUnavailable(const char *msg) {
return Status(kResourceUnavailable, msg);
}
@ -66,6 +72,10 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes
return Status(kPermissionDenied, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0))
return AuthenticationFailed();
else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException) == 0))
return Status(kPathNotFound, error_message);
else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0))
return Status(kPathNotFound, error_message);
else
return Status(kException, exception_class_name, error_message);
}

View File

@ -71,30 +71,19 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
using ::hadoop::hdfs::GetBlockLocationsResponseProto;
LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations("
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
<< FMT_THIS_ADDR << ", path=" << path << ", ...) called");
struct State {
GetBlockLocationsRequestProto req;
std::shared_ptr<GetBlockLocationsResponseProto> resp;
};
auto m = continuation::Pipeline<State>::Create();
auto &req = m->state().req;
GetBlockLocationsRequestProto req;
req.set_src(path);
req.set_offset(0);
req.set_length(std::numeric_limits<long long>::max());
m->state().resp.reset(new GetBlockLocationsResponseProto());
State *s = &m->state();
m->Push(continuation::Bind(
[this, s](const continuation::Continuation::Next &next) {
namenode_.GetBlockLocations(&s->req, s->resp, next);
}));
auto resp = std::make_shared<GetBlockLocationsResponseProto>();
m->Run([this, handler](const Status &stat, const State &s) {
namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) {
if (stat.ok()) {
auto file_info = std::make_shared<struct FileInfo>();
auto locations = s.resp->locations();
auto locations = resp->locations();
file_info->file_length_ = locations.filelength();
file_info->last_block_complete_ = locations.islastblockcomplete();
@ -117,11 +106,107 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
});
}
/**
 * Asynchronously fetch the metadata of a single path from the NameNode and
 * deliver it to `handler` as a StatInfo.
 *
 * On RPC failure, or when the server reports no file status for the path,
 * the handler receives a non-OK status and a default-constructed StatInfo.
 */
void NameNodeOperations::GetFileInfo(const std::string & path,
  std::function<void(const Status &, const StatInfo &)> handler)
{
  using ::hadoop::hdfs::GetFileInfoRequestProto;
  using ::hadoop::hdfs::GetFileInfoResponseProto;

  LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo("
                         << FMT_THIS_ADDR << ", path=" << path << ") called");

  GetFileInfoRequestProto req;
  req.set_src(path);

  // resp is captured by the callback so it outlives the async RPC.
  auto resp = std::make_shared<GetFileInfoResponseProto>();

  namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) {
    if (stat.ok()) {
      // For non-existent files, the server will respond with an OK message but
      // no fs in the protobuf.
      if(resp -> has_fs()){
        struct StatInfo stat_info;
        stat_info.path=path;
        HdfsFileStatusProtoToStatInfo(stat_info, resp->fs());
        handler(stat, stat_info);
      } else {
        // Surface the missing path as PathNotFound (mapped to ENOENT in the C API).
        std::string errormsg = "No such file or directory: " + path;
        Status statNew = Status::PathNotFound(errormsg.c_str());
        handler(statNew, StatInfo());
      }
    } else {
      handler(stat, StatInfo());
    }
  });
}
/**
 * Asynchronously fetch one batch of directory entries for `path` from the
 * NameNode, starting after `start_after` ("" requests the first batch).
 *
 * The handler's final bool is true when the server reports remaining
 * entries; the caller can page through by re-invoking with start_after set
 * to the last path delivered.
 */
void NameNodeOperations::GetListing(
    const std::string & path,
    std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler,
    const std::string & start_after) {
  using ::hadoop::hdfs::GetListingRequestProto;
  using ::hadoop::hdfs::GetListingResponseProto;

  LOG_TRACE(
      kFileSystem,
      << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called");

  GetListingRequestProto req;
  req.set_src(path);
  req.set_startafter(start_after.c_str());
  req.set_needlocation(false); // block locations are not needed for a listing

  // resp is captured by the callback so it outlives the async RPC.
  auto resp = std::make_shared<GetListingResponseProto>();

  namenode_.GetListing(
      &req,
      resp,
      [resp, handler, path](const Status &stat) {
        if (stat.ok()) {
          // A response without a dirlist means the path itself does not exist.
          if(resp -> has_dirlist()){
            std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>);
            for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) {
              StatInfo si;
              si.path=fs.path();
              HdfsFileStatusProtoToStatInfo(si, fs);
              stat_infos->push_back(si);
            }
            handler(stat, stat_infos, resp->dirlist().remainingentries() > 0);
          } else {
            std::string errormsg = "No such file or directory: " + path;
            Status statNew = Status::PathNotFound(errormsg.c_str());
            std::shared_ptr<std::vector<StatInfo>> stat_infos;
            handler(statNew, stat_infos, false);
          }
        } else {
          std::shared_ptr<std::vector<StatInfo>> stat_infos;
          handler(stat, stat_infos, false);
        }
      });
}
// Forward the filesystem event callback registration to the RPC engine.
void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) {
  engine_.SetFsEventCallback(callback);
}
/**
 * Copy the fields of an HdfsFileStatusProto into a StatInfo.
 *
 * Note: stat_info.path is NOT set here; callers assign it themselves
 * (GetFileInfo uses the request path, GetListing uses the entry's path).
 */
void NameNodeOperations::HdfsFileStatusProtoToStatInfo(
    hdfs::StatInfo & stat_info,
    const ::hadoop::hdfs::HdfsFileStatusProto & fs) {
  stat_info.file_type = fs.filetype();
  stat_info.length = fs.length();
  stat_info.permissions = fs.permission().perm();
  stat_info.owner = fs.owner();
  stat_info.group = fs.group();
  stat_info.modification_time = fs.modification_time();
  stat_info.access_time = fs.access_time();
  stat_info.symlink = fs.symlink();
  stat_info.block_replication = fs.block_replication();
  stat_info.blocksize = fs.blocksize();
  stat_info.fileid = fs.fileid();
  stat_info.children_num = fs.childrennum();
}
/*****************************************************************************
* FILESYSTEM BASE CLASS
****************************************************************************/
@ -339,7 +424,6 @@ Status FileSystemImpl::Open(const std::string &path,
return stat;
}
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
{
BlockLocation result;
@ -380,6 +464,10 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto
void FileSystemImpl::GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
{
LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
if (status.ok()) {
auto result = std::make_shared<FileBlockLocation>();
@ -407,6 +495,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path,
Status FileSystemImpl::GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{
LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations("
<< FMT_THIS_ADDR << ", path="
<< path << ") called");
if (!fileBlockLocations)
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
@ -433,6 +525,125 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path,
return stat;
}
/**
 * Asynchronously stat a single path.
 *
 * Thin forwarding layer over NameNodeOperations::GetFileInfo; the handler
 * receives the RPC status and (on success) the populated StatInfo.
 */
void FileSystemImpl::GetFileInfo(
    const std::string &path,
    const std::function<void(const Status &, const StatInfo &)> &handler) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  // Pass the handler straight through; the previous forwarding lambda only
  // re-invoked it with the same arguments.
  nn_.GetFileInfo(path, handler);
}
/**
 * Synchronous GetFileInfo: wraps the async version with a promise/future
 * pair and blocks the calling thread until the callback delivers a result.
 *
 * On success stat_info is overwritten with the file's metadata; on failure
 * stat_info is left untouched and the error status is returned.
 */
Status FileSystemImpl::GetFileInfo(const std::string &path,
                                   StatInfo & stat_info) {
  LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo("
                         << FMT_THIS_ADDR << ", path="
                         << path << ") called");

  auto callstate = std::make_shared<std::promise<std::tuple<Status, StatInfo>>>();
  std::future<std::tuple<Status, StatInfo>> future(callstate->get_future());

  /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */
  auto h = [callstate](const Status &s, const StatInfo &si) {
    callstate->set_value(std::make_tuple(s, si));
  };

  GetFileInfo(path, h);

  /* block until promise is set */
  auto returnstate = future.get();
  Status stat = std::get<0>(returnstate);
  StatInfo info = std::get<1>(returnstate);

  if (!stat.ok()) {
    return stat;
  }

  stat_info = info;
  return stat;
}
/**
 * Helper function for recursive GetListing calls.
 *
 * Some compilers don't like recursive lambdas, so we make the lambda call a
 * method, which in turn creates a lambda calling itself.
 *
 * Delivers the current batch to the user handler; if the handler asks for
 * more (returns true), the namenode reported more entries, and this batch
 * was non-empty, issues the next GetListing RPC starting after the last
 * path seen in this batch.
 */
void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
                                    std::string path,
                                    const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
  bool has_next = stat_infos && stat_infos->size() > 0;
  bool get_more = handler(stat, stat_infos, has_more && has_next);
  if (get_more && has_more && has_next ) {
    auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
      GetListingShim(stat, stat_infos, has_more, path, handler);
    };

    // Resume paging after the last entry delivered in this batch.
    std::string last = stat_infos->back().path;
    nn_.GetListing(path, callback, last);
  }
}
/**
 * Asynchronous paged directory listing.
 *
 * Kicks off the first GetListing RPC and routes every batch through
 * GetListingShim, which keeps paging for as long as the handler returns
 * true and more entries remain.
 */
void FileSystemImpl::GetListing(
    const std::string &path,
    const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) {
  LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing("
                        << FMT_THIS_ADDR << ", path="
                        << path << ") called");

  // Capture the state and push it into the shim
  auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) {
    GetListingShim(stat, stat_infos, has_more, path, handler);
  };

  nn_.GetListing(path, callback);
}
/**
 * Synchronous GetListing: accumulates every batch produced by the async
 * paging version and returns the complete directory listing at once.
 *
 * On success stat_infos points at the accumulated results; on failure it is
 * left untouched and the error status is returned.
 */
Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) {
  LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing("
                        << FMT_THIS_ADDR << ", path="
                        << path << ") called");

  // In this case, we're going to allocate the result on the heap and have the
  // async code populate it.
  auto results = std::make_shared<std::vector<StatInfo>>();

  auto callstate = std::make_shared<std::promise<Status>>();
  std::future<Status> future(callstate->get_future());

  /* wrap async FileSystem::GetListing with promise to make it a blocking
   * call. Keep requesting more until we get the entire listing, and don't
   * set the promise until we have the entire listing. */
  auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool {
    if (si) {
      results->insert(results->end(), si->begin(), si->end());
    }

    // Stop (and unblock the caller) on error or once the final batch lands.
    bool done = !s.ok() || !has_more;
    if (done) {
      callstate->set_value(s);
      return false;
    }
    return true;
  };

  GetListing(path, h);

  /* block until promise is set */
  Status stat = future.get();

  if (!stat.ok()) {
    return stat;
  }

  stat_infos = results;
  return stat;
}
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {

View File

@ -28,6 +28,7 @@
#include "rpc/rpc_engine.h"
#include "reader/block_reader.h"
#include "reader/fileinfo.h"
#include "hdfspp/statinfo.h"
#include "ClientNamenodeProtocol.pb.h"
#include "ClientNamenodeProtocol.hrpc.inl"
@ -64,8 +65,20 @@ public:
void GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
void GetFileInfo(const std::string & path,
std::function<void(const Status &, const StatInfo &)> handler);
// start_after="" for initial call
void GetListing(const std::string & path,
std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler,
const std::string & start_after = "");
void SetFsEventCallback(fs_event_callback callback);
private:
static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs);
static void DirectoryListingProtoToStatInfo(std::shared_ptr<std::vector<StatInfo>> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl);
::asio::io_service * io_service_;
RpcEngine engine_;
ClientNamenodeProtocol namenode_;
@ -105,6 +118,17 @@ public:
&handler) override;
Status Open(const std::string &path, FileHandle **handle) override;
void GetFileInfo(
const std::string &path,
const std::function<void(const Status &, const StatInfo &)> &handler) override;
Status GetFileInfo(const std::string &path, StatInfo & stat_info) override;
void GetListing(
const std::string &path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override;
Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override;
virtual void GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
@ -150,6 +174,11 @@ private:
* exposes implementation details that may change at any time.
**/
std::shared_ptr<LibhdfsEvents> event_handlers_;
void GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more,
std::string path,
const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler);
};
}

View File

@ -78,8 +78,7 @@ void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) {
}
hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) {
REPORT_FUNCTION_NOT_IMPLEMENTED
return NULL;
return (hdfsFS) libhdfspp_hdfsConnectAsUser(nn, port, user);
}
hdfsFS hdfsConnect(const char* nn, tPort port) {
@ -294,16 +293,16 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path,
int *numEntries) {
return (hdfsFileInfo *)libhdfs_hdfsListDirectory(fs->libhdfsRep, path, numEntries);
return (hdfsFileInfo *)libhdfspp_hdfsListDirectory(fs->libhdfsppRep, path, numEntries);
}
hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) {
return (hdfsFileInfo *)libhdfs_hdfsGetPathInfo(fs->libhdfsRep, path);
return (hdfsFileInfo *)libhdfspp_hdfsGetPathInfo(fs->libhdfsppRep, path);
}
void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) {
return libhdfs_hdfsFreeFileInfo
((libhdfs_hdfsFileInfo *) hdfsFileInfo, numEntries);
return libhdfspp_hdfsFreeFileInfo
((libhdfspp_hdfsFileInfo *) hdfsFileInfo, numEntries);
}
int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) {