HDFS-10465: libhdfs++: Implement GetBlockLocations. Contributed by Bob Hansen

Bob Hansen 2016-06-01 13:25:22 -04:00 committed by James Clampffer
parent f1f0b8f0f8
commit 18f4d2f42e
15 changed files with 832 additions and 15 deletions

View File

@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HDFSPP_BLOCK_LOCATION_H
#define HDFSPP_BLOCK_LOCATION_H
#include <string>
#include <vector>
namespace hdfs {
class DNInfo {
public:
DNInfo() : xfer_port(-1), info_port(-1), IPC_port(-1), info_secure_port(-1) {}
std::string getHostname() const {
return hostname;
}
void setHostname(const std::string & hostname) {
this->hostname = hostname;
}
std::string getIPAddr() const {
return ip_addr;
}
void setIPAddr(const std::string & ip_addr) {
this->ip_addr = ip_addr;
}
int getXferPort() const {
return xfer_port;
}
void setXferPort(int xfer_port) {
this->xfer_port = xfer_port;
}
int getInfoPort() const {
return info_port;
}
void setInfoPort(int info_port) {
this->info_port = info_port;
}
int getIPCPort() const {
return IPC_port;
}
void setIPCPort(int IPC_port) {
this->IPC_port = IPC_port;
}
int getInfoSecurePort() const {
return info_secure_port;
}
void setInfoSecurePort(int info_secure_port) {
this->info_secure_port = info_secure_port;
}
private:
std::string hostname;
std::string ip_addr;
int xfer_port;
int info_port;
int IPC_port;
int info_secure_port;
};
class BlockLocation {
public:
bool isCorrupt() const {
return corrupt;
}
void setCorrupt(bool corrupt) {
this->corrupt = corrupt;
}
int64_t getLength() const {
return length;
}
void setLength(int64_t length) {
this->length = length;
}
int64_t getOffset() const {
return offset;
}
void setOffset(int64_t offset) {
this->offset = offset;
}
const std::vector<DNInfo> & getDataNodes() const {
return dn_info;
}
void setDataNodes(const std::vector<DNInfo> & dn_info) {
this->dn_info = dn_info;
}
private:
bool corrupt;
int64_t length;
int64_t offset; // Offset of the block in the file
std::vector<DNInfo> dn_info; // Info about who stores each block
};
class FileBlockLocation {
public:
uint64_t getFileLength() const {
return fileLength;
}
void setFileLength(uint64_t fileLength) {
this->fileLength = fileLength;
}
bool isLastBlockComplete() const {
return this->lastBlockComplete;
}
void setLastBlockComplete(bool lastBlockComplete) {
this->lastBlockComplete = lastBlockComplete;
}
bool isUnderConstruction() const {
return underConstruction;
}
void setUnderConstruction(bool underConstruction) {
this->underConstruction = underConstruction;
}
const std::vector<BlockLocation> & getBlockLocations() const {
return blockLocations;
}
void setBlockLocations(const std::vector<BlockLocation> & blockLocations) {
this->blockLocations = blockLocations;
}
private:
uint64_t fileLength;
bool lastBlockComplete;
bool underConstruction;
std::vector<BlockLocation> blockLocations;
};
} // namespace hdfs
#endif /* HDFSPP_BLOCK_LOCATION_H */

View File

@@ -115,6 +115,63 @@ LIBHDFS_EXTERNAL
int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val);
/**
* Returns the block information and data nodes associated with a particular file.
*
* The hdfsBlockLocations structure will have zero or more hdfsBlockInfo elements,
* each of which will have zero or more hdfsDNInfo elements indicating which
* datanodes hold each block.
*
* @param fs A connected hdfs instance
* @param path Path of the file to query
* @param locations The address of an output pointer to contain the block information.
* On success, this pointer must be later freed with hdfsFreeBlockLocations.
*
* @return 0 on success; nonzero error code otherwise.
* If the file does not exist, an error will be returned.
*/
struct hdfsDNInfo {
const char * ip_address;
const char * hostname;
int xfer_port;
int info_port;
int IPC_port;
int info_secure_port;
};
struct hdfsBlockInfo {
uint64_t start_offset;
uint64_t num_bytes;
size_t num_locations;
struct hdfsDNInfo * locations;
};
struct hdfsBlockLocations
{
uint64_t fileLength;
int isLastBlockComplete;
int isUnderConstruction;
size_t num_blocks;
struct hdfsBlockInfo * blocks;
};
LIBHDFS_EXTERNAL
int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations);
/**
* Frees up an hdfsBlockLocations pointer allocated by hdfsGetBlockLocations.
*
* @param locations The previously-populated pointer allocated by hdfsGetBlockLocations
* @return 0 on success, nonzero on error
*/
LIBHDFS_EXTERNAL
int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations);
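/*
 * A minimal usage sketch (illustrative only; assumes `fs` is a connected
 * hdfsFS handle and `path` names an existing file):
 *
 *   struct hdfsBlockLocations *locations = NULL;
 *   if (hdfsGetBlockLocations(fs, path, &locations) == 0) {
 *     for (size_t i = 0; i < locations->num_blocks; i++) {
 *       struct hdfsBlockInfo *block = &locations->blocks[i];
 *       for (size_t j = 0; j < block->num_locations; j++) {
 *         struct hdfsDNInfo *dn = &block->locations[j];
 *         printf("block %zu replica on %s (%s:%d)\n",
 *                i, dn->hostname, dn->ip_address, dn->xfer_port);
 *       }
 *     }
 *     hdfsFreeBlockLocations(locations);  // caller owns the result
 *   }
 */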
/**
* Client can supply a C style function pointer to be invoked any time something
* is logged. Unlike the C++ logger this will not filter by level or component,

View File

@@ -21,6 +21,7 @@
#include "hdfspp/options.h"
#include "hdfspp/status.h"
#include "hdfspp/events.h"
#include "hdfspp/block_location.h"
#include <functional>
#include <memory>
@@ -168,6 +169,16 @@ class FileSystem {
const std::function<void(const Status &, FileHandle *)> &handler) = 0;
virtual Status Open(const std::string &path, FileHandle **handle) = 0;
/**
* Returns the locations of all known blocks for the indicated file, or an error
* if the information could not be found.
*/
virtual void GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) = 0;
virtual Status GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * locations) = 0;
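/*
 * A minimal sketch of the blocking overload (illustrative only; assumes
 * `fs` points at a connected FileSystem):
 *
 *   std::shared_ptr<FileBlockLocation> locations;
 *   Status status = fs->GetBlockLocations("/some/file", &locations);
 *   if (status.ok()) {
 *     for (const BlockLocation & block : locations->getBlockLocations()) {
 *       std::cout << "offset " << block.getOffset() << " length "
 *                 << block.getLength() << "\n";
 *       for (const DNInfo & dn : block.getDataNodes())
 *         std::cout << "  " << dn.getHostname() << "\n";
 *     }
 *   }
 */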
/**
* Note that it is an error to destroy the filesystem from within a filesystem
* callback. It will lead to a deadlock and the termination of the process.

View File

@@ -167,12 +167,20 @@ static int ReportCaughtNonException()
return Error(Status::Exception("Uncaught value not derived from std::exception", ""));
}
/* return false on failure */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
bool CheckSystem(hdfsFS fs) {
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return false;
}
return true;
}
/* return false on failure */
bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) {
if (!CheckSystem(fs))
return false;
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return false;
@@ -410,6 +418,92 @@ int hdfsCancel(hdfsFS fs, hdfsFile file) {
}
int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations_out)
{
try
{
if (!CheckSystem(fs)) {
return -1;
}
if (locations_out == nullptr) {
ReportError(EINVAL, "Null pointer passed to hdfsGetBlockLocations");
return -2;
}
std::shared_ptr<FileBlockLocation> ppLocations;
Status stat = fs->get_impl()->GetBlockLocations(path, &ppLocations);
if (!stat.ok()) {
return Error(stat);
}
hdfsBlockLocations *locations = new struct hdfsBlockLocations();
(*locations_out) = locations;
bzero(locations, sizeof(*locations));
locations->fileLength = ppLocations->getFileLength();
locations->isLastBlockComplete = ppLocations->isLastBlockComplete();
locations->isUnderConstruction = ppLocations->isUnderConstruction();
const std::vector<BlockLocation> & ppBlockLocations = ppLocations->getBlockLocations();
locations->num_blocks = ppBlockLocations.size();
locations->blocks = new struct hdfsBlockInfo[locations->num_blocks];
for (size_t i=0; i < ppBlockLocations.size(); i++) {
auto ppBlockLocation = ppBlockLocations[i];
auto block = &locations->blocks[i];
block->num_bytes = ppBlockLocation.getLength();
block->start_offset = ppBlockLocation.getOffset();
const std::vector<DNInfo> & ppDNInfos = ppBlockLocation.getDataNodes();
block->num_locations = ppDNInfos.size();
block->locations = new hdfsDNInfo[block->num_locations];
for (size_t j=0; j < block->num_locations; j++) {
auto ppDNInfo = ppDNInfos[j];
auto dn_info = &block->locations[j];
dn_info->xfer_port = ppDNInfo.getXferPort();
dn_info->info_port = ppDNInfo.getInfoPort();
dn_info->IPC_port = ppDNInfo.getIPCPort();
dn_info->info_secure_port = ppDNInfo.getInfoSecurePort();
char * buf;
// Copy size + 1 chars so strncpy includes the terminating null.
buf = new char[ppDNInfo.getHostname().size() + 1];
strncpy(buf, ppDNInfo.getHostname().c_str(), ppDNInfo.getHostname().size() + 1);
dn_info->hostname = buf;
buf = new char[ppDNInfo.getIPAddr().size() + 1];
strncpy(buf, ppDNInfo.getIPAddr().c_str(), ppDNInfo.getIPAddr().size() + 1);
dn_info->ip_address = buf;
}
}
return 0;
} catch (const std::exception & e) {
return ReportException(e);
} catch (...) {
return ReportCaughtNonException();
}
}
int hdfsFreeBlockLocations(struct hdfsBlockLocations * blockLocations) {
if (blockLocations == nullptr)
return 0;
for (size_t i=0; i < blockLocations->num_blocks; i++) {
auto block = &blockLocations->blocks[i];
for (size_t j=0; j < block->num_locations; j++) {
auto location = &block->locations[j];
delete[] location->hostname;
delete[] location->ip_address;
}
}
delete[] blockLocations->blocks;
delete blockLocations;
return 0;
}
/*******************************************************************
* EVENT CALLBACKS
*******************************************************************/

View File

@@ -97,12 +97,15 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
auto locations = s.resp->locations();
file_info->file_length_ = locations.filelength();
file_info->last_block_complete_ = locations.islastblockcomplete();
file_info->under_construction_ = locations.underconstruction();
for (const auto &block : locations.blocks()) {
file_info->blocks_.push_back(block);
}
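// An incomplete last block is not included in locations.blocks() and its
// bytes are not yet reflected in filelength(), so append it and extend the
// reported length here.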
if (locations.has_lastblock() && locations.lastblock().b().numbytes()) {
if (!locations.islastblockcomplete() &&
locations.has_lastblock() && locations.lastblock().b().numbytes()) {
file_info->blocks_.push_back(locations.lastblock());
file_info->file_length_ += locations.lastblock().b().numbytes();
}
@@ -336,6 +339,102 @@ Status FileSystemImpl::Open(const std::string &path,
return stat;
}
BlockLocation LocatedBlockToBlockLocation(const hadoop::hdfs::LocatedBlockProto & locatedBlock)
{
BlockLocation result;
result.setCorrupt(locatedBlock.corrupt());
result.setOffset(locatedBlock.offset());
std::vector<DNInfo> dn_info;
dn_info.reserve(locatedBlock.locs_size());
for (const hadoop::hdfs::DatanodeInfoProto & datanode_info: locatedBlock.locs()) {
const hadoop::hdfs::DatanodeIDProto &id = datanode_info.id();
DNInfo newInfo;
if (id.has_ipaddr())
newInfo.setIPAddr(id.ipaddr());
if (id.has_hostname())
newInfo.setHostname(id.hostname());
if (id.has_xferport())
newInfo.setXferPort(id.xferport());
if (id.has_infoport())
newInfo.setInfoPort(id.infoport());
if (id.has_ipcport())
newInfo.setIPCPort(id.ipcport());
if (id.has_infosecureport())
newInfo.setInfoSecurePort(id.infosecureport());
dn_info.push_back(newInfo);
}
result.setDataNodes(dn_info);
if (locatedBlock.has_b()) {
const hadoop::hdfs::ExtendedBlockProto & b=locatedBlock.b();
result.setLength(b.numbytes());
}
return result;
}
void FileSystemImpl::GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> handler)
{
auto conversion = [handler](const Status & status, std::shared_ptr<const struct FileInfo> fileInfo) {
if (status.ok()) {
auto result = std::make_shared<FileBlockLocation>();
result->setFileLength(fileInfo->file_length_);
result->setLastBlockComplete(fileInfo->last_block_complete_);
result->setUnderConstruction(fileInfo->under_construction_);
std::vector<BlockLocation> blocks;
for (const hadoop::hdfs::LocatedBlockProto & locatedBlock: fileInfo->blocks_) {
auto newLocation = LocatedBlockToBlockLocation(locatedBlock);
blocks.push_back(newLocation);
}
result->setBlockLocations(blocks);
handler(status, result);
} else {
handler(status, std::shared_ptr<FileBlockLocation>());
}
};
nn_.GetBlockLocations(path, conversion);
}
Status FileSystemImpl::GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * fileBlockLocations)
{
if (!fileBlockLocations)
return Status::InvalidArgument("Null pointer passed to GetBlockLocations");
auto callstate = std::make_shared<std::promise<std::tuple<Status, std::shared_ptr<FileBlockLocation>>>>();
std::future<std::tuple<Status, std::shared_ptr<FileBlockLocation>>> future(callstate->get_future());
/* wrap async call with promise/future to make it blocking */
auto callback = [callstate](const Status &s, std::shared_ptr<FileBlockLocation> blockInfo) {
callstate->set_value(std::make_tuple(s,blockInfo));
};
GetBlockLocations(path, callback);
/* wait for async to finish */
auto returnstate = future.get();
auto stat = std::get<0>(returnstate);
if (!stat.ok()) {
return stat;
}
*fileBlockLocations = std::get<1>(returnstate);
return stat;
}
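// The promise/future wrapping above is a general recipe for turning a
// callback-style call into a blocking one. A condensed, generic sketch
// (illustrative only; `Result` and `async_op` are hypothetical):
//
//   template <typename Result>
//   Result BlockOn(std::function<void(std::function<void(Result)>)> async_op) {
//     auto state = std::make_shared<std::promise<Result>>();
//     std::future<Result> future = state->get_future();
//     // The callback may fire on another thread; the shared_ptr keeps the
//     // promise alive until it does.
//     async_op([state](Result r) { state->set_value(std::move(r)); });
//     return future.get();  // blocks until the callback delivers a value
//   }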
void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
// It is far too easy to destroy the filesystem (and thus the threadpool)
// from within one of the worker threads, leading to a deadlock. Let's

View File

@@ -106,6 +106,12 @@ public:
Status Open(const std::string &path, FileHandle **handle) override;
virtual void GetBlockLocations(const std::string & path,
const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override;
virtual Status GetBlockLocations(const std::string & path,
std::shared_ptr<FileBlockLocation> * locations) override;
void SetFsEventCallback(fs_event_callback callback) override;
/* add a new thread to handle asio requests, return number of threads in pool

View File

@@ -28,6 +28,8 @@ namespace hdfs {
*/
struct FileInfo {
unsigned long long file_length_;
bool under_construction_;
bool last_block_complete_;
std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_;
};

View File

@@ -24,7 +24,7 @@ set (LIBHDFSPP_LIB_DIR ${LIBHDFSPP_SRC_DIR}/lib)
set (LIBHDFSPP_BINDING_C ${LIBHDFSPP_LIB_DIR}/bindings/c)
include_directories(
${GENERATED_JAVAH}
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_LIST_DIR}
${CMAKE_BINARY_DIR}
${JNI_INCLUDE_DIRS}
${LIBHDFS_SRC_DIR}/include
@@ -54,6 +54,11 @@ function(add_memcheck_test name binary)
endif()
endfunction(add_memcheck_test)
#
#
# UNIT TESTS - TEST SELECTED PARTS OF THE LIBRARY
#
#
add_executable(uri_test uri_test.cc)
target_link_libraries(uri_test common gmock_main ${CMAKE_THREAD_LIBS_INIT})
@@ -96,16 +101,6 @@ add_executable(hdfspp_errors_test hdfspp_errors.cc)
target_link_libraries(hdfspp_errors_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfspp_errors hdfspp_errors_test)
#This test requires a great deal of Hadoop Java infrastructure to run.
if(HADOOP_BUILD)
add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY})
add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static)
endif(HADOOP_BUILD)
add_executable(hdfs_builder_test hdfs_builder_test.cc)
target_link_libraries(hdfs_builder_test test_common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(hdfs_builder_test hdfs_builder_test)
@@ -113,3 +108,38 @@ add_memcheck_test(hdfs_builder_test hdfs_builder_test)
add_executable(logging_test logging_test.cc)
target_link_libraries(logging_test common gmock_main bindings_c fs rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} ${SASL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT})
add_memcheck_test(logging_test logging_test)
#
#
# INTEGRATION TESTS - TESTS THE FULL LIBRARY AGAINST ACTUAL SERVERS
#
#
# This test requires a great deal of Hadoop Java infrastructure to run.
#
if(HADOOP_BUILD)
include_directories (
#TODO: Put this in a variable up top and pull it out here
${CMAKE_CURRENT_SOURCE_DIR}/../../libhdfs-tests/
)
add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc)
# TODO: get all of the mini dfs library bits here in one place
# add_library(hdfspp_mini_cluster native_mini_dfs ${JAVA_JVM_LIBRARY} )
#TODO: Link against full library rather than just parts
build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c)
link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES} )
add_libhdfs_test (libhdfs_threaded hdfspp_test_shim_static)
build_libhdfs_test(hdfspp_mini_dfs_smoke hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfspp_mini_dfs_smoke.cc)
link_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static fs reader rpc proto common connection gmock_main ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
add_libhdfs_test (hdfspp_mini_dfs_smoke hdfspp_test_shim_static)
build_libhdfs_test(hdfs_ext hdfspp_test_shim_static ${CMAKE_CURRENT_LIST_DIR}/hdfs_ext_test.cc)
link_libhdfs_test (hdfs_ext hdfspp_test_shim_static hdfspp_static gmock_main native_mini_dfs ${JAVA_JVM_LIBRARY} ${SASL_LIBRARIES})
add_libhdfs_test (hdfs_ext hdfspp_test_shim_static)
endif(HADOOP_BUILD)

View File

@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#include "expect.h"
#include "hdfspp_mini_dfs.h"
#include "hdfspp/hdfs_ext.h"
namespace hdfs {
class HdfsExtTest: public ::testing::Test {
public:
MiniCluster cluster;
};
// Make sure we can set up a mini-cluster, connect to it, and fetch block locations
TEST_F(HdfsExtTest, TestGetBlockLocations) {
HdfsHandle connection = cluster.connect_c();
EXPECT_NE(nullptr, connection.handle());
hdfsBlockLocations * blocks = nullptr;
// Free a null pointer
int result = hdfsFreeBlockLocations(blocks);
EXPECT_EQ(0, result);
// Test a nonexistent file
result = hdfsGetBlockLocations(connection, "non_extant_file", &blocks);
EXPECT_NE(0, result); // Should be an error
// Test an existing file
std::string filename = connection.newFile(1024);
result = hdfsGetBlockLocations(connection, filename.c_str(), &blocks);
EXPECT_EQ(0, result);
EXPECT_EQ(1024, blocks->fileLength);
EXPECT_EQ(1, blocks->num_blocks);
EXPECT_EQ(0, blocks->isUnderConstruction);
EXPECT_NE(0, blocks->isLastBlockComplete);
EXPECT_EQ(1024, blocks->blocks->num_bytes);
EXPECT_EQ(0, blocks->blocks->start_offset);
EXPECT_EQ(1, blocks->blocks->num_locations);
EXPECT_NE(nullptr, blocks->blocks->locations->hostname);
EXPECT_NE(nullptr, blocks->blocks->locations->ip_address);
EXPECT_NE(0, blocks->blocks->locations->xfer_port);
result = hdfsFreeBlockLocations(blocks);
EXPECT_EQ(0, result);
}
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.
::testing::InitGoogleMock(&argc, argv);
int exit_code = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
return exit_code;
}

View File

@@ -383,3 +383,25 @@ const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer) {
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer) {
return libhdfs_hadoopRzBufferFree(file->libhdfsRep, buffer);
}
/*************
* hdfs_ext functions
*/
void hdfsGetLastError(char *buf, int len) {
return libhdfspp_hdfsGetLastError(buf, len);
}
int hdfsCancel(hdfsFS fs, hdfsFile file) {
return libhdfspp_hdfsCancel(fs->libhdfsppRep, file->libhdfsppRep);
}
int hdfsGetBlockLocations(hdfsFS fs, const char *path, struct hdfsBlockLocations ** locations) {
return libhdfspp_hdfsGetBlockLocations(fs->libhdfsppRep, path, locations);
}
int hdfsFreeBlockLocations(struct hdfsBlockLocations * locations) {
return libhdfspp_hdfsFreeBlockLocations(locations);
}

View File

@@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfs/hdfs.h"
#include "hdfspp/hdfspp.h"
#include <native_mini_dfs.h>
#include <google/protobuf/io/coded_stream.h>
#include <gmock/gmock.h>
#include <string>
#include <atomic>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
#define TEST_BLOCK_SIZE 134217728
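/* TO_STR uses two levels of macros so the argument is expanded before being
 * stringified: TO_STR(TEST_BLOCK_SIZE) yields "134217728", while a direct
 * #X would yield "TEST_BLOCK_SIZE". */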
namespace hdfs {
static std::atomic<int> dirnum;
static std::atomic<int> filenum;
class FSHandle {
public:
FSHandle() : fs(nullptr) {}
FSHandle(FileSystem * fs_in) : fs(fs_in) {}
FileSystem * handle() { return fs.get(); }
operator FileSystem *() { return fs.get(); }
protected:
std::shared_ptr<FileSystem> fs;
};
/**
* For tests going through the C API to libhdfs++
*/
class HdfsHandle {
public:
HdfsHandle() : fs(nullptr) {
}
HdfsHandle(hdfsFS fs_in) : fs(fs_in) {
}
~HdfsHandle () {
if (fs) {
EXPECT_EQ(0, hdfsDisconnect(fs));
}
}
std::string newDir(const std::string & parent_dir = "/") {
int newDirNum = dirnum++;
std::string path = parent_dir;
if (path.back() != '/')
path += "/";
path += "dir" + std::to_string(newDirNum) + "/";
EXPECT_EQ(0, hdfsCreateDirectory(*this, path.c_str()));
return path;
}
std::string newFile(const std::string & dir = "/", size_t size = 1024) {
int newFileNum = filenum++;
std::string path = dir;
if (path.back() != '/')
path += "/";
path += "file" + std::to_string(newFileNum);
hdfsFile file = hdfsOpenFile(*this, path.c_str(), O_WRONLY, 0, 0, 0);
EXPECT_NE(nullptr, file);
void * buf = malloc(size);
bzero(buf, size);
EXPECT_EQ((tSize)size, hdfsWrite(*this, file, buf, size));
EXPECT_EQ(0, hdfsCloseFile(*this, file));
free(buf);
return path;
}
std::string newFile(size_t size) {
return newFile("/", size);
}
hdfsFS handle() { return fs; }
operator hdfsFS() { return fs; }
private:
hdfsFS fs;
};
class MiniCluster {
public:
MiniCluster() : io_service(IoService::New()) {
struct NativeMiniDfsConf conf = {
1, /* doFormat */
0, /* webhdfs */
-1, /* webhdfs port */
1 /* shortcircuit */
};
clusterInfo = nmdCreate(&conf);
EXPECT_NE(nullptr, clusterInfo);
EXPECT_EQ(0, nmdWaitClusterUp(clusterInfo));
//TODO: Write some files for tests to read/check
}
virtual ~MiniCluster() {
if (clusterInfo) {
EXPECT_EQ(0, nmdShutdown(clusterInfo));
}
nmdFree(clusterInfo);
}
// Connect via the C++ API
FSHandle connect(const std::string username) {
Options options;
FileSystem * fs = FileSystem::New(io_service, username, options);
EXPECT_NE(nullptr, fs);
FSHandle result(fs);
tPort port = (tPort)nmdGetNameNodePort(clusterInfo);
EXPECT_NE(0, port);
Status status = fs->Connect("localhost", std::to_string(port));
EXPECT_EQ(true, status.ok());
return result;
}
FSHandle connect() {
return connect("");
}
// Connect via the C API
HdfsHandle connect_c(const std::string & username) {
tPort port;
hdfsFS hdfs;
struct hdfsBuilder *bld;
port = (tPort)nmdGetNameNodePort(clusterInfo);
bld = hdfsNewBuilder();
EXPECT_NE(nullptr, bld);
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderSetNameNode(bld, "localhost");
hdfsBuilderSetNameNodePort(bld, port);
hdfsBuilderConfSetStr(bld, "dfs.block.size",
TO_STR(TEST_BLOCK_SIZE));
hdfsBuilderConfSetStr(bld, "dfs.blocksize",
TO_STR(TEST_BLOCK_SIZE));
if (!username.empty()) {
hdfsBuilderSetUserName(bld, username.c_str());
}
hdfs = hdfsBuilderConnect(bld);
EXPECT_NE(nullptr, hdfs);
return HdfsHandle(hdfs);
}
// Connect via the C API
HdfsHandle connect_c() {
return connect_c("");
}
protected:
struct NativeMiniDfsCluster* clusterInfo;
IoService * io_service;
};
} // namespace

View File

@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfspp_mini_dfs.h"
namespace hdfs {
class HdfsMiniDfsSmokeTest: public ::testing::Test {
public:
MiniCluster cluster;
};
// Make sure we can set up a mini-cluster and connect to it
TEST_F(HdfsMiniDfsSmokeTest, SmokeTest) {
FSHandle handle = cluster.connect();
EXPECT_NE(nullptr, handle.handle());
HdfsHandle connection = cluster.connect_c();
EXPECT_NE(nullptr, connection.handle());
}
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.
::testing::InitGoogleMock(&argc, argv);
int exit_code = RUN_ALL_TESTS();
google::protobuf::ShutdownProtobufLibrary();
return exit_code;
}

View File

@@ -90,3 +90,7 @@
#undef kObjectKindDirectory
#undef hdfsReadStatistics
#undef hdfsFileInfo
#undef hdfsGetLastError
#undef hdfsCancel
#undef hdfsGetBlockLocations
#undef hdfsFreeBlockLocations

View File

@@ -22,6 +22,7 @@
/* Rename libhdfspp structs and functions */
#include "libhdfspp_wrapper_defines.h"
#include "hdfs/hdfs.h"
#include "hdfspp/hdfs_ext.h"
#include "libhdfs_wrapper_undefs.h"
/* "Original" symbols can be included elsewhere. */

View File

@@ -90,3 +90,7 @@
#define kObjectKindDirectory libhdfspp_kObjectKindDirectory
#define hdfsReadStatistics libhdfspp_hdfsReadStatistics
#define hdfsFileInfo libhdfspp_hdfsFileInfo
#define hdfsGetLastError libhdfspp_hdfsGetLastError
#define hdfsCancel libhdfspp_hdfsCancel
#define hdfsGetBlockLocations libhdfspp_hdfsGetBlockLocations
#define hdfsFreeBlockLocations libhdfspp_hdfsFreeBlockLocations