HDFS-10672: libhdfs++: reorder directories in src/main/libhdfspp/examples, and add C++ version of cat tool. Contributed by Anatoli Shein.

This commit is contained in:
James 2016-08-04 18:09:49 -04:00 committed by James Clampffer
parent 4cb0dad5e5
commit 649aff11fe
17 changed files with 347 additions and 115 deletions

View File

@ -16,4 +16,5 @@
# limitations under the License.
#
add_subdirectory(cat)
add_subdirectory(c)
add_subdirectory(cpp)

View File

@ -0,0 +1,35 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Default LIBHDFSPP_DIR to the default install location. You can override
# it by adding -DLIBHDFSPP_DIR=... to your cmake invocation
# NOTE: set(<var> <value> CACHE <type> <docstring>) requires the value BEFORE
# the CACHE keyword; with the value after STRING it would be parsed as the
# docstring and the cache variable would default to empty.
set(LIBHDFSPP_DIR ${CMAKE_INSTALL_PREFIX} CACHE STRING "Path to libhdfs++ install root")

include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )

add_executable(cat_c cat.c)
target_link_libraries(cat_c hdfspp uriparser2)

# Several examples in different languages need to produce executables with
# same names. To allow executables with same names we keep their CMake
# names different, but specify their executable names as follows:
set_target_properties( cat_c
PROPERTIES
OUTPUT_NAME "cat"
)

View File

@ -22,68 +22,14 @@
Doesn't deal with any flags for now, will just attempt to read the whole file.
*/
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdlib.h>
#include <string.h>

#include "hdfspp/hdfs_ext.h"
#include "uriparser2/uriparser2.h"
#include "common/util_c.h"
#define SCHEME "hdfs"
#define MAX_STRING 1024
// Parsed pieces of an "hdfs://host:port/path" string, filled in by parse_uri().
struct Uri {
int valid;              // 1 if parsing succeeded, 0 otherwise
char host[MAX_STRING];  // namenode host; empty string when no authority given
int port;               // namenode port; -1 when no port given
char path[MAX_STRING];  // path component (the whole input for bare paths)
};
/* Return the smaller of the two given ints. */
int min(int x, int y) {
  if (x < y) {
    return x;
  }
  return y;
}
/*
 * Parse uri_string into *uri.
 *
 * Accepts either "hdfs://[host[:port]]/path" or a bare absolute path.
 * On success uri->valid is 1 and host/port/path are filled in (host empty
 * and port -1 when absent); on failure uri->valid stays 0.
 *
 * Fixes over the previous version: strncpy() does not null-terminate when
 * the source is at least as long as the destination, which left
 * authority/host/path unterminated for long inputs and made the later
 * strstr() calls read garbage. All copies now terminate explicitly.
 */
