From 64f0e784907f1310bd5865767311fae4677506fe Mon Sep 17 00:00:00 2001
From: James
Date: Tue, 24 Nov 2015 15:30:08 -0500
Subject: [PATCH] HDFS-9368. Implement reads with implicit offset state in
 libhdfs++. Contributed by James Clampffer.

---
 .../native/libhdfspp/lib/bindings/c/hdfs.cc   | 101 +++++++++++++++---
 .../libhdfspp/lib/bindings/c/hdfs_cpp.cc      |  62 ++++++++++-
 .../libhdfspp/lib/bindings/c/hdfs_cpp.h       |  22 +++-
 .../main/native/libhdfspp/lib/fs/filesystem.h |   2 +-
 .../native/libhdfspp/lib/fs/inputstream.cc    |   2 +
 5 files changed, 169 insertions(+), 20 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 9b985a932ad..853e9d3947a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -58,6 +58,43 @@ static void ReportError(int errnum, std::string msg) {
 #endif
 }
 
+/* Convert Status wrapped error into appropriate errno and return code */
+static int Error(const Status &stat) {
+  int code = stat.code();
+  switch (code) {
+    case Status::Code::kOk:
+      return 0;
+    case Status::Code::kInvalidArgument:
+      ReportError(EINVAL, "Invalid argument");
+      break;
+    case Status::Code::kResourceUnavailable:
+      ReportError(EAGAIN, "Resource temporarily unavailable");
+      break;
+    case Status::Code::kUnimplemented:
+      ReportError(ENOSYS, "Function not implemented");
+      break;
+    case Status::Code::kException:
+      ReportError(EINTR, "Exception raised");
+      break;
+    default:
+      ReportError(ENOSYS, "Error: unrecognised code");
+  }
+  return -1;
+}
+
+/* return false on failure */
+bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
+  if (!fs) {
+    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
+    return false;
+  }
+  if (!file) {
+    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+    return false;
+  }
+  return true;
+}
+
 /**
  * C API implementations
  **/
@@ -110,28 +147,66 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
 }
 
 int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
-  if (!fs) {
-    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
-    return -1;
-  }
-  if (!file) {
-    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+  if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
 
   delete file;
   return 0;
 }
 
 tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
                 tSize length) {
-  if (!fs) {
-    ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
-    return -1;
-  }
-  if (!file) {
-    ReportError(EBADF, "Cannot perform FS operations with null File handle.");
+  if (!CheckSystemAndHandle(fs, file)) {
     return -1;
   }
-  return file->get_impl()->Pread(buffer, length, position);
+
+  size_t len = length;
+  Status stat = file->get_impl()->Pread(buffer, &len, position);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+  return (tSize)len;
+}
+
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+
+  size_t len = length;
+  Status stat = file->get_impl()->Read(buffer, &len);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  return (tSize)len;
+}
+
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+
+  off_t desired = desiredPos;
+  Status stat = file->get_impl()->Seek(&desired, std::ios_base::beg);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  return (int)desired;
+}
+
+tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
+  if (!CheckSystemAndHandle(fs, file)) {
+    return -1;
+  }
+
+  ssize_t offset = 0;
+  Status stat = file->get_impl()->Seek(&offset, std::ios_base::cur);
+  if (!stat.ok()) {
+    return Error(stat);
+  }
+
+  return offset;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
index 702ed1f45a4..2795b7e7044 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc
@@ -35,7 +35,9 @@ namespace hdfs {
 
-ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
+FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){};
+
+Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
   auto stat = std::make_shared<std::promise<Status>>();
   std::future<Status> future(stat->get_future());
@@ -49,7 +51,7 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
     contacted_datanode = dn;
   };
 
-  input_stream_->PositionRead(buf, nbyte, offset, callback);
+  input_stream_->PositionRead(buf, *nbyte, offset, callback);
 
   /* wait for async to finish */
   auto s = future.get();
@@ -62,9 +64,61 @@ ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
       impl->bad_node_tracker_->AddBadNode(contacted_datanode);
     }
 
-    return -1;
+    return s;
   }
-  return (ssize_t)read_count;
+  *nbyte = (size_t)read_count;
+  return Status::OK();
+}
+
+Status FileHandle::Read(void *buf, size_t *nbyte) {
+  Status stat = Pread(buf, nbyte, offset_);
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  offset_ += *nbyte;
+  return Status::OK();
+}
+
+Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) {
+  off_t new_offset = -1;
+
+  switch (whence) {
+    case std::ios_base::beg:
+      new_offset = *offset;
+      break;
+    case std::ios_base::cur:
+      new_offset = offset_ + *offset;
+      break;
+    case std::ios_base::end:
+      new_offset = static_cast<InputStreamImpl *>(input_stream_.get())
+                       ->get_file_length() +
+                   *offset;
+      break;
+    default:
+      /* unsupported */
+      return Status::InvalidArgument("Invalid Seek whence argument");
+  }
+
+  if (!CheckSeekBounds(new_offset)) {
+    return Status::InvalidArgument("Seek offset out of bounds");
+  }
+  offset_ = new_offset;
+
+  *offset = offset_;
+  return Status::OK();
+}
+
+/* return false if seek will be out of bounds */
+bool FileHandle::CheckSeekBounds(ssize_t desired_position) {
+  ssize_t file_length =
+      static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length();
+
+  if (desired_position < 0 || desired_position >= file_length) {
+    return false;
+  }
+
+  return true;
 }
 
 bool FileHandle::IsOpenForRead() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
index 0abe9f74dc7..b36ee8f92dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h
@@ -24,6 +24,7 @@
 #include 
 #include 
 #include 
+#include <ios>
 
 #include "libhdfspp/hdfs.h"
 #include "fs/bad_datanode_tracker.h"
@@ -42,14 +43,31 @@ class HadoopFileSystem;
 class FileHandle {
  public:
   virtual ~FileHandle(){};
-  ssize_t Pread(void *buf, size_t nbyte, off_t offset);
+  /**
+   * Note: The nbyte argument for Read and Pread as well as the
+   * offset argument for Seek are in/out parameters.
+   *
+   * For Read and Pread the value referenced by nbyte should
+   * be set to the number of bytes to read. Before returning
+   * the value referenced will be set by the callee to the number
+   * of bytes that was successfully read.
+   *
+   * For Seek the value referenced by offset should be the number
+   * of bytes to shift from the specified whence position. The
+   * referenced value will be set to the new offset before returning.
+   **/
+  Status Pread(void *buf, size_t *nbyte, off_t offset);
+  Status Read(void *buf, size_t *nbyte);
+  Status Seek(off_t *offset, std::ios_base::seekdir whence);
   bool IsOpenForRead();
 
  private:
   /* handle should only be created by fs */
   friend class HadoopFileSystem;
-  FileHandle(InputStream *is) : input_stream_(is){};
+  FileHandle(InputStream *is);
+  bool CheckSeekBounds(ssize_t desired_position);
   std::unique_ptr<InputStream> input_stream_;
+  off_t offset_;
 };
 
 class HadoopFileSystem {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 5471bba35ba..dfe8b0c7721 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -72,7 +72,7 @@ class InputStreamImpl : public InputStream {
                          const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset,
                          const MutableBufferSequence &buffers,
                          const Handler &handler);
-
+  uint64_t get_file_length() const;
 private:
   FileSystemImpl *fs_;
   unsigned long long file_length_;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
index c1628dd21c7..0b78c931866 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc
@@ -43,4 +43,6 @@ void InputStreamImpl::PositionRead(
         handler) {
   AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler);
 }
+
+uint64_t InputStreamImpl::get_file_length() const { return file_length_; }
 }
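
Usage sketch (commentary, not part of the patch): the block below shows how a libhdfs-style caller would exercise the implicit-offset path added above, with hdfsRead advancing the handle's internal offset and hdfsSeek/hdfsTell manipulating it. The namenode host and port, the file path, the "hdfs.h" include, and the main() wrapper are illustrative assumptions and are not taken from this change.

/* Usage sketch only -- not part of HDFS-9368.  Host, port, and path are
 * hypothetical; the exact header name depends on how libhdfs++ is built. */
#include <fcntl.h>   /* O_RDONLY */
#include <stdio.h>
#include <string.h>

#include "hdfs.h"    /* libhdfs-compatible C API backed by hdfs.cc above */

int main(void) {
  hdfsFS fs = hdfsConnect("localhost", 8020);   /* assumed namenode */
  if (!fs) {
    fprintf(stderr, "hdfsConnect failed\n");
    return 1;
  }

  hdfsFile file = hdfsOpenFile(fs, "/tmp/example.txt", O_RDONLY, 0, 0, 0);
  if (!file) {
    fprintf(stderr, "hdfsOpenFile failed\n");
    return 1;
  }

  char buf[4096];
  memset(buf, 0, sizeof(buf));

  /* hdfsRead takes no position argument; it reads at the handle's
   * implicit offset and FileHandle::Read advances that offset. */
  tSize nread = hdfsRead(fs, file, buf, (tSize)sizeof(buf));
  if (nread < 0) {
    perror("hdfsRead");   /* errno was set by Error()/ReportError() */
  }

  /* hdfsTell reports the current offset via Seek(..., std::ios_base::cur). */
  tOffset pos = hdfsTell(fs, file);
  printf("read %d bytes, offset is now %lld\n", (int)nread, (long long)pos);

  /* Rewind to the start of the file.  This implementation returns the new
   * offset rather than 0, so treat only negative values as failure. */
  if (hdfsSeek(fs, file, 0) < 0) {
    perror("hdfsSeek");
  }

  hdfsCloseFile(fs, file);
  return 0;
}

The C shims keep the libhdfs error convention: every Status failure is funneled through Error(), which maps kInvalidArgument, kResourceUnavailable, kUnimplemented, and kException onto EINVAL, EAGAIN, ENOSYS, and EINTR before returning -1. Also note that FileHandle::CheckSeekBounds rejects positions at or beyond the current file length, so seeking exactly to EOF returns InvalidArgument.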