diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/CMakeLists.txt
index 76880cd1efc..6b680ccf2ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/CMakeLists.txt
@@ -16,4 +16,5 @@
 # limitations under the License.
 #
 
-add_subdirectory(cat)
+add_subdirectory(c)
+add_subdirectory(cpp)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
similarity index 97%
rename from hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/CMakeLists.txt
rename to hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
index 93139ceb6de..76880cd1efc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/CMakeLists.txt
@@ -16,4 +16,4 @@
 # limitations under the License.
 #
 
-add_subdirectory(c)
+add_subdirectory(cat)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/CMakeLists.txt
new file mode 100644
index 00000000000..1319293dcf6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/CMakeLists.txt
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default LIBHDFSPP_DIR to the default install location.  You can override
+# it by adding -DLIBHDFSPP_DIR=... to your cmake invocation.
+set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
+
+include_directories( ${LIBHDFSPP_DIR}/include )
+link_directories( ${LIBHDFSPP_DIR}/lib )
+
+add_executable(cat_c cat.c)
+target_link_libraries(cat_c hdfspp uriparser2)
+
+# Several examples in different languages need to produce executables with
+# the same name.  To allow this, we keep their CMake target names
+# different, but set their executable (output) names as follows:
+set_target_properties( cat_c
+  PROPERTIES
+  OUTPUT_NAME "cat"
+)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/cat.c
similarity index 57%
rename from hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
rename to hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/cat.c
index dec87586150..586e0a9718d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/cat.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/c/cat/cat.c
@@ -22,68 +22,14 @@
   Doesn't deal with any flags for now, will just attempt to read the whole file.
 */
 
-#include <errno.h>
 #include <stdio.h>
+#include <stdlib.h>
 #include <string.h>
-
 #include "hdfspp/hdfs_ext.h"
+#include "uriparser2/uriparser2.h"
+#include "common/util_c.h"
 
 #define SCHEME "hdfs"
-#define MAX_STRING 1024
-
-struct Uri {
-  int valid;
-  char host[MAX_STRING];
-  int port;
-  char path[MAX_STRING];
-};
-
-int min(int a, int b) {
-  return a < b ? a : b;
-}
-
-void parse_uri(const char * uri_string, struct Uri * uri) {
-  uri->valid = 0;
-  uri->host[0] = 0;
-  uri->port = -1;
-  uri->path[0] = 0;
-
-  // most start with hdfs scheme
-  const char * remaining;
-  const char * scheme_end = strstr(uri_string, "://");
-  if (scheme_end != NULL) {
-    if (strncmp(uri_string, SCHEME, strlen(SCHEME)) != 0)
-      return;
-
-    remaining = scheme_end + 3;
-
-    // parse authority
-    const char * authority_end = strstr(remaining, "/");
-    if (authority_end != NULL) {
-      char authority[MAX_STRING];
-      strncpy(authority, remaining, min(authority_end - remaining, sizeof(authority)));
-      remaining = authority_end;
-
-      char * host_port_separator = strstr(authority, ":");
-      if (host_port_separator != NULL) {
-        errno = 0;
-        uri->port = strtol(host_port_separator + 1, NULL, 10);
-        if (errno != 0)
-          return;
-
-        // Terminate authority at the new end of the host
-        *host_port_separator = 0;
-      }
-      strncpy(uri->host, authority, sizeof(uri->host));
-    }
-    strncpy(uri->path, remaining, sizeof(uri->path));
-  } else {
-    // Absolute path
-    strncpy(uri->path, uri_string, sizeof(uri->path));
-  }
-
-  uri->valid = 1;
-};
 
 int main(int argc, char** argv) {
@@ -93,36 +39,46 @@ int main(int argc, char** argv) {
     return 1;
   }
 
+  URI * uri = NULL;
   const char * uri_path = argv[1];
-  struct Uri uri;
-  parse_uri(uri_path, &uri);
-  if (!uri.valid) {
-    fprintf(stderr, "malformed URI: %s\n", uri_path);
+
+  //Separate check for scheme is required, otherwise the uriparser2 library causes memory issues under valgrind
+  const char * scheme_end = strstr(uri_path, "://");
+  if (scheme_end) {
+    if (strncmp(uri_path, SCHEME, strlen(SCHEME)) != 0) {
+      fprintf(stderr, "Scheme %.*s:// is not supported.\n", (int) (scheme_end - uri_path), uri_path);
+      return 1;
+    } else {
+      uri = uri_parse(uri_path);
+    }
+  }
+  if (!uri) {
+    fprintf(stderr, "Malformed URI: %s\n", uri_path);
     return 1;
   }
 
   struct hdfsBuilder* builder = hdfsNewBuilder();
-  if (*uri.host != 0)
-    hdfsBuilderSetNameNode(builder, uri.host);
-  if (uri.port != -1)
-    hdfsBuilderSetNameNodePort(builder, uri.port);
+  if (uri->host)
+    hdfsBuilderSetNameNode(builder, uri->host);
+  if (uri->port != 0)
+    hdfsBuilderSetNameNodePort(builder, uri->port);
 
   hdfsFS fs = hdfsBuilderConnect(builder);
   if (fs == NULL) {
     hdfsGetLastError(error_text, sizeof(error_text));
-    const char * host = uri.host[0] ? uri.host : "";
-    int port = uri.port;
-    if (-1 == port)
+    const char * host = uri->host ? uri->host : "";
+    int port = uri->port;
+    if (port == 0)
       port = 8020;
     fprintf(stderr, "Unable to connect to %s:%d, hdfsConnect returned null.\n%s\n",
             host, port, error_text);
     return 1;
   }
 
-  hdfsFile file = hdfsOpenFile(fs, uri.path, 0, 0, 0, 0);
+  hdfsFile file = hdfsOpenFile(fs, uri->path, 0, 0, 0, 0);
   if (NULL == file) {
     hdfsGetLastError(error_text, sizeof(error_text));
-    fprintf(stderr, "Unable to open file %s: %s\n", uri.path, error_text );
+    fprintf(stderr, "Unable to open file %s: %s\n", uri->path, error_text );
     hdfsDisconnect(fs);
     hdfsFreeBuilder(builder);
     return 1;
@@ -158,5 +114,8 @@ int main(int argc, char** argv) {
   }
 
   hdfsFreeBuilder(builder);
+  free(uri);
+  // Clean up static data and prevent valgrind memory leaks
+  ShutdownProtobufLibrary_C();
   return 0;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
new file mode 100644
index 00000000000..76880cd1efc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+add_subdirectory(cat)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/CMakeLists.txt
similarity index 75%
rename from hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/CMakeLists.txt
rename to hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/CMakeLists.txt
index a2dc4a41857..9ec3332247b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cat/c/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/CMakeLists.txt
@@ -23,5 +23,13 @@ set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
 include_directories( ${LIBHDFSPP_DIR}/include )
 link_directories( ${LIBHDFSPP_DIR}/lib )
 
-add_executable(cat cat.c)
-target_link_libraries(cat hdfspp)
+add_executable(cat_cpp cat.cpp)
+target_link_libraries(cat_cpp hdfspp)
+
+# Several examples in different languages need to produce executables with
+# the same name.  To allow this, we keep their CMake target names
+# different, but set their executable (output) names as follows:
+set_target_properties( cat_cpp
+  PROPERTIES
+  OUTPUT_NAME "cat"
+)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp
new file mode 100644
index 00000000000..bfab50717fd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp
@@ -0,0 +1,123 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+/*
+  A stripped down version of unix's "cat".
+  Doesn't deal with any flags for now, will just attempt to read the whole file.
+*/
+
+#include "hdfspp/hdfspp.h"
+#include "common/hdfs_configuration.h"
+#include "common/configuration_loader.h"
+#include "common/uri.h"
+
+#include <google/protobuf/stubs/common.h>
+
+using namespace std;
+using namespace hdfs;
+
+#define SCHEME "hdfs"
+
+int main(int argc, char *argv[]) {
+  if (argc != 2) {
+    cerr << "usage: cat [hdfs://[<hostname>:<port>]]/<path-to-file>" << endl;
+    return 1;
+  }
+
+  optional<URI> uri;
+  const string uri_path = argv[1];
+
+  //Separate check for scheme is required, otherwise the common/uri.h library causes memory issues under valgrind
+  size_t scheme_end = uri_path.find("://");
+  if (scheme_end != string::npos) {
+    if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) {
+      cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl;
+      return 1;
+    } else {
+      uri = URI::parse_from_string(uri_path);
+    }
+  }
+  if (!uri) {
+    cerr << "Malformed URI: " << uri_path << endl;
+    return 1;
+  }
+
+  ConfigurationLoader loader;
+  optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
+  const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR");
+  if (envHadoopConfDir && (*envHadoopConfDir != 0) ) {
+    config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml");
+  }
+
+  Options options;
+  if(config){
+    options = config->GetOptions();
+  }
+
+  IoService * io_service = IoService::New();
+
+  FileSystem *fs_raw = FileSystem::New(io_service, "", options);
+  if (!fs_raw) {
+    cerr << "Could not create FileSystem object" << endl;
+    return 1;
+  }
+  //wrapping fs_raw into a unique pointer to guarantee deletion
+  unique_ptr<FileSystem> fs(fs_raw);
+
+  Status stat = fs->Connect(uri->get_host(), to_string(*(uri->get_port())));
+  if (!stat.ok()) {
+    cerr << "Could not connect to " << uri->get_host() << ":" << *(uri->get_port()) << endl;
+    return 1;
+  }
+
+  FileHandle *file_raw = nullptr;
+  stat = fs->Open(uri->get_path(), &file_raw);
+  if (!stat.ok()) {
+    cerr << "Could not open file " << uri->get_path() << endl;
+    return 1;
+  }
+  //wrapping file_raw into a unique pointer to guarantee deletion
+  unique_ptr<FileHandle> file(file_raw);
+
+  char input_buffer[4096];
+  ssize_t read_bytes_count = 0;
+  size_t last_read_bytes = 0;
+
+  do{
+    //Reading file chunks
+    Status stat = file->PositionRead(input_buffer, sizeof(input_buffer), read_bytes_count, &last_read_bytes);
+    if(stat.ok()) {
+      //Writing file chunks to stdout
+      fwrite(input_buffer, last_read_bytes, 1, stdout);
+      read_bytes_count += last_read_bytes;
+    } else {
+      if(stat.is_invalid_offset()){
+        //Reached the end of the file
+        break;
+      } else {
+        cerr << "Error reading the file: " << stat.ToString() << endl;
+        return 1;
+      }
+    }
+  } while (last_read_bytes > 0);
+
+  // Clean up static data and prevent valgrind memory leaks
+  google::protobuf::ShutdownProtobufLibrary();
+  return 0;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index a44df74378d..20a651a0dce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -82,18 +82,18 @@ public:
    * stops at the block boundary.
    *
    * @param buf the pointer to the buffer
-   * @param nbyte the size of the buffer
+   * @param buf_size the size of the buffer
    * @param offset the offset the file
    *
    * The handler returns the datanode that serves the block and the number of
-   * bytes has read.
+   * bytes read.  Status::InvalidOffset is returned when trying to begin
+   * a read past the EOF.
    **/
   virtual void
-  PositionRead(void *buf, size_t nbyte, uint64_t offset,
+  PositionRead(void *buf, size_t buf_size, uint64_t offset,
               const std::function<void(const Status &status, size_t bytes_read)> &handler) = 0;
-
-  virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0;
-  virtual Status Read(void *buf, size_t *nbyte) = 0;
+  virtual Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) = 0;
+  virtual Status Read(void *buf, size_t buf_size, size_t *bytes_read) = 0;
   virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
index f217cad96c0..0187786f667 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h
@@ -45,10 +45,13 @@ class Status {
   static Status AuthenticationFailed();
   static Status Canceled();
   static Status PathNotFound(const char *msg);
+  static Status InvalidOffset(const char *msg);
 
   // success
   bool ok() const { return code_ == 0; }
 
+  bool is_invalid_offset() const { return code_ == kInvalidOffset; }
+
   // Returns the string "OK" for success.
   std::string ToString() const;
 
@@ -73,6 +76,7 @@ class Status {
     kAccessControlException = 258,
     kStandbyException = 259,
     kSnapshotProtocolException = 260,
+    kInvalidOffset = 261,
   };
 
   std::string get_exception_class_str() const {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index a42feae0b65..be57a7e79c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -173,6 +173,10 @@ static int Error(const Status &stat) {
       errnum = ENOTEMPTY;
       default_message = "Directory is not empty";
       break;
+    case Status::Code::kInvalidOffset:
+      errnum = Status::Code::kInvalidOffset;
+      default_message = "Trying to begin a read past the EOF";
+      break;
     default:
       errnum = ENOSYS;
      default_message = "Error: unrecognised code";
@@ -754,8 +758,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
     return -1;
   }
 
-  size_t len = length;
-  Status stat = file->get_impl()->PositionRead(buffer, &len, position);
+  size_t len = 0;
+  Status stat = file->get_impl()->PositionRead(buffer, length, position, &len);
   if(!stat.ok()) {
     return Error(stat);
   }
@@ -775,8 +779,8 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
     return -1;
   }
 
-  size_t len = length;
-  Status stat = file->get_impl()->Read(buffer, &len);
+  size_t len = 0;
+  Status stat = file->get_impl()->Read(buffer, length, &len);
   if (!stat.ok()) {
     return Error(stat);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
index 796b1a2a862..b351900c753 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/status.cc
@@ -127,6 +127,10 @@ Status Status::Canceled() {
   return Status(kOperationCanceled, "Operation canceled");
 }
 
+Status Status::InvalidOffset(const char *msg){
+  return Status(kInvalidOffset, msg);
+}
+
 std::string Status::ToString() const {
   if (code_ == kOk) {
     return "OK";
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
index 6fd5a7c3c52..7bb1e300802 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc
@@ -17,6 +17,7 @@
  */
 
 #include "common/util.h"
+#include "common/util_c.h"
 
 #include 
 #include 
@@ -107,3 +108,7 @@ std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
   }
 }
 
+
+void ShutdownProtobufLibrary_C() {
+  google::protobuf::ShutdownProtobufLibrary();
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
new file mode 100644
index 00000000000..c7db7d26bcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util_c.h
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LIB_COMMON_UTIL_C_H_
+#define LIB_COMMON_UTIL_C_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+  void ShutdownProtobufLibrary_C();
+
+#ifdef __cplusplus
+} /* end extern "C" */
+#endif
+
+#endif
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
index 40f1b4a957d..f40b81c8863 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc
@@ -48,11 +48,11 @@ FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
 }
 
 void FileHandleImpl::PositionRead(
-    void *buf, size_t nbyte, uint64_t offset,
+    void *buf, size_t buf_size, uint64_t offset,
     const std::function<void(const Status &status, size_t bytes_read)> &handler) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::PositionRead("
                          << FMT_THIS_ADDR << ", buf=" << buf
-                         << ", nbyte=" << nbyte << ") called");
+                         << ", buf_size=" << buf_size << ") called");
 
   /* prevent usage after cancelation */
   if(cancel_state_->is_canceled()) {
@@ -71,13 +71,14 @@ void FileHandleImpl::PositionRead(
     handler(status, bytes_read);
   };
 
-  AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback);
+  AsyncPreadSome(offset, asio::buffer(buf, buf_size), bad_node_tracker_, callback);
 }
 
-Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
+Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
                          << FMT_THIS_ADDR << ", buf=" << buf
-                         << ", nbyte=" << *nbyte << ") called");
+                         << ", buf_size=" << buf_size
+                         << ", offset=" << offset << ") called");
 
   auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
   std::future<std::tuple<Status, size_t>> future(callstate->get_future());
@@ -87,7 +88,7 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
     callstate->set_value(std::make_tuple(s,bytes));
   };
 
-  PositionRead(buf, *nbyte, offset, callback);
+  PositionRead(buf, buf_size, offset, callback);
 
   /* wait for async to finish */
   auto returnstate = future.get();
@@ -97,21 +98,21 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
     return stat;
   }
 
-  *nbyte = std::get<1>(returnstate);
+  *bytes_read = std::get<1>(returnstate);
   return stat;
 }
 
-Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
+Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
   LOG_TRACE(kFileHandle, << "FileHandleImpl::Read("
                          << FMT_THIS_ADDR << ", buf=" << buf
-                         << ", nbyte=" << *nbyte << ") called");
+                         << ", buf_size=" << buf_size << ") called");
 
-  Status stat = PositionRead(buf, nbyte, offset_);
+  Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
   if(!stat.ok()) {
     return stat;
   }
 
-  offset_ += *nbyte;
+  offset_ += *bytes_read;
 
   return Status::OK();
 }
 
@@ -179,6 +180,11 @@ void FileHandleImpl::AsyncPreadSome(
     return;
   }
 
+  if(offset >= file_info_->file_length_){
+    handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0);
+    return;
+  }
+
   /**
    * Note: block and chosen_dn will end up pointing to things inside
    * the blocks_ vector.  They shouldn't be directly deleted.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
index 57cf4b77a92..38c1fec710b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h
@@ -59,32 +59,27 @@ public:
                  std::shared_ptr<LibhdfsEvents> event_handlers);
 
   /*
-   * [Some day reliably] Reads a particular offset into the data file.
-   * On error, bytes_read returns the number of bytes successfully read; on
-   * success, bytes_read will equal nbyte
+   * Reads the file at the specified offset into the buffer.
+   * bytes_read reports the number of bytes successfully read, both on
+   * success and on error.  Status::InvalidOffset is returned when trying
+   * to begin a read past the EOF.
    */
   void PositionRead(
     void *buf,
-    size_t nbyte,
+    size_t buf_size,
     uint64_t offset,
     const std::function<void(const Status &status, size_t bytes_read)> &handler
   ) override;
 
-  /**
-   * Note: The nbyte argument for Read and Pread as well as the
-   * offset argument for Seek are in/out parameters.
-   *
-   * For Read and Pread the value referenced by nbyte should
-   * be set to the number of bytes to read. Before returning
-   * the value referenced will be set by the callee to the number
-   * of bytes that was successfully read.
-   *
-   * For Seek the value referenced by offset should be the number
-   * of bytes to shift from the specified whence position. The
-   * referenced value will be set to the new offset before returning.
-   **/
-  Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override;
-  Status Read(void *buf, size_t *nbyte) override;
+  /**
+   * Reads the file at the specified offset into the buffer.
+   * @param buf        output buffer
+   * @param buf_size   size of the output buffer
+   * @param offset     offset at which to start reading
+   * @param bytes_read number of bytes successfully read
+   */
+  Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override;
+  Status Read(void *buf, size_t buf_size, size_t *bytes_read) override;
   Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
 
@@ -95,6 +90,7 @@ public:
    * If an error occurs during connection or transfer, the callback will be
    * called with bytes_read equal to the number of bytes successfully transferred.
    * If no data nodes can be found, status will be Status::ResourceUnavailable.
+   * If trying to begin a read past the EOF, status will be Status::InvalidOffset.
    *
    */
   void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
index 9e3aeb74f35..51854847371 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc
@@ -113,6 +113,7 @@ protected:
 
 TEST(BadDataNodeTest, TestNoNodes) {
   auto file_info = std::make_shared<struct FileInfo>();
+  file_info->file_length_ = 1; //To avoid running into EOF
   file_info->blocks_.push_back(LocatedBlockProto());
   LocatedBlockProto & block = file_info->blocks_[0];
   ExtendedBlockProto *b = block.mutable_b();
@@ -152,6 +153,7 @@ TEST(BadDataNodeTest, TestNoNodes) {
 
 TEST(BadDataNodeTest, NNEventCallback) {
   auto file_info = std::make_shared<struct FileInfo>();
+  file_info->file_length_ = 1; //To avoid running into EOF
   file_info->blocks_.push_back(LocatedBlockProto());
   LocatedBlockProto & block = file_info->blocks_[0];
   ExtendedBlockProto *b = block.mutable_b();
@@ -215,6 +217,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
 
 TEST(BadDataNodeTest, RecoverableError) {
   auto file_info = std::make_shared<struct FileInfo>();
+  file_info->file_length_ = 1; //To avoid running into EOF
   file_info->blocks_.push_back(LocatedBlockProto());
   LocatedBlockProto & block = file_info->blocks_[0];
   ExtendedBlockProto *b = block.mutable_b();
@@ -265,6 +268,7 @@ TEST(BadDataNodeTest, RecoverableError) {
 
 TEST(BadDataNodeTest, InternalError) {
   auto file_info = std::make_shared<struct FileInfo>();
+  file_info->file_length_ = 1; //To avoid running into EOF
   file_info->blocks_.push_back(LocatedBlockProto());
   LocatedBlockProto & block = file_info->blocks_[0];
   ExtendedBlockProto *b = block.mutable_b();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
index f154be8fcf4..e28e4e9e380 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
@@ -16,8 +16,6 @@
  * limitations under the License.
  */
 
-//#include "expect.h"
-
 #include "hdfspp_mini_dfs.h"
 #include "hdfspp/hdfs_ext.h"
 #include 
@@ -293,6 +291,41 @@ TEST_F(HdfsExtTest, TestChmodChown) {
   hdfsFreeFileInfo(file_info, 1);
 }
 
+//Testing EOF
+TEST_F(HdfsExtTest, TestEOF) {
+  HdfsHandle connection = cluster.connect_c();
+  hdfsFS fs = connection.handle();
+  EXPECT_NE(nullptr, fs);
+
+  //Write to a file
+  errno = 0;
+  int size = 256;
+  std::string path = "/eofTest";
+  hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
+  EXPECT_NE(nullptr, file);
+  void * buf = malloc(size);
+  memset(buf, ' ', size);
+  EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
+  free(buf);
+  EXPECT_EQ(0, hdfsCloseFile(fs, file));
+  EXPECT_EQ(0, errno);
+
+  //Test normal reading (no EOF)
+  char buffer[300];
+  EXPECT_EQ(0, errno);
+  file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
+  EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
+  //Read executes correctly, but causes a warning (captured in HDFS-10595)
+  //and sets errno to EINPROGRESS 115 : Operation now in progress
+  errno = 0;
+
+  //Test reading at offset past the EOF
+  EXPECT_EQ(-1, hdfsPread(fs, file, sizeof(buffer), buffer, sizeof(buffer)));
+  EXPECT_EQ(Status::kInvalidOffset, errno);
+
+  EXPECT_EQ(0, hdfsCloseFile(fs, file));
+  EXPECT_EQ(0, errno);
+}
 }
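
For downstream C++ callers, the main compatibility point in this patch is the new synchronous read signatures: the in/out nbyte parameter is gone, the buffer size is passed by value, and the byte count comes back through a separate bytes_read out-parameter. A minimal caller-side sketch of the migration (not part of the patch; the helper name read_chunk and its arguments are hypothetical):

#include "hdfspp/hdfspp.h"

// Reads one chunk at `offset`; returns false once the read would begin at
// or past the EOF, which this patch now reports as Status::InvalidOffset
// instead of an unspecific failure.
bool read_chunk(hdfs::FileHandle *file, void *buf, size_t buf_size,
                off_t offset, size_t *bytes_read) {
  hdfs::Status stat = file->PositionRead(buf, buf_size, offset, bytes_read);
  if (stat.is_invalid_offset()) {
    return false;  // EOF: expected termination, not a hard error
  }
  return stat.ok();
}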
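At the C API layer, hdfsPread and hdfsRead keep their signatures; the visible behavioral change is that a read beginning at or past the EOF now returns -1 with errno set to 261 (Status::kInvalidOffset, per the new case in Error()), which is exactly what TestEOF above asserts. A sketch of an EOF-aware read loop over the C API (not part of the patch; assumes an already-connected fs and an open file, as in cat.c):

#include <errno.h>
#include <stdio.h>
#include "hdfspp/hdfs_ext.h"

static void cat_to_stdout(hdfsFS fs, hdfsFile file) {
  char buffer[4096];
  tOffset offset = 0;
  tSize n;
  while ((n = hdfsPread(fs, file, offset, buffer, sizeof(buffer))) > 0) {
    fwrite(buffer, 1, (size_t) n, stdout);
    offset += n;
  }
  /* 261 == Status::kInvalidOffset: the read began past the EOF */
  if (n < 0 && errno != 261) {
    fprintf(stderr, "read failed, errno=%d\n", errno);
  }
}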