void parse_uri(const char * uri_string, struct Uri * uri) {
  uri->valid = 0;
  uri->host[0] = 0;
  uri->port = -1;
  uri->path[0] = 0;

  // must start with hdfs scheme if a scheme is present at all
  const char * remaining;
  const char * scheme_end = strstr(uri_string, "://");
  if (scheme_end != NULL) {
    if (strncmp(uri_string, SCHEME, strlen(SCHEME)) != 0)
      return;
    remaining = scheme_end + 3;

    // parse authority (host[:port]), which runs up to the first '/'
    const char * authority_end = strstr(remaining, "/");
    if (authority_end != NULL) {
      char authority[MAX_STRING];
      size_t authority_len = (size_t)(authority_end - remaining);
      if (authority_len >= sizeof(authority))
        authority_len = sizeof(authority) - 1;   // truncate, but stay terminated
      memcpy(authority, remaining, authority_len);
      authority[authority_len] = 0;
      remaining = authority_end;

      char * host_port_separator = strstr(authority, ":");
      if (host_port_separator != NULL) {
        errno = 0;
        uri->port = strtol(host_port_separator + 1, NULL, 10);
        if (errno != 0)
          return;
        // Terminate authority at the new end of the host
        *host_port_separator = 0;
      }
      strncpy(uri->host, authority, sizeof(uri->host) - 1);
      uri->host[sizeof(uri->host) - 1] = 0;
    }
    strncpy(uri->path, remaining, sizeof(uri->path) - 1);
    uri->path[sizeof(uri->path) - 1] = 0;
  } else {
    // Absolute path with no scheme/authority
    strncpy(uri->path, uri_string, sizeof(uri->path) - 1);
    uri->path[sizeof(uri->path) - 1] = 0;
  }
  uri->valid = 1;
}
int main(int argc, char** argv) {
@ -93,36 +39,46 @@ int main(int argc, char** argv) {
return 1;
}
URI * uri = NULL;
const char * uri_path = argv[1];
struct Uri uri;
parse_uri(uri_path, &uri);
if (!uri.valid) {
fprintf(stderr, "malformed URI: %s\n", uri_path);
//Separate check for scheme is required, otherwise uriparser2.h library causes memory issues under valgrind
const char * scheme_end = strstr(uri_path, "://");
if (scheme_end) {
if (strncmp(uri_path, SCHEME, strlen(SCHEME)) != 0) {
fprintf(stderr, "Scheme %.*s:// is not supported.\n", (int) (scheme_end - uri_path), uri_path);
return 1;
} else {
uri = uri_parse(uri_path);
}
}
if (!uri) {
fprintf(stderr, "Malformed URI: %s\n", uri_path);
return 1;
}
struct hdfsBuilder* builder = hdfsNewBuilder();
if (*uri.host != 0)
hdfsBuilderSetNameNode(builder, uri.host);
if (uri.port != -1)
hdfsBuilderSetNameNodePort(builder, uri.port);
if (uri->host)
hdfsBuilderSetNameNode(builder, uri->host);
if (uri->port != 0)
hdfsBuilderSetNameNodePort(builder, uri->port);
hdfsFS fs = hdfsBuilderConnect(builder);
if (fs == NULL) {
hdfsGetLastError(error_text, sizeof(error_text));
const char * host = uri.host[0] ? uri.host : "<default>";
int port = uri.port;
if (-1 == port)
const char * host = uri->host ? uri->host : "<default>";
int port = uri->port;
if (port == 0)
port = 8020;
fprintf(stderr, "Unable to connect to %s:%d, hdfsConnect returned null.\n%s\n",
host, port, error_text);
return 1;
}
hdfsFile file = hdfsOpenFile(fs, uri.path, 0, 0, 0, 0);
hdfsFile file = hdfsOpenFile(fs, uri->path, 0, 0, 0, 0);
if (NULL == file) {
hdfsGetLastError(error_text, sizeof(error_text));
fprintf(stderr, "Unable to open file %s: %s\n", uri.path, error_text );
fprintf(stderr, "Unable to open file %s: %s\n", uri->path, error_text );
hdfsDisconnect(fs);
hdfsFreeBuilder(builder);
return 1;
@ -158,5 +114,8 @@ int main(int argc, char** argv) {
}
hdfsFreeBuilder(builder);
free(uri);
// Clean up static data and prevent valgrind memory leaks
ShutdownProtobufLibrary_C();
return 0;
}

View File

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
add_subdirectory(cat)

View File

@ -23,5 +23,13 @@ set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX})
include_directories( ${LIBHDFSPP_DIR}/include )
link_directories( ${LIBHDFSPP_DIR}/lib )
add_executable(cat cat.c)
target_link_libraries(cat hdfspp)
add_executable(cat_cpp cat.cpp)
target_link_libraries(cat_cpp hdfspp)
# Several examples in different languages need to produce executables with
# same names. To allow executables with same names we keep their CMake
# names different, but specify their executable names as follows:
set_target_properties( cat_cpp
PROPERTIES
OUTPUT_NAME "cat"
)

View File

