From 0496bc54641d3ce284ab8832099c6a9d67ee9069 Mon Sep 17 00:00:00 2001 From: Bob Hansen Date: Thu, 2 Jun 2016 10:38:05 -0400 Subject: [PATCH] HDFS-10464: libhdfs++: Implement GetPathInfo and ListDirectory. Contributed by Anatoli Shein. --- .../libhdfs-tests/test_libhdfs_threaded.c | 54 +++- .../native/libhdfspp/include/hdfspp/hdfspp.h | 30 ++- .../libhdfspp/include/hdfspp/statinfo.h | 62 +++++ .../native/libhdfspp/include/hdfspp/status.h | 4 +- .../native/libhdfspp/lib/bindings/c/hdfs.cc | 128 +++++++++ .../native/libhdfspp/lib/common/status.cc | 10 + .../native/libhdfspp/lib/fs/filesystem.cc | 245 ++++++++++++++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 29 +++ .../main/native/libhdfspp/tests/hdfs_shim.c | 11 +- 9 files changed, 543 insertions(+), 30 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c index 887af8e68b1..027d1d716fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c @@ -30,6 +30,7 @@ #include #include #include +#include #define TO_STR_HELPER(X) #X #define TO_STR(X) TO_STR_HELPER(X) @@ -56,7 +57,7 @@ static int hdfsSingleNameNodeConnect(struct NativeMiniDfsCluster *cl, hdfsFS *fs tPort port; hdfsFS hdfs; struct hdfsBuilder *bld; - + port = (tPort)nmdGetNameNodePort(cl); if (port < 0) { fprintf(stderr, "hdfsSingleNameNodeConnect: nmdGetNameNodePort " @@ -109,7 +110,7 @@ static int doTestGetDefaultBlockSize(hdfsFS fs, const char *path) return ret; } else if (blockSize != TLH_DEFAULT_BLOCK_SIZE) { fprintf(stderr, "hdfsGetDefaultBlockSizeAtPath(%s) got " - "%"PRId64", but we expected %d\n", + 
"%"PRId64", but we expected %d\n", path, blockSize, TLH_DEFAULT_BLOCK_SIZE); return EIO; } @@ -157,6 +158,12 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, EXPECT_ZERO(doTestGetDefaultBlockSize(fs, paths->prefix)); + /* There is no such directory. + * Check that errno is set to ENOENT + */ + char invalid_path[] = "/some_invalid/path"; + EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, invalid_path, &numEntries), ENOENT); + /* There should be no entry in the directory. */ errno = EACCES; // see if errno is set to 0 on success EXPECT_NULL_WITH_ERRNO(hdfsListDirectory(fs, paths->prefix, &numEntries), 0); @@ -198,11 +205,30 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, EXPECT_ZERO(hdfsCloseFile(fs, file)); /* There should be 1 entry in the directory. */ - EXPECT_NONNULL(hdfsListDirectory(fs, paths->prefix, &numEntries)); + hdfsFileInfo * dirList = hdfsListDirectory(fs, paths->prefix, &numEntries); + EXPECT_NONNULL(dirList); if (numEntries != 1) { fprintf(stderr, "hdfsListDirectory set numEntries to " "%d on directory containing 1 file.", numEntries); } + hdfsFreeFileInfo(dirList, numEntries); + + /* Create many files for ListDirectory to page through */ + int nFile; + for (nFile = 0; nFile < 10000; nFile++) { + char filename[PATH_MAX]; + snprintf(filename, PATH_MAX, "%s/many_files_%d", paths->prefix, nFile); + file = hdfsOpenFile(fs, filename, O_WRONLY, 0, 0, 0); + EXPECT_NONNULL(file); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + } + dirList = hdfsListDirectory(fs, paths->prefix, &numEntries); + EXPECT_NONNULL(dirList); + if (numEntries != 10002) { + fprintf(stderr, "hdfsListDirectory set numEntries to " + "%d on directory containing 10002 files.", numEntries); + } + hdfsFreeFileInfo(dirList, numEntries); /* Let's re-open the file for reading */ file = hdfsOpenFile(fs, paths->file1, O_RDONLY, 0, 0, 0); @@ -274,7 +300,25 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs, snprintf(tmp, sizeof(tmp), 
"%s/nonexistent-file-name", paths->prefix); EXPECT_NEGATIVE_ONE_WITH_ERRNO(hdfsChown(fs, tmp, "ha3", NULL), ENOENT); - return 0; + + //Test case: File does not exist + EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, invalid_path), ENOENT); + +// Test case: No permission to access parent directory +// Trying to set permissions of the parent directory to 0 +// by a super user, and then connecting as SomeGuy. Should +// receive permission denied, but receives fileInfo. +// EXPECT_ZERO(hdfsChmod(fs, paths->prefix, 0)); +// EXPECT_ZERO(hdfsChmod(fs, paths->file2, 0)); +// EXPECT_ZERO(hdfsDisconnect(fs)); +// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, "SomeGuy")); +// EXPECT_NULL_WITH_ERRNO(hdfsGetPathInfo(fs, paths->file2), EACCES); +// EXPECT_ZERO(hdfsDisconnect(fs)); +// EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL)); +// if (!fs) { +// return 1; +// } + return 0; } static int testHdfsOperationsImpl(struct tlhThreadInfo *ti) @@ -329,7 +373,7 @@ static int checkFailures(struct tlhThreadInfo *ti, int tlhNumThreads) for (i = 0; i < tlhNumThreads; i++) { if (ti[i].success != 0) { fprintf(stderr, "%s%d", sep, i); - sep = ", "; + sep = ", "; } } fprintf(stderr, "]. 
FAILURE.\n"); diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 1e4455a87b7..ec26a55df49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -22,6 +22,8 @@ #include "hdfspp/status.h" #include "hdfspp/events.h" #include "hdfspp/block_location.h" +#include "hdfspp/statinfo.h" + #include #include @@ -169,6 +171,33 @@ class FileSystem { const std::function &handler) = 0; virtual Status Open(const std::string &path, FileHandle **handle) = 0; + /** + * Returns metadata about the file if the file/directory exists. + **/ + virtual void + GetFileInfo(const std::string &path, + const std::function &handler) = 0; + virtual Status GetFileInfo(const std::string &path, StatInfo & stat_info) = 0; + + /** + * Retrieves the files contained in a directory and returns the metadata + * for each of them. + * + * The asynchronous method will return batches of files; the consumer must + * return true if they want more files to be delivered. The final bool + * parameter in the callback will be set to false if this is the final + * batch of files. + * + * The synchronous method will return all files in the directory. + * + * Path must be an absolute path in the hdfs filesystem (e.g. 
/tmp/foo/bar) + **/ + virtual void + GetListing(const std::string &path, + const std::function> &, bool)> &handler) = 0; + virtual Status GetListing(const std::string &path, + std::shared_ptr> & stat_infos) = 0; + /** * Returns the locations of all known blocks for the indicated file, or an error * if the information clould not be found @@ -178,7 +207,6 @@ class FileSystem { virtual Status GetBlockLocations(const std::string & path, std::shared_ptr * locations) = 0; - /** * Note that it is an error to destroy the filesystem from within a filesystem * callback. It will lead to a deadlock and the termination of the process. diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h new file mode 100644 index 00000000000..a53ab8b6096 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef HDFSPP_STATINFO_H_ +#define HDFSPP_STATINFO_H_ + +namespace hdfs { + +/** + * Information that is assumed to be unchanging about a file for the duration of + * the operations. + */ +struct StatInfo { + enum FileType { + IS_DIR = 1, + IS_FILE = 2, + IS_SYMLINK = 3 + }; + + int file_type; + ::std::string path; + unsigned long int length; + unsigned long int permissions; //Octal number as in POSIX permissions; e.g. 0777 + ::std::string owner; + ::std::string group; + unsigned long int modification_time; + unsigned long int access_time; + ::std::string symlink; + unsigned int block_replication; + unsigned long int blocksize; + unsigned long int fileid; + unsigned long int children_num; + StatInfo() + : file_type(0), + length(0), + permissions(0), + modification_time(0), + access_time(0), + block_replication(0), + blocksize(0), + fileid(0), + children_num(0) { + } +}; + +} + +#endif diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h index 6a3ce739a7a..7e8cdfa52a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h @@ -38,7 +38,8 @@ class Status { static Status Exception(const char *expception_class_name, const char *error_message); static Status Error(const char *error_message); static Status AuthenticationFailed(); - static Status Canceled(); + static Status Canceled(); + static Status PathNotFound(const char *msg); // success bool ok() const { return code_ == 0; } @@ -56,6 +57,7 @@ class Status { kUnimplemented = static_cast(std::errc::function_not_supported), kOperationCanceled = static_cast(std::errc::operation_canceled), kPermissionDenied = static_cast(std::errc::permission_denied), + kPathNotFound = 
static_cast(std::errc::no_such_file_or_directory), kException = 256, kAuthenticationFailed = 257, }; diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 8a73c41dc7c..0fe84795c8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -26,6 +26,9 @@ #include #include +#include +#include "limits.h" + #include #include #include @@ -146,6 +149,10 @@ static int Error(const Status &stat) { errnum = EACCES; default_message = "Permission denied"; break; + case Status::Code::kPathNotFound: + errnum = ENOENT; + default_message = "No such file or directory"; + break; default: errnum = ENOSYS; default_message = "Error: unrecognised code"; @@ -316,6 +323,127 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) { } } +void StatInfoToHdfsFileInfo(hdfsFileInfo * file_info, + const hdfs::StatInfo & stat_info) { + /* file or directory */ + if (stat_info.file_type == StatInfo::IS_DIR) { + file_info->mKind = kObjectKindDirectory; + } else if (stat_info.file_type == StatInfo::IS_FILE) { + file_info->mKind = kObjectKindFile; + } else { + file_info->mKind = kObjectKindFile; + LOG_WARN(kFileSystem, << "Symlink is not supported! 
Reporting as a file: "); + } + + /* the name of the file */ + char copyOfPath[PATH_MAX]; + strncpy(copyOfPath, stat_info.path.c_str(), PATH_MAX); + copyOfPath[PATH_MAX - 1] = '\0'; // in case strncpy ran out of space + + char * mName = basename(copyOfPath); + size_t mName_size = strlen(mName); + file_info->mName = new char[mName_size+1]; + strncpy(file_info->mName, basename(copyOfPath), mName_size + 1); + + /* the last modification time for the file in seconds */ + file_info->mLastMod = (tTime) stat_info.modification_time; + + /* the size of the file in bytes */ + file_info->mSize = (tOffset) stat_info.length; + + /* the count of replicas */ + file_info->mReplication = (short) stat_info.block_replication; + + /* the block size for the file */ + file_info->mBlockSize = (tOffset) stat_info.blocksize; + + /* the owner of the file */ + file_info->mOwner = new char[stat_info.owner.size() + 1]; + strncpy(file_info->mOwner, stat_info.owner.c_str(), stat_info.owner.size() + 1); + + /* the group associated with the file */ + file_info->mGroup = new char[stat_info.group.size() + 1]; + strncpy(file_info->mGroup, stat_info.group.c_str(), stat_info.group.size() + 1); + + /* the permissions associated with the file */ + file_info->mPermissions = (short) stat_info.permissions; + + /* the last access time for the file in seconds */ + file_info->mLastAccess = stat_info.access_time; +} + +hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { + try { + if (!CheckSystem(fs)) { + return nullptr; + } + + hdfs::StatInfo stat_info; + Status stat = fs->get_impl()->GetFileInfo(path, stat_info); + if (!stat.ok()) { + Error(stat); + return nullptr; + } + hdfsFileInfo *file_info = new hdfsFileInfo[1]; + StatInfoToHdfsFileInfo(file_info, stat_info); + return file_info; + } catch (const std::exception & e) { + ReportException(e); + return nullptr; + } catch (...) 
{ + ReportCaughtNonException(); + return nullptr; + } +} + +hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { + try { + if (!CheckSystem(fs)) { + *numEntries = 0; + return nullptr; + } + + std::shared_ptr> stat_infos; + Status stat = fs->get_impl()->GetListing(path, stat_infos); + if (!stat.ok()) { + Error(stat); + *numEntries = 0; + return nullptr; + } + //Existing API expects nullptr if size is 0 + if(!stat_infos || stat_infos->size()==0){ + *numEntries = 0; + return nullptr; + } + *numEntries = stat_infos->size(); + hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()]; + for(std::vector::size_type i = 0; i < stat_infos->size(); i++) { + StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i)); + } + + return file_infos; + } catch (const std::exception & e) { + ReportException(e); + *numEntries = 0; + return nullptr; + } catch (...) { + ReportCaughtNonException(); + *numEntries = 0; + return nullptr; + } +} + +void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) +{ + int i; + for (i = 0; i < numEntries; ++i) { + delete[] hdfsFileInfo[i].mName; + delete[] hdfsFileInfo[i].mOwner; + delete[] hdfsFileInfo[i].mGroup; + } + delete[] hdfsFileInfo; +} + tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, tSize length) { try diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc index e215a67cc0b..fd4df338087 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc @@ -26,6 +26,8 @@ namespace hdfs { const char * kStatusAccessControlException = "org.apache.hadoop.security.AccessControlException"; const char * kStatusSaslException = "javax.security.sasl.SaslException"; +const char * kPathNotFoundException = 
"org.apache.hadoop.fs.InvalidPathException"; +const char * kPathNotFoundException2 = "java.io.FileNotFoundException"; Status::Status(int code, const char *msg1) : code_(code) { if(msg1) { @@ -53,6 +55,10 @@ Status Status::InvalidArgument(const char *msg) { return Status(kInvalidArgument, msg); } +Status Status::PathNotFound(const char *msg){ + return Status(kPathNotFound, msg); +} + Status Status::ResourceUnavailable(const char *msg) { return Status(kResourceUnavailable, msg); } @@ -66,6 +72,10 @@ Status Status::Exception(const char *exception_class_name, const char *error_mes return Status(kPermissionDenied, error_message); else if (exception_class_name && (strcmp(exception_class_name, kStatusSaslException) == 0)) return AuthenticationFailed(); + else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException) == 0)) + return Status(kPathNotFound, error_message); + else if (exception_class_name && (strcmp(exception_class_name, kPathNotFoundException2) == 0)) + return Status(kPathNotFound, error_message); else return Status(kException, exception_class_name, error_message); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index d2e23b8461d..eace6fa98a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -71,30 +71,19 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, using ::hadoop::hdfs::GetBlockLocationsResponseProto; LOG_TRACE(kFileSystem, << "NameNodeOperations::GetBlockLocations(" - << FMT_THIS_ADDR << ", path=" << path << ", ...) called"); + << FMT_THIS_ADDR << ", path=" << path << ", ...) 
called"); - struct State { - GetBlockLocationsRequestProto req; - std::shared_ptr resp; - }; - - auto m = continuation::Pipeline::Create(); - auto &req = m->state().req; + GetBlockLocationsRequestProto req; req.set_src(path); req.set_offset(0); req.set_length(std::numeric_limits::max()); - m->state().resp.reset(new GetBlockLocationsResponseProto()); - State *s = &m->state(); - m->Push(continuation::Bind( - [this, s](const continuation::Continuation::Next &next) { - namenode_.GetBlockLocations(&s->req, s->resp, next); - })); + auto resp = std::make_shared(); - m->Run([this, handler](const Status &stat, const State &s) { + namenode_.GetBlockLocations(&req, resp, [resp, handler](const Status &stat) { if (stat.ok()) { auto file_info = std::make_shared(); - auto locations = s.resp->locations(); + auto locations = resp->locations(); file_info->file_length_ = locations.filelength(); file_info->last_block_complete_ = locations.islastblockcomplete(); @@ -117,11 +106,107 @@ void NameNodeOperations::GetBlockLocations(const std::string & path, }); } +void NameNodeOperations::GetFileInfo(const std::string & path, + std::function handler) +{ + using ::hadoop::hdfs::GetFileInfoRequestProto; + using ::hadoop::hdfs::GetFileInfoResponseProto; + + LOG_TRACE(kFileSystem, << "NameNodeOperations::GetFileInfo(" + << FMT_THIS_ADDR << ", path=" << path << ") called"); + + GetFileInfoRequestProto req; + req.set_src(path); + + auto resp = std::make_shared(); + + namenode_.GetFileInfo(&req, resp, [resp, handler, path](const Status &stat) { + if (stat.ok()) { + // For non-existent files, the server will respond with an OK message but + // no fs in the protobuf. 
+ if(resp -> has_fs()){ + struct StatInfo stat_info; + stat_info.path=path; + HdfsFileStatusProtoToStatInfo(stat_info, resp->fs()); + handler(stat, stat_info); + } else { + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + handler(statNew, StatInfo()); + } + } else { + handler(stat, StatInfo()); + } + }); +} + +void NameNodeOperations::GetListing( + const std::string & path, + std::function> &, bool)> handler, + const std::string & start_after) { + using ::hadoop::hdfs::GetListingRequestProto; + using ::hadoop::hdfs::GetListingResponseProto; + + LOG_TRACE( + kFileSystem, + << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); + + GetListingRequestProto req; + req.set_src(path); + req.set_startafter(start_after.c_str()); + req.set_needlocation(false); + + auto resp = std::make_shared(); + + namenode_.GetListing( + &req, + resp, + [resp, handler, path](const Status &stat) { + if (stat.ok()) { + if(resp -> has_dirlist()){ + std::shared_ptr> stat_infos(new std::vector); + for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { + StatInfo si; + si.path=fs.path(); + HdfsFileStatusProtoToStatInfo(si, fs); + stat_infos->push_back(si); + } + handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); + } else { + std::string errormsg = "No such file or directory: " + path; + Status statNew = Status::PathNotFound(errormsg.c_str()); + std::shared_ptr> stat_infos; + handler(statNew, stat_infos, false); + } + } else { + std::shared_ptr> stat_infos; + handler(stat, stat_infos, false); + } + }); +} + void NameNodeOperations::SetFsEventCallback(fs_event_callback callback) { engine_.SetFsEventCallback(callback); } +void NameNodeOperations::HdfsFileStatusProtoToStatInfo( + hdfs::StatInfo & stat_info, + const ::hadoop::hdfs::HdfsFileStatusProto & fs) { + stat_info.file_type = fs.filetype(); + stat_info.length = fs.length(); + 
stat_info.permissions = fs.permission().perm(); + stat_info.owner = fs.owner(); + stat_info.group = fs.group(); + stat_info.modification_time = fs.modification_time(); + stat_info.access_time = fs.access_time(); + stat_info.symlink = fs.symlink(); + stat_info.block_replication = fs.block_replication(); + stat_info.blocksize = fs.blocksize(); + stat_info.fileid = fs.fileid(); + stat_info.children_num = fs.childrennum(); +} + /***************************************************************************** * FILESYSTEM BASE CLASS ****************************************************************************/ @@ -339,7 +424,6 @@ Status FileSystemImpl::Open(const std::string &path, return stat; } - BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock) { BlockLocation result; @@ -380,6 +464,10 @@ BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto void FileSystemImpl::GetBlockLocations(const std::string & path, const std::function locations)> handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + auto conversion = [handler](const Status & status, std::shared_ptr fileInfo) { if (status.ok()) { auto result = std::make_shared(); @@ -407,6 +495,10 @@ void FileSystemImpl::GetBlockLocations(const std::string & path, Status FileSystemImpl::GetBlockLocations(const std::string & path, std::shared_ptr * fileBlockLocations) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetBlockLocations(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + if (!fileBlockLocations) return Status::InvalidArgument("Null pointer passed to GetBlockLocations"); @@ -433,6 +525,125 @@ Status FileSystemImpl::GetBlockLocations(const std::string & path, return stat; } +void FileSystemImpl::GetFileInfo( + const std::string &path, + const std::function &handler) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::GetFileInfo(" + << FMT_THIS_ADDR << ", 
path=" + << path << ") called"); + + nn_.GetFileInfo(path, [handler](const Status &stat, const StatInfo &stat_info) { + handler(stat, stat_info); + }); +} + +Status FileSystemImpl::GetFileInfo(const std::string &path, + StatInfo & stat_info) { + LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]GetFileInfo(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + auto callstate = std::make_shared>>(); + std::future> future(callstate->get_future()); + + /* wrap async FileSystem::GetFileInfo with promise to make it a blocking call */ + auto h = [callstate](const Status &s, const StatInfo &si) { + callstate->set_value(std::make_tuple(s, si)); + }; + + GetFileInfo(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + StatInfo info = std::get<1>(returnstate); + + if (!stat.ok()) { + return stat; + } + + stat_info = info; + return stat; +} + +/** + * Helper function for recursive GetListing calls. + * + * Some compilers don't like recursive lambdas, so we make the lambda call a + * method, which in turn creates a lambda calling itself. 
+ */ +void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr> &stat_infos, bool has_more, + std::string path, + const std::function>&, bool)> &handler) { + bool has_next = stat_infos && stat_infos->size() > 0; + bool get_more = handler(stat, stat_infos, has_more && has_next); + if (get_more && has_more && has_next ) { + auto callback = [this, path, handler](const Status &stat, std::shared_ptr> &stat_infos, bool has_more) { + GetListingShim(stat, stat_infos, has_more, path, handler); + }; + + std::string last = stat_infos->back().path; + nn_.GetListing(path, callback, last); + } +} + +void FileSystemImpl::GetListing( + const std::string &path, + const std::function>&, bool)> &handler) { + LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + // Capture the state and push it into the shim + auto callback = [this, path, handler](const Status &stat, std::shared_ptr> &stat_infos, bool has_more) { + GetListingShim(stat, stat_infos, has_more, path, handler); + }; + + nn_.GetListing(path, callback); +} + +Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr> &stat_infos) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing(" + << FMT_THIS_ADDR << ", path=" + << path << ") called"); + + // In this case, we're going to allocate the result on the heap and have the + // async code populate it. + auto results = std::make_shared>(); + + auto callstate = std::make_shared>(); + std::future future(callstate->get_future()); + + /* wrap async FileSystem::GetListing with promise to make it a blocking call. + * + Keep requesting more until we get the entire listing, and don't set the promise + * until we have the entire listing. 
+ */ + auto h = [callstate, results](const Status &s, std::shared_ptr> si, bool has_more) -> bool { + if (si) { + results->insert(results->end(), si->begin(), si->end()); + } + + bool done = !s.ok() || !has_more; + if (done) { + callstate->set_value(s); + return false; + } + return true; + }; + + GetListing(path, h); + + /* block until promise is set */ + Status stat = future.get(); + + if (!stat.ok()) { + return stat; + } + + stat_infos = results; + return stat; +} void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 0a479a61186..869fd2dc075 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -28,6 +28,7 @@ #include "rpc/rpc_engine.h" #include "reader/block_reader.h" #include "reader/fileinfo.h" +#include "hdfspp/statinfo.h" #include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.hrpc.inl" @@ -64,8 +65,20 @@ public: void GetBlockLocations(const std::string & path, std::function)> handler); + void GetFileInfo(const std::string & path, + std::function handler); + + // start_after="" for initial call + void GetListing(const std::string & path, + std::function>&, bool)> handler, + const std::string & start_after = ""); + void SetFsEventCallback(fs_event_callback callback); + private: + static void HdfsFileStatusProtoToStatInfo(hdfs::StatInfo & si, const ::hadoop::hdfs::HdfsFileStatusProto & fs); + static void DirectoryListingProtoToStatInfo(std::shared_ptr> stat_infos, const ::hadoop::hdfs::DirectoryListingProto & dl); + ::asio::io_service * io_service_; RpcEngine engine_; ClientNamenodeProtocol namenode_; @@ -105,6 +118,17 @@ public: &handler) override; Status Open(const std::string 
&path, FileHandle **handle) override; + void GetFileInfo( + const std::string &path, + const std::function &handler) override; + + Status GetFileInfo(const std::string &path, StatInfo & stat_info) override; + + void GetListing( + const std::string &path, + const std::function> &, bool)> &handler) override; + + Status GetListing(const std::string &path, std::shared_ptr> &stat_infos) override; virtual void GetBlockLocations(const std::string & path, const std::function locations)> ) override; @@ -150,6 +174,11 @@ private: * exposes implementation details that may change at any time. **/ std::shared_ptr event_handlers_; + + void GetListingShim(const Status &stat, std::shared_ptr> &stat_infos, bool has_more, + std::string path, + const std::function>&, bool)> &handler); + }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c index cff837c4b8c..0737d0816f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c @@ -78,8 +78,7 @@ void hdfsFileFreeReadStatistics(struct hdfsReadStatistics *stats) { } hdfsFS hdfsConnectAsUser(const char* nn, tPort port, const char *user) { - REPORT_FUNCTION_NOT_IMPLEMENTED - return NULL; + return (hdfsFS) libhdfspp_hdfsConnectAsUser(nn, port, user); } hdfsFS hdfsConnect(const char* nn, tPort port) { @@ -294,16 +293,16 @@ int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) { hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { - return (hdfsFileInfo *)libhdfs_hdfsListDirectory(fs->libhdfsRep, path, numEntries); + return (hdfsFileInfo *)libhdfspp_hdfsListDirectory(fs->libhdfsppRep, path, numEntries); } hdfsFileInfo *hdfsGetPathInfo(hdfsFS fs, const char* path) { - return (hdfsFileInfo 
*)libhdfs_hdfsGetPathInfo(fs->libhdfsRep, path); + return (hdfsFileInfo *)libhdfspp_hdfsGetPathInfo(fs->libhdfsppRep, path); } void hdfsFreeFileInfo(hdfsFileInfo *hdfsFileInfo, int numEntries) { - return libhdfs_hdfsFreeFileInfo - ((libhdfs_hdfsFileInfo *) hdfsFileInfo, numEntries); + return libhdfspp_hdfsFreeFileInfo + ((libhdfspp_hdfsFileInfo *) hdfsFileInfo, numEntries); } int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) {