diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/block_location.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/block_location.h
new file mode 100644
index 00000000000..cbe34be3642
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/block_location.h
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HDFSPP_BLOCK_LOCATION_H
+#define HDFSPP_BLOCK_LOCATION_H
+
+#include <string>
+#include <vector>
+
+namespace hdfs {
+
+class DNInfo {
+public:
+  DNInfo() : xfer_port(-1), info_port(-1), IPC_port(-1), info_secure_port(-1) {}
+
+  std::string getHostname() const {
+    return hostname;
+  }
+
+  void setHostname(const std::string & hostname) {
+    this->hostname = hostname;
+  }
+
+  std::string getIPAddr() const {
+    return ip_addr;
+  }
+
+  void setIPAddr(const std::string & ip_addr) {
+    this->ip_addr = ip_addr;
+  }
+
+  int getXferPort() const {
+    return xfer_port;
+  }
+
+  void setXferPort(int xfer_port) {
+    this->xfer_port = xfer_port;
+  }
+
+  int getInfoPort() const {
+    return info_port;
+  }
+
+  void setInfoPort(int info_port) {
+    this->info_port = info_port;
+  }
+
+  int getIPCPort() const {
+    return IPC_port;
+  }
+
+  void setIPCPort(int IPC_port) {
+    this->IPC_port = IPC_port;
+  }
+
+  int getInfoSecurePort() const {
+    return info_secure_port;
+  }
+
+  void setInfoSecurePort(int info_secure_port) {
+    this->info_secure_port = info_secure_port;
+  }
+private:
+  std::string hostname;
+  std::string ip_addr;
+  int xfer_port;
+  int info_port;
+  int IPC_port;
+  int info_secure_port;
+};
+
+class BlockLocation {
+public:
+  bool isCorrupt() const {
+    return corrupt;
+  }
+
+  void setCorrupt(bool corrupt) {
+    this->corrupt = corrupt;
+  }
+
+  int64_t getLength() const {
+    return length;
+  }
+
+  void setLength(int64_t length) {
+    this->length = length;
+  }
+
+  int64_t getOffset() const {
+    return offset;
+  }
+
+  void setOffset(int64_t offset) {
+    this->offset = offset;
+  }
+
+  const std::vector<DNInfo> & getDataNodes() const {
+    return dn_info;
+  }
+
+  void setDataNodes(const std::vector<DNInfo> & dn_info) {
+    this->dn_info = dn_info;
+  }
+
+private:
+  bool corrupt;
+  int64_t length;
+  int64_t offset;              // Offset of the block in the file
+  std::vector<DNInfo> dn_info; // Info about who stores each block
+};
+
+class FileBlockLocation {
+public:
+  uint64_t getFileLength() const {
+    return fileLength;
+  }
+
+  void setFileLength(uint64_t fileLength) {
+    this->fileLength = fileLength;
+  }
+
+  bool isLastBlockComplete() const {
+    return this->lastBlockComplete;
+  }
+
+  void setLastBlockComplete(bool lastBlockComplete) {
+    this->lastBlockComplete = lastBlockComplete;
+  }
+
+  bool isUnderConstruction() const {
+    return underConstruction;
+  }
+
+  void setUnderConstruction(bool underConstruction) {
+    this->underConstruction = underConstruction;
+  }
+
+  const std::vector<BlockLocation> & getBlockLocations() const {
+    return blockLocations;
+  }
+
+  void setBlockLocations(const std::vector<BlockLocation> & blockLocations) {
+    this->blockLocations = blockLocations;
+  }
+private:
+  uint64_t fileLength;
+  bool lastBlockComplete;
+  bool underConstruction;
+  std::vector<BlockLocation> blockLocations;
+};
+
+} // namespace hdfs
+
+
+#endif /* HDFSPP_BLOCK_LOCATION_H */
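For orientation, here is a minimal sketch (not part of the patch) of how these three classes nest once a `FileBlockLocation` has been populated; `DumpLocations` and its argument are illustrative names only:

```cpp
#include "hdfspp/block_location.h"

#include <iostream>

// Illustrative only: walk a populated FileBlockLocation,
// file -> blocks -> datanodes.
void DumpLocations(const hdfs::FileBlockLocation & fbl) {
  std::cout << "file length: " << fbl.getFileLength() << "\n";
  for (const hdfs::BlockLocation & block : fbl.getBlockLocations()) {
    std::cout << "  block @" << block.getOffset()
              << " len=" << block.getLength()
              << (block.isCorrupt() ? " (corrupt)" : "") << "\n";
    for (const hdfs::DNInfo & dn : block.getDataNodes()) {
      std::cout << "    on " << dn.getHostname()
                << " (" << dn.getIPAddr() << ":" << dn.getXferPort() << ")\n";
    }
  }
}
```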
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
index 1e1e6aac715..4e2fefd9c00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h
@@ -115,6 +115,63 @@ LIBHDFS_EXTERNAL
 int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key,
                           int32_t *val);
 
+/**
+ * Returns the block information and data nodes associated with a particular file.
+ *
+ * The hdfsBlockLocations structure will have zero or more hdfsBlockInfo elements,
+ * which in turn hold zero or more hdfsDNInfo elements indicating which datanodes
+ * store each block.
+ *
+ * @param fs         A connected hdfs instance
+ * @param path       Path of the file to query
+ * @param locations  The address of an output pointer to contain the block information.
+ *                   On success, this pointer must be later freed with hdfsFreeBlockLocations.
+ *
+ * @return 0 on success; nonzero error code otherwise.
+ *         If the file does not exist, an error will be returned.
+ */
+struct hdfsDNInfo {
+  const char *    ip_address;
+  const char *    hostname;
+  int             xfer_port;
+  int             info_port;
+  int             IPC_port;
+  int             info_secure_port;
+};
+
+struct hdfsBlockInfo {
+    uint64_t            start_offset;
+    uint64_t            num_bytes;
+
+    size_t              num_locations;
+    struct hdfsDNInfo * locations;
+};
+
+struct hdfsBlockLocations
+{
+    uint64_t               fileLength;
+    int                    isLastBlockComplete;
+    int                    isUnderConstruction;
+
+    size_t                 num_blocks;
+    struct hdfsBlockInfo * blocks;
+};
+
+LIBHDFS_EXTERNAL
+int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations);
+
+/**
+ * Frees up an hdfsBlockLocations pointer allocated by hdfsGetBlockLocations.
+ *
+ * @param locations  The previously-populated pointer allocated by hdfsGetBlockLocations
+ * @return 0 on success, nonzero on error
+ */
+LIBHDFS_EXTERNAL
+int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations);
+
+
+
+
 /**
  * Client can supply a C style function pointer to be invoked any time something
  * is logged. Unlike the C++ logger this will not filter by level or component,
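A hedged usage sketch of the new C API; `fs` stands in for a handle obtained from `hdfsBuilderConnect`, and error handling is abbreviated:

```cpp
#include "hdfspp/hdfs_ext.h"

#include <cstdio>

// Sketch: fetch, inspect, and release block locations for one file.
// Assumes `fs` is a connected hdfsFS handle.
void PrintBlockLocations(hdfsFS fs, const char *path) {
  struct hdfsBlockLocations *locations = nullptr;
  if (hdfsGetBlockLocations(fs, path, &locations) != 0) {
    fprintf(stderr, "hdfsGetBlockLocations failed for %s\n", path);
    return;
  }
  printf("%s: %llu bytes in %zu block(s)\n", path,
         (unsigned long long)locations->fileLength, locations->num_blocks);
  for (size_t i = 0; i < locations->num_blocks; i++) {
    struct hdfsBlockInfo *block = &locations->blocks[i];
    for (size_t j = 0; j < block->num_locations; j++) {
      struct hdfsDNInfo *dn = &block->locations[j];
      printf("  offset %llu on %s (%s)\n",
             (unsigned long long)block->start_offset,
             dn->hostname, dn->ip_address);
    }
  }
  hdfsFreeBlockLocations(locations);  // pairs with hdfsGetBlockLocations
}
```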
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
index 674dc4a25e0..1e4455a87b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h
@@ -21,6 +21,7 @@
 #include "hdfspp/options.h"
 #include "hdfspp/status.h"
 #include "hdfspp/events.h"
+#include "hdfspp/block_location.h"
 
 #include <functional>
 #include <memory>
@@ -168,6 +169,16 @@ class FileSystem {
                     const std::function<void(const Status &, FileHandle *)>
                         &handler) = 0;
   virtual Status Open(const std::string &path, FileHandle **handle) = 0;
+
+  /**
+   * Returns the locations of all known blocks for the indicated file, or an error
+   * if the information could not be found.
+   */
+  virtual void GetBlockLocations(const std::string & path,
+    const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) = 0;
+  virtual Status GetBlockLocations(const std::string & path,
+    std::shared_ptr<FileBlockLocation> * locations) = 0;
+
+
   /**
    * Note that it is an error to destroy the filesystem from within a filesystem
    * callback.  It will lead to a deadlock and the termination of the process.
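A comparable sketch against the blocking C++ overload; it assumes a connected `hdfs::FileSystem` and that `Status::ToString()` is available for diagnostics:

```cpp
#include "hdfspp/hdfspp.h"

#include <iostream>
#include <memory>

// Sketch: query block locations through the synchronous C++ overload.
// Assumes `fs` is an hdfs::FileSystem that has already connected.
void ShowBlocks(hdfs::FileSystem *fs, const std::string &path) {
  std::shared_ptr<hdfs::FileBlockLocation> locations;
  hdfs::Status status = fs->GetBlockLocations(path, &locations);
  if (!status.ok()) {
    std::cerr << "GetBlockLocations(" << path << ") failed: "
              << status.ToString() << "\n";
    return;
  }
  std::cout << path << ": " << locations->getFileLength() << " bytes in "
            << locations->getBlockLocations().size() << " block(s)\n";
}
```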
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index cc0d9642435..8a73c41dc7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -167,12 +167,20 @@ static int ReportCaughtNonException()
   return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
 }
 
-/* return false on failure */
-bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
+bool CheckSystem(hdfsFS fs) {
   if (!fs) {
     ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
     return false;
   }
+
+  return true;
+}
+
+/* return false on failure */
+bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
+  if (!CheckSystem(fs))
+    return false;
+
   if (!file) {
     ReportError(EBADF, "Cannot perform FS operations with null File handle.");
     return false;
@@ -410,6 +418,93 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
   }
 }
 
+int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
+{
+  try
+  {
+    if (!CheckSystem(fs)) {
+      return -1;
+    }
+    if (locations_out == nullptr) {
+      ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations");
+      return -2;
+    }
+
+    std::shared_ptr<FileBlockLocation> ppLocations;
+    Status stat = fs->get_impl()->GetBlockLocations(path, &ppLocations);
+    if (!stat.ok()) {
+      return Error(stat);
+    }
+
+    hdfsBlockLocations *locations = new struct hdfsBlockLocations();
+    (*locations_out) = locations;
+
+    bzero(locations, sizeof(*locations));
+    locations->fileLength = ppLocations->getFileLength();
+    locations->isLastBlockComplete = ppLocations->isLastBlockComplete();
+    locations->isUnderConstruction = ppLocations->isUnderConstruction();
+
+    const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
+    locations->num_blocks = ppBlockLocations.size();
+    locations->blocks = new struct hdfsBlockInfo[locations->num_blocks];
+    for (size_t i=0; i < ppBlockLocations.size(); i++) {
+      auto ppBlockLocation = ppBlockLocations[i];
+      auto block = &locations->blocks[i];
+
+      block->num_bytes = ppBlockLocation.getLength();
+      block->start_offset = ppBlockLocation.getOffset();
+
+      const std::vector<DNInfo> & ppDNInfos = ppBlockLocation.getDataNodes();
+      block->num_locations = ppDNInfos.size();
+      block->locations = new hdfsDNInfo[block->num_locations];
+      for (size_t j=0; j < block->num_locations; j++) {
+        auto ppDNInfo = ppDNInfos[j];
+        auto dn_info = &block->locations[j];
+
+        dn_info->xfer_port = ppDNInfo.getXferPort();
+        dn_info->info_port = ppDNInfo.getInfoPort();
+        dn_info->IPC_port  = ppDNInfo.getIPCPort();
+        dn_info->info_secure_port = ppDNInfo.getInfoSecurePort();
+
+        /* copy size+1 bytes so the strings stay NUL-terminated */
+        char * buf;
+        buf = new char[ppDNInfo.getHostname().size() + 1];
+        strncpy(buf, ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1);
+        dn_info->hostname = buf;
+
+        buf = new char[ppDNInfo.getIPAddr().size() + 1];
+        strncpy(buf, ppDNInfo.getIPAddr().c_str(), ppDNInfo.getIPAddr().size() + 1);
+        dn_info->ip_address = buf;
+      }
+    }
+
+    return 0;
+  } catch (const std::exception & e) {
+    return ReportException(e);
+  } catch (...) {
+    return ReportCaughtNonException();
+  }
+}
+
+int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
+  if (blockLocations == nullptr)
+    return 0;
+
+  for (size_t i=0; i < blockLocations->num_blocks; i++) {
+    auto block = &blockLocations->blocks[i];
+    for (size_t j=0; j < block->num_locations; j++) {
+      auto location = &block->locations[j];
+      delete[] location->hostname;
+      delete[] location->ip_address;
+    }
+  }
+  delete[] blockLocations->blocks;
+  delete blockLocations;
+
+  return 0;
+}
+
+
 /*******************************************************************
  *                    EVENT CALLBACKS
  *******************************************************************/
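Because hdfsGetBlockLocations transfers ownership of a deep-copied tree to the caller, a small caller-side RAII guard (a hypothetical helper, not part of this patch) keeps hdfsFreeBlockLocations paired with the allocation:

```cpp
#include "hdfspp/hdfs_ext.h"

#include <memory>

// Hypothetical caller-side helper: tie the free call to scope exit so every
// successful hdfsGetBlockLocations has a matching hdfsFreeBlockLocations.
struct BlockLocationsDeleter {
  void operator()(hdfsBlockLocations *p) const { hdfsFreeBlockLocations(p); }
};
using ScopedBlockLocations =
    std::unique_ptr<hdfsBlockLocations, BlockLocationsDeleter>;

ScopedBlockLocations GetLocations(hdfsFS fs, const char *path) {
  hdfsBlockLocations *raw = nullptr;
  if (hdfsGetBlockLocations(fs, path, &raw) != 0) {
    return nullptr;  // details retrievable via hdfsGetLastError
  }
  return ScopedBlockLocations(raw);
}
```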
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 8530ffa6263..d2e23b8461d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -97,12 +97,15 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
       auto locations = s.resp->locations();
 
       file_info->file_length_ = locations.filelength();
+      file_info->last_block_complete_ = locations.islastblockcomplete();
+      file_info->under_construction_ = locations.underconstruction();
 
       for (const auto &block : locations.blocks()) {
         file_info->blocks_.push_back(block);
       }
 
-      if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
+      if (!locations.islastblockcomplete() &&
+          locations.has_lastblock() && locations.lastblock().b().numbytes()) {
         file_info->blocks_.push_back(locations.lastblock());
         file_info->file_length_ += locations.lastblock().b().numbytes();
       }
@@ -336,6 +339,102 @@ Status FileSystemImpl::Open(const std::string &path,
   return stat;
 }
 
+
+BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
+{
+  BlockLocation result;
+
+  result.setCorrupt(locatedBlock.corrupt());
+  result.setOffset(locatedBlock.offset());
+
+  std::vector<DNInfo> dn_info;
+  dn_info.reserve(locatedBlock.locs_size());
+  for (const hadoop::hdfs::DatanodeInfoProto & datanode_info : locatedBlock.locs()) {
+    const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id();
+    DNInfo newInfo;
+    if (id.has_ipaddr())
+      newInfo.setIPAddr(id.ipaddr());
+    if (id.has_hostname())
+      newInfo.setHostname(id.hostname());
+    if (id.has_xferport())
+      newInfo.setXferPort(id.xferport());
+    if (id.has_infoport())
+      newInfo.setInfoPort(id.infoport());
+    if (id.has_ipcport())
+      newInfo.setIPCPort(id.ipcport());
+    if (id.has_infosecureport())
+      newInfo.setInfoSecurePort(id.infosecureport());
+    dn_info.push_back(newInfo);
+  }
+  result.setDataNodes(dn_info);
+
+  if (locatedBlock.has_b()) {
+    const hadoop::hdfs::ExtendedBlockProto & b = locatedBlock.b();
+    result.setLength(b.numbytes());
+  }
+
+
+  return result;
+}
+
+void FileSystemImpl::GetBlockLocations(const std::string & path,
+  const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
+{
+  auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
+    if (status.ok()) {
+      auto result = std::make_shared<FileBlockLocation>();
+
+      result->setFileLength(fileInfo->file_length_);
+      result->setLastBlockComplete(fileInfo->last_block_complete_);
+      result->setUnderConstruction(fileInfo->under_construction_);
+
+      std::vector<BlockLocation> blocks;
+      for (const hadoop::hdfs::LocatedBlockProto & locatedBlock : fileInfo->blocks_) {
+        auto newLocation = LocatedBlockToBlockLocation(locatedBlock);
+        blocks.push_back(newLocation);
+      }
+      result->setBlockLocations(blocks);
+
+      handler(status, result);
+    } else {
+      handler(status, std::shared_ptr<FileBlockLocation>());
+    }
+  };
+
+  nn_.GetBlockLocations(path, conversion);
+}
+
+Status FileSystemImpl::GetBlockLocations(const std::string & path,
+  std::shared_ptr<FileBlockLocation> * fileBlockLocations)
+{
+  if (!fileBlockLocations)
+    return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
+
+  auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
+  std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
+
+  /* wrap async call with promise/future to make it blocking */
+  auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
+    callstate->set_value(std::make_tuple(s, blockInfo));
+  };
+
+  GetBlockLocations(path, callback);
+
+  /* wait for async to finish */
+  auto returnstate = future.get();
+  auto stat = std::get<0>(returnstate);
+
+  if (!stat.ok()) {
+    return stat;
+  }
+
+  *fileBlockLocations = std::get<1>(returnstate);
+
+  return stat;
+}
+
+
+
 void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
   // It is far too easy to destroy the filesystem (and thus the threadpool)
   // from within one of the worker threads, leading to a deadlock.  Let's
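The blocking overload above is one instance of a general trick: wrapping a callback-based API in a promise/future pair so the caller can block until the callback fires. A standalone sketch of the pattern, with illustrative names only:

```cpp
#include <functional>
#include <future>
#include <memory>

// Generic shape of the pattern used by the synchronous GetBlockLocations:
// run an async operation that reports through a callback, then block on a
// future until that callback delivers the result.
template <typename Result>
Result RunBlocking(const std::function<void(std::function<void(Result)>)> &async_op) {
  auto state = std::make_shared<std::promise<Result>>();
  std::future<Result> result = state->get_future();

  // The shared_ptr keeps the promise alive even if the async operation
  // outlives this stack frame before invoking the callback.
  async_op([state](Result value) { state->set_value(std::move(value)); });

  return result.get();  // blocks until set_value runs
}
```

With `Result` instantiated as `std::tuple<Status, std::shared_ptr<FileBlockLocation>>`, this reduces to the code in the patch.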
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index ca233d1d7a5..0a479a61186 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -106,6 +106,12 @@ public:
 
   Status Open(const std::string &path, FileHandle **handle) override;
 
+  virtual void GetBlockLocations(const std::string & path,
+    const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
+  virtual Status GetBlockLocations(const std::string & path,
+    std::shared_ptr<FileBlockLocation> * locations) override;
+
+
   void SetFsEventCallback(fs_event_callback callback) override;
 
   /* add a new thread to handle asio requests, return number of threads in pool
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
index ad10165d5e5..2fb42a61ec0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h
@@ -28,6 +28,8 @@ namespace hdfs {
  */
 struct FileInfo {
   unsigned long long file_length_;
+  bool under_construction_;
+  bool last_block_complete_;
   std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
 };
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
index 5018afd434c..b30afb9cdc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt
@@ -24,7 +24,7 @@ set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib)
 set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c)
 include_directories(
     ${GENERATED_JAVAH}
-    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_LIST_DIR}
     ${CMAKE_BINARY_DIR}
     ${JNI_INCLUDE_DIRS}
     ${LIBHDFS_SRC_DIR}/include
@@ -54,6 +54,11 @@ function(add_memcheck_test name binary)
   endif()
 endfunction(add_memcheck_test)
 
+#
+#
+#   UNIT TESTS - TEST SELECTED PARTS OF THE LIBRARY
+#
+#
 
 add_executable(uri_test uri_test.cc)
 target_link_libraries(uri_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
@@ -96,16 +101,6 @@ add_executable(hdfspp_errors_test hdfspp_errors.cc)
 target_link_libraries(hdfspp_errors_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfspp_errors hdfspp_errors_test)
 
-#This test requires a great deal of Hadoop Java infrastructure to run.
-if(HADOOP_BUILD)
-add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
-
-build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
-link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
-add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)
-
-endif(HADOOP_BUILD)
-
 add_executable(hdfs_builder_test hdfs_builder_test.cc)
 target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(hdfs_builder_test hdfs_builder_test)
@@ -113,3 +108,38 @@ add_memcheck_test(hdfs_builder_test hdfs_builder_test)
 add_executable(logging_test logging_test.cc)
 target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
 add_memcheck_test(logging_test logging_test)
+
+#
+#
+#   INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS
+#
+#
+# This test requires a great deal of Hadoop Java infrastructure to run.
+#
+if(HADOOP_BUILD)
+
+include_directories (
+  #TODO: Put this in a variable up top and pull it out here
+  ${CMAKE_CURRENT_SOURCE_DIR}/../../libhdfs-tests/
+)
+
+add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
+
+# TODO: get all of the mini dfs library bits here in one place
+# add_library(hdfspp_mini_cluster native_mini_dfs ${JAVA_JVM_LIBRARY} )
+
+#TODO: Link against full library rather than just parts
+
+build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
+link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES} )
+add_libhdfs_test (libhdfs_threaded hdfspp_test_shim_static)
+
+build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfspp_mini_dfs_smoke.cc)
+link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
+add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
+
+build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
+link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
+add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
+
+endif(HADOOP_BUILD)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
new file mode 100644
index 00000000000..e660fcc1b72
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_ext_test.cc
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#include "expect.h"
+
+#include "hdfspp_mini_dfs.h"
+#include "hdfspp/hdfs_ext.h"
+
+
+namespace hdfs {
+
+class HdfsExtTest: public ::testing::Test {
+public:
+  MiniCluster cluster;
+};
+
+// Set up a mini-cluster and check that block location queries come back
+// through the C API intact
+TEST_F(HdfsExtTest, TestGetBlockLocations) {
+  HdfsHandle connection = cluster.connect_c();
+  EXPECT_NE(nullptr, connection.handle());
+
+  hdfsBlockLocations * blocks = nullptr;
+
+  // Freeing a null pointer is a no-op
+  int result = hdfsFreeBlockLocations(blocks);
+  EXPECT_EQ(0, result);
+
+  // Test a nonexistent file
+  result = hdfsGetBlockLocations(connection, "non_extant_file", &blocks);
+  EXPECT_NE(0, result);  // Should be an error
+
+  // Test an existing file
+  std::string filename = connection.newFile(1024);
+  result = hdfsGetBlockLocations(connection, filename.c_str(), &blocks);
+  EXPECT_EQ(0, result);
+
+  EXPECT_EQ(1024, blocks->fileLength);
+  EXPECT_EQ(1, blocks->num_blocks);
+  EXPECT_EQ(0, blocks->isUnderConstruction);
+  EXPECT_NE(0, blocks->isLastBlockComplete);
+  EXPECT_EQ(1024, blocks->blocks->num_bytes);
+  EXPECT_EQ(0, blocks->blocks->start_offset);
+  EXPECT_EQ(1, blocks->blocks->num_locations);
+  EXPECT_NE(nullptr, blocks->blocks->locations->hostname);
+  EXPECT_NE(nullptr, blocks->blocks->locations->ip_address);
+  EXPECT_NE(0, blocks->blocks->locations->xfer_port);
+
+  result = hdfsFreeBlockLocations(blocks);
+  EXPECT_EQ(0, result);
+
+}
+
+
+}
+
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  int exit_code = RUN_ALL_TESTS();
+  google::protobuf::ShutdownProtobufLibrary();
+
+  return exit_code;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
index 28540476aa9..cff837c4b8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_shim.c
@@ -311,7 +311,7 @@ int hdfsFileIsEncrypted(hdfsFileInfo *hdfsFileInfo) {
     ((libhdfs_hdfsFileInfo *) hdfsFileInfo);
 }
 
-char*** hdfsGetHosts(hdfsFS fs, const char* path, 
+char*** hdfsGetHosts(hdfsFS fs, const char* path,
           tOffset start, tOffset length) {
   return libhdfs_hdfsGetHosts(fs->libhdfsRep, path, start, length);
 }
@@ -383,3 +383,25 @@ const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer) {
 void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer) {
   return libhdfs_hadoopRzBufferFree(file->libhdfsRep, buffer);
 }
+
+
+/*************
+ * hdfs_ext functions
+ */
+
+void hdfsGetLastError(char *buf, int len) {
+  return libhdfspp_hdfsGetLastError(buf, len);
+}
+
+int hdfsCancel(hdfsFS fs, hdfsFile file) {
+  return libhdfspp_hdfsCancel(fs->libhdfsppRep, file->libhdfsppRep);
+}
+
+
+int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations) {
+  return libhdfspp_hdfsGetBlockLocations(fs->libhdfsppRep, path, locations);
+}
+
+int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations) {
+  return libhdfspp_hdfsFreeBlockLocations(locations);
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
new file mode 100644
index 00000000000..3ec58e1ee7b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs.h
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfs/hdfs.h"
+#include "hdfspp/hdfspp.h"
+#include <native_mini_dfs.h>
+
+#include <google/protobuf/io/coded_stream.h>
+#include <gmock/gmock.h>
+
+#include <string>
+#include <atomic>
+
+#define TO_STR_HELPER(X) #X
+#define TO_STR(X) TO_STR_HELPER(X)
+
+#define TEST_BLOCK_SIZE 134217728
+
+namespace hdfs {
+
+
+static std::atomic<int> dirnum;
+static std::atomic<int> filenum;
+
+
+class FSHandle {
+public:
+  FSHandle() : fs(nullptr) {}
+  FSHandle(FileSystem * fs_in) : fs(fs_in) {}
+
+
+  FileSystem * handle() { return fs.get(); }
+  operator FileSystem *() { return fs.get(); }
+protected:
+  std::shared_ptr<FileSystem> fs;
+};
+
+
+/**
+ * For tests going through the C API to libhdfs++
+ */
+
+class HdfsHandle {
+public:
+  HdfsHandle() : fs(nullptr) {
+  }
+
+  HdfsHandle(hdfsFS fs_in) : fs(fs_in) {
+  }
+
+  ~HdfsHandle () {
+    if (fs) {
+      EXPECT_EQ(0, hdfsDisconnect(fs));
+    }
+  }
+
+  std::string newDir(const std::string & parent_dir = "/") {
+    int newDirNum = dirnum++;
+
+    std::string path = parent_dir;
+    if (path.back() != '/')
+      path += "/";
+    path += "dir" + std::to_string(newDirNum) + "/";
+
+    EXPECT_EQ(0, hdfsCreateDirectory(*this, path.c_str()));
+    return path;
+  }
+
+  std::string newFile(const std::string & dir = "/", size_t size = 1024) {
+    int newFileNum = filenum++;
+
+    std::string path = dir;
+    if (path.back() != '/')
+      path += "/";
+    path += "file" + std::to_string(newFileNum);
+
+    hdfsFile file = hdfsOpenFile(*this, path.c_str(), O_WRONLY, 0, 0, 0);
+    EXPECT_NE(nullptr, file);
+    void * buf = malloc(size);
+    bzero(buf, size);
+    EXPECT_EQ((tSize)size, hdfsWrite(*this, file, buf, size));
+    EXPECT_EQ(0, hdfsCloseFile(*this, file));
+    free(buf);
+
+    return path;
+  }
+
+  std::string newFile(size_t size) {
+    return newFile("/", size);
+  }
+
+  hdfsFS handle() { return fs; }
+  operator hdfsFS() { return fs; }
+private:
+  hdfsFS fs;
+};
+
+
+class MiniCluster {
+public:
+  MiniCluster() : io_service(IoService::New()) {
+    struct NativeMiniDfsConf conf = {
+      1, /* doFormat */
+      0, /* webhdfs */
+      -1, /* webhdfs port */
+      1  /* shortcircuit */
+    };
+    clusterInfo = nmdCreate(&conf);
+    EXPECT_NE(nullptr, clusterInfo);
+    EXPECT_EQ(0, nmdWaitClusterUp(clusterInfo));
+
+    //TODO: Write some files for tests to read/check
+  }
+
+  virtual ~MiniCluster() {
+    if (clusterInfo) {
+      EXPECT_EQ(0, nmdShutdown(clusterInfo));
+    }
+    nmdFree(clusterInfo);
+  }
+
+  // Connect via the C++ API
+  FSHandle connect(const std::string username) {
+    Options options;
+    FileSystem * fs = FileSystem::New(io_service, username, options);
+    EXPECT_NE(nullptr, fs);
+    FSHandle result(fs);
+
+    tPort port = (tPort)nmdGetNameNodePort(clusterInfo);
+    EXPECT_NE(0, port);
+    Status status = fs->Connect("localhost", std::to_string(port));
+    EXPECT_EQ(true, status.ok());
+    return result;
+  }
+
+  FSHandle connect() {
+    return connect("");
+  }
+
+  // Connect via the C API
+  HdfsHandle connect_c(const std::string & username) {
+    tPort port;
+    hdfsFS hdfs;
+    struct hdfsBuilder *bld;
+
+    port = (tPort)nmdGetNameNodePort(clusterInfo);
+    bld = hdfsNewBuilder();
+    EXPECT_NE(nullptr, bld);
+    hdfsBuilderSetForceNewInstance(bld);
+    hdfsBuilderSetNameNode(bld, "localhost");
+    hdfsBuilderSetNameNodePort(bld, port);
+    hdfsBuilderConfSetStr(bld, "dfs.block.size",
+                          TO_STR(TEST_BLOCK_SIZE));
+    hdfsBuilderConfSetStr(bld, "dfs.blocksize",
+                          TO_STR(TEST_BLOCK_SIZE));
+    if (!username.empty()) {
+      hdfsBuilderSetUserName(bld, username.c_str());
+    }
+    hdfs = hdfsBuilderConnect(bld);
+    EXPECT_NE(nullptr, hdfs);
+
+    return HdfsHandle(hdfs);
+  }
+
+  // Connect via the C API
+  HdfsHandle connect_c() {
+    return connect_c("");
+  }
+
+protected:
+  struct NativeMiniDfsCluster* clusterInfo;
+  IoService * io_service;
+};
+
+} // namespace
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs_smoke.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs_smoke.cc
new file mode 100644
index 00000000000..aaa29038c91
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfspp_mini_dfs_smoke.cc
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "hdfspp_mini_dfs.h"
+
+namespace hdfs {
+
+class HdfsMiniDfsSmokeTest: public ::testing::Test {
+public:
+  MiniCluster cluster;
+};
+
+// Make sure we can set up a mini-cluster and connect to it
+TEST_F(HdfsMiniDfsSmokeTest, SmokeTest) {
+  FSHandle handle = cluster.connect();
+  EXPECT_NE(nullptr, handle.handle());
+
+  HdfsHandle connection = cluster.connect_c();
+  EXPECT_NE(nullptr, connection.handle());
+}
+
+
+}
+
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  int exit_code = RUN_ALL_TESTS();
+  google::protobuf::ShutdownProtobufLibrary();
+
+  return exit_code;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
index 787a11c8f72..9da21718235 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfs_wrapper_undefs.h
@@ -90,3 +90,7 @@
 #undef kObjectKindDirectory
 #undef hdfsReadStatistics
 #undef hdfsFileInfo
+#undef hdfsGetLastError
+#undef hdfsCancel
+#undef hdfsGetBlockLocations
+#undef hdfsFreeBlockLocations
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper.h
index 802c5abc469..8cd78d2625f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper.h
@@ -22,6 +22,7 @@
 /* Rename libhdfspp structs and functions */
 #include "libhdfspp_wrapper_defines.h"
 #include "hdfs/hdfs.h"
+#include "hdfspp/hdfs_ext.h"
 #include "libhdfs_wrapper_undefs.h"
 
 /* "Original" symbols can be included elsewhere. */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
index 3ca0e69cd7f..0d50fdaf1c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/libhdfspp_wrapper_defines.h
@@ -90,3 +90,7 @@
 #define kObjectKindDirectory libhdfspp_kObjectKindDirectory
 #define hdfsReadStatistics libhdfspp_hdfsReadStatistics
 #define hdfsFileInfo libhdfspp_hdfsFileInfo
+#define hdfsGetLastError libhdfspp_hdfsGetLastError
+#define hdfsCancel libhdfspp_hdfsCancel
+#define hdfsGetBlockLocations libhdfspp_hdfsGetBlockLocations
+#define hdfsFreeBlockLocations libhdfspp_hdfsFreeBlockLocations