@ -0,0 +1,123 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
/*
A stripped-down version of unix's "cat".
Doesn't deal with any flags for now, will just attempt to read the whole file.
*/
#include "hdfspp/hdfspp.h"
#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"
#include "common/uri.h"
#include <google/protobuf/io/coded_stream.h>
using namespace std;
using namespace hdfs;
#define SCHEME "hdfs"
// C++ version of the "cat" example: parses an hdfs:// URI, connects to the
// namenode, and streams the whole file to stdout in 4 KB chunks.
int main(int argc, char *argv[]) {
if (argc != 2) {
cerr << "usage: cat [hdfs://[<hostname>:<port>]]/<path-to-file>" << endl;
return 1;
}
optional<URI> uri;
const string uri_path = argv[1];
//Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind
size_t scheme_end = uri_path.find("://");
if (scheme_end != string::npos) {
if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) {
cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl;
return 1;
} else {
uri = URI::parse_from_string(uri_path);
}
}
// NOTE(review): a bare path with no "://" never sets uri, so scheme-less
// inputs are rejected here even though the usage string marks the scheme
// optional — confirm whether that is intended.
if (!uri) {
cerr << "Malformed URI: " << uri_path << endl;
return 1;
}
// Load default HDFS configuration, then overlay core-site.xml from
// $HADOOP_CONF_DIR when that variable is set and non-empty.
ConfigurationLoader loader;
optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR");
if (envHadoopConfDir && (*envHadoopConfDir != 0) ) {
// NOTE(review): *config is dereferenced without checking the optional —
// verify LoadDefaultResources cannot return an empty value here.
config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml");
}
Options options;
if(config){
options = config->GetOptions();
}
// NOTE(review): io_service is a raw pointer that is never deleted on any
// path below — confirm whether FileSystem takes ownership.
IoService * io_service = IoService::New();
FileSystem *fs_raw = FileSystem::New(io_service, "", options);
if (!fs_raw) {
cerr << "Could not create FileSystem object" << endl;
return 1;
}
//wrapping fs_raw into a unique pointer to guarantee deletion
unique_ptr<FileSystem> fs(fs_raw);
// NOTE(review): *(uri->get_port()) is dereferenced unchecked — if get_port()
// returns an empty optional for "hdfs://host/path" this is UB; confirm.
Status stat = fs->Connect(uri->get_host(), to_string(*(uri->get_port())));
if (!stat.ok()) {
cerr << "Could not connect to " << uri->get_host() << ":" << *(uri->get_port()) << endl;
return 1;
}
FileHandle *file_raw = nullptr;
stat = fs->Open(uri->get_path(), &file_raw);
if (!stat.ok()) {
cerr << "Could not open file " << uri->get_path() << endl;
return 1;
}
//wrapping file_raw into a unique pointer to guarantee deletion
unique_ptr<FileHandle> file(file_raw);
char input_buffer[4096];
ssize_t read_bytes_count = 0;  // running offset into the file
size_t last_read_bytes = 0;    // bytes returned by the most recent read
do{
//Reading file chunks
Status stat = file->PositionRead(input_buffer, sizeof(input_buffer), read_bytes_count, &last_read_bytes);
if(stat.ok()) {
//Writing file chunks to stdout
fwrite(input_buffer, last_read_bytes, 1, stdout);
read_bytes_count += last_read_bytes;
} else {
// InvalidOffset signals a read attempt past EOF, i.e. normal termination
if(stat.is_invalid_offset()){
//Reached the end of the file
break;
} else {
cerr << "Error reading the file: " << stat.ToString() << endl;
return 1;
}
}
} while (last_read_bytes > 0);
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -82,18 +82,18 @@ public:
* stops at the block boundary.
*
* @param buf the pointer to the buffer
* @param nbyte the size of the buffer
* @param buf_size the size of the buffer
* @param offset the offset the file
*
* The handler returns the datanode that serves the block and the number of
* bytes has read.
* bytes has read. Status::InvalidOffset is returned when trying to begin
* a read past the EOF.
**/
virtual void
PositionRead(void *buf, size_t nbyte, uint64_t offset,
PositionRead(void *buf, size_t buf_size, uint64_t offset,
const std::function<void(const Status &, size_t)> &handler) = 0;
virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0;
virtual Status Read(void *buf, size_t *nbyte) = 0;
virtual Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) = 0;
virtual Status Read(void *buf, size_t buf_size, size_t *bytes_read) = 0;
virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0;
/**

View File

@ -45,10 +45,13 @@ class Status {
static Status AuthenticationFailed();
static Status Canceled();
static Status PathNotFound(const char *msg);
static Status InvalidOffset(const char *msg);
// success
bool ok() const { return code_ == 0; }
bool is_invalid_offset() const { return code_ == kInvalidOffset; }
// Returns the string "OK" for success.
std::string ToString() const;
@ -73,6 +76,7 @@ class Status {
kAccessControlException = 258,
kStandbyException = 259,
kSnapshotProtocolException = 260,
kInvalidOffset = 261,
};
std::string get_exception_class_str() const {

View File

@ -173,6 +173,10 @@ static int Error(const Status &stat) {
errnum = ENOTEMPTY;
default_message = "Directory is not empty";
break;
case Status::Code::kInvalidOffset:
errnum = Status::Code::kInvalidOffset;
default_message = "Trying to begin a read past the EOF";
break;
default:
errnum = ENOSYS;
default_message = "Error: unrecognised code";
@ -754,8 +758,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
return -1;
}
size_t len = length;
Status stat = file->get_impl()->PositionRead(buffer, &len, position);
size_t len = 0;
Status stat = file->get_impl()->PositionRead(buffer, length, position, &len);
if(!stat.ok()) {
return Error(stat);
}
@ -775,8 +779,8 @@ tSize hdfsRead(hdfsFS fs, hdfsFile file, void *buffer, tSize length) {
return -1;
}
size_t len = length;
Status stat = file->get_impl()->Read(buffer, &len);
size_t len = 0;
Status stat = file->get_impl()->Read(buffer, length, &len);
if (!stat.ok()) {
return Error(stat);
}

View File

@ -127,6 +127,10 @@ Status Status::Canceled() {
return Status(kOperationCanceled, "Operation canceled");
}
Status Status::InvalidOffset(const char *msg){
return Status(kInvalidOffset, msg);
}
std::string Status::ToString() const {
if (code_ == kOk) {
return "OK";

View File

@ -17,6 +17,7 @@
*/
#include "common/util.h"
#include "common/util_c.h"
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <exception>
@ -107,3 +108,7 @@ std::string SafeDisconnect(asio::ip::tcp::socket *sock) {
}
}
void ShutdownProtobufLibrary_C() {
google::protobuf::ShutdownProtobufLibrary();
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_UTIL_C_H_
#define LIB_COMMON_UTIL_C_H_
#ifdef __cplusplus
extern "C" {
#endif
void ShutdownProtobufLibrary_C();
#ifdef __cplusplus
} /* end extern "C" */
#endif
#endif

View File

@ -48,11 +48,11 @@ FileHandleImpl::FileHandleImpl(const std::string & cluster_name,
}
void FileHandleImpl::PositionRead(
void *buf, size_t nbyte, uint64_t offset,
void *buf, size_t buf_size, uint64_t offset,
const std::function<void(const Status &, size_t)> &handler) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::PositionRead("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << nbyte << ") called");
<< ", buf_size=" << buf_size << ") called");
/* prevent usage after cancelation */
if(cancel_state_->is_canceled()) {
@ -71,13 +71,14 @@ void FileHandleImpl::PositionRead(
handler(status, bytes_read);
};
AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback);
AsyncPreadSome(offset, asio::buffer(buf, buf_size), bad_node_tracker_, callback);
}
Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
Status FileHandleImpl::PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::[sync]PositionRead("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << *nbyte << ") called");
<< ", buf_size=" << buf_size
<< ", offset=" << offset << ") called");
auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>();
std::future<std::tuple<Status, size_t>> future(callstate->get_future());
@ -87,7 +88,7 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
callstate->set_value(std::make_tuple(s,bytes));
};
PositionRead(buf, *nbyte, offset, callback);
PositionRead(buf, buf_size, offset, callback);
/* wait for async to finish */
auto returnstate = future.get();
@ -97,21 +98,21 @@ Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) {
return stat;
}
*nbyte = std::get<1>(returnstate);
*bytes_read = std::get<1>(returnstate);
return stat;
}
Status FileHandleImpl::Read(void *buf, size_t *nbyte) {
Status FileHandleImpl::Read(void *buf, size_t buf_size, size_t *bytes_read) {
LOG_TRACE(kFileHandle, << "FileHandleImpl::Read("
<< FMT_THIS_ADDR << ", buf=" << buf
<< ", nbyte=" << *nbyte << ") called");
<< ", buf_size=" << buf_size << ") called");
Status stat = PositionRead(buf, nbyte, offset_);
Status stat = PositionRead(buf, buf_size, offset_, bytes_read);
if(!stat.ok()) {
return stat;
}
offset_ += *nbyte;
offset_ += *bytes_read;
return Status::OK();
}
@ -179,6 +180,11 @@ void FileHandleImpl::AsyncPreadSome(
return;
}
if(offset >= file_info_->file_length_){
handler(Status::InvalidOffset("AsyncPreadSome: trying to begin a read past the EOF"), "", 0);
return;
}
/**
* Note: block and chosen_dn will end up pointing to things inside
* the blocks_ vector. They shouldn't be directly deleted.

View File

@ -59,32 +59,27 @@ public:
std::shared_ptr<LibhdfsEvents> event_handlers);
/*
* [Some day reliably] Reads a particular offset into the data file.
* On error, bytes_read returns the number of bytes successfully read; on
* success, bytes_read will equal nbyte
* Reads the file at the specified offset into the buffer.
* bytes_read returns the number of bytes successfully read on success
* and on error. Status::InvalidOffset is returned when trying to begin
* a read past the EOF.
*/
void PositionRead(
void *buf,
size_t nbyte,
size_t buf_size,
uint64_t offset,
const std::function<void(const Status &status, size_t bytes_read)> &handler
) override;
/**
* Note: The nbyte argument for Read and Pread as well as the
* offset argument for Seek are in/out parameters.
*
* For Read and Pread the value referenced by nbyte should
* be set to the number of bytes to read. Before returning
* the value referenced will be set by the callee to the number
* of bytes that was successfully read.
*
* For Seek the value referenced by offset should be the number
* of bytes to shift from the specified whence position. The
* referenced value will be set to the new offset before returning.
**/
Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override;
Status Read(void *buf, size_t *nbyte) override;
* Reads the file at the specified offset into the buffer.
* @param buf output buffer
* @param buf_size size of the output buffer
* @param offset offset at which to start reading
* @param bytes_read number of bytes successfully read
*/
Status PositionRead(void *buf, size_t buf_size, off_t offset, size_t *bytes_read) override;
Status Read(void *buf, size_t buf_size, size_t *bytes_read) override;
Status Seek(off_t *offset, std::ios_base::seekdir whence) override;
@ -95,6 +90,7 @@ public:
* If an error occurs during connection or transfer, the callback will be
* called with bytes_read equal to the number of bytes successfully transferred.
* If no data nodes can be found, status will be Status::ResourceUnavailable.
* If trying to begin a read past the EOF, status will be Status::InvalidOffset.
*
*/
void AsyncPreadSome(size_t offset, const MutableBuffers &buffers,

View File

@ -113,6 +113,7 @@ protected:
TEST(BadDataNodeTest, TestNoNodes) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->file_length_ = 1; //To avoid running into EOF
file_info->blocks_.push_back(LocatedBlockProto());
LocatedBlockProto & block = file_info->blocks_[0];
ExtendedBlockProto *b = block.mutable_b();
@ -152,6 +153,7 @@ TEST(BadDataNodeTest, TestNoNodes) {
TEST(BadDataNodeTest, NNEventCallback) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->file_length_ = 1; //To avoid running into EOF
file_info->blocks_.push_back(LocatedBlockProto());
LocatedBlockProto & block = file_info->blocks_[0];
ExtendedBlockProto *b = block.mutable_b();
@ -215,6 +217,7 @@ TEST(BadDataNodeTest, NNEventCallback) {
TEST(BadDataNodeTest, RecoverableError) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->file_length_ = 1; //To avoid running into EOF
file_info->blocks_.push_back(LocatedBlockProto());
LocatedBlockProto & block = file_info->blocks_[0];
ExtendedBlockProto *b = block.mutable_b();
@ -265,6 +268,7 @@ TEST(BadDataNodeTest, RecoverableError) {
TEST(BadDataNodeTest, InternalError) {
auto file_info = std::make_shared<struct FileInfo>();
file_info->file_length_ = 1; //To avoid running into EOF
file_info->blocks_.push_back(LocatedBlockProto());
LocatedBlockProto & block = file_info->blocks_[0];
ExtendedBlockProto *b = block.mutable_b();

View File

@ -16,8 +16,6 @@
* limitations under the License.
*/
//#include "expect.h"
#include "hdfspp_mini_dfs.h"
#include "hdfspp/hdfs_ext.h"
#include <chrono>
@ -293,6 +291,41 @@ TEST_F(HdfsExtTest, TestChmodChown) {
hdfsFreeFileInfo(file_info, 1);
}
//Testing EOF: a pread starting at or past the file length must fail with
//Status::kInvalidOffset rather than returning 0 or garbage.
TEST_F(HdfsExtTest, TestEOF) {
HdfsHandle connection = cluster.connect_c();
hdfsFS fs = connection.handle();
EXPECT_NE(nullptr, fs);
//Write to a file
errno = 0;
int size = 256;
std::string path = "/eofTest";
hdfsFile file = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
EXPECT_NE(nullptr, file);
void * buf = malloc(size);
memset(buf, ' ', size);
EXPECT_EQ(size, hdfsWrite(fs, file, buf, size));
free(buf);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
EXPECT_EQ(0, errno);
//Test normal reading (no EOF)
char buffer[300];
EXPECT_EQ(0, errno);
file = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
EXPECT_EQ(size, hdfsPread(fs, file, 0, buffer, sizeof(buffer)));
//Read executes correctly, but causes a warning (captured in HDFS-10595)
//and sets errno to EINPROGRESS 115 : Operation now in progress
errno = 0;
//Test reading at offset past the EOF
EXPECT_EQ(-1, hdfsPread(fs, file, sizeof(buffer), buffer, sizeof(buffer)));
//The C shim maps Status::Code::kInvalidOffset (261) directly into errno,
//so errno is compared against the Status code rather than a POSIX value.
EXPECT_EQ(Status::kInvalidOffset, errno);
EXPECT_EQ(0, hdfsCloseFile(fs, file));
EXPECT_EQ(0, errno);
}
}