HDFS-8766. Implement a libhdfs(3) compatible API. Contributed by James Clampffer.

This commit is contained in:
Haohui Mai 2015-10-28 15:50:26 -07:00 committed by James Clampffer
parent 422a6e4ed5
commit dbd1d0c75b
7 changed files with 417 additions and 0 deletions

View File

@ -51,6 +51,7 @@ include_directories(
third_party/asio-1.10.2/include third_party/asio-1.10.2/include
third_party/gmock-1.7.0 third_party/gmock-1.7.0
${OPENSSL_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR}
../libhdfs/include
) )
set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../hadoop-hdfs-client/src/main/proto) set(PROTO_HDFS_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../../../hadoop-hdfs-client/src/main/proto)

View File

@ -21,3 +21,4 @@ add_subdirectory(fs)
add_subdirectory(reader) add_subdirectory(reader)
add_subdirectory(rpc) add_subdirectory(rpc)
add_subdirectory(proto) add_subdirectory(proto)
add_subdirectory(bindings)

View File

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
add_subdirectory(c)

View File

@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
add_library(bindings_c hdfs.cc hdfs_cpp.cc)
add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common)

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfs_cpp.h"
#include <cstring>
#include <iostream>
using namespace hdfs;
/* Seperate the handles used by the C api from the C++ API*/
struct hdfs_internal {
hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {}
hdfs_internal(std::unique_ptr<HadoopFileSystem> p)
: filesystem_(std::move(p)) {}
virtual ~hdfs_internal(){};
HadoopFileSystem *get_impl() { return filesystem_.get(); }
const HadoopFileSystem *get_impl() const { return filesystem_.get(); }
private:
std::unique_ptr<HadoopFileSystem> filesystem_;
};
struct hdfsFile_internal {
hdfsFile_internal(FileHandle *p) : file_(p) {}
hdfsFile_internal(std::unique_ptr<FileHandle> p) : file_(std::move(p)) {}
virtual ~hdfsFile_internal(){};
FileHandle *get_impl() { return file_.get(); }
const FileHandle *get_impl() const { return file_.get(); }
private:
std::unique_ptr<FileHandle> file_;
};
/* Error handling with optional debug to stderr */
static void ReportError(int errnum, std::string msg) {
errno = errnum;
#ifdef LIBHDFSPP_C_API_ENABLE_DEBUG
std::cerr << "Error: errno=" << strerror(errnum) << " message=\"" << msg
<< "\"" << std::endl;
#else
(void)msg;
#endif
}
/**
* C API implementations
**/
int hdfsFileIsOpenForRead(hdfsFile file) {
/* files can only be open for reads at the moment, do a quick check */
if (file) {
return file->get_impl()->IsOpenForRead();
}
return false;
}
hdfsFS hdfsConnect(const char *nn, tPort port) {
HadoopFileSystem *fs = new HadoopFileSystem();
Status stat = fs->Connect(nn, port);
if (!stat.ok()) {
ReportError(ENODEV, "Unable to connect to NameNode.");
delete fs;
return nullptr;
}
return new hdfs_internal(fs);
}
int hdfsDisconnect(hdfsFS fs) {
if (!fs) {
ReportError(ENODEV, "Cannot disconnect null FS handle.");
return -1;
}
delete fs;
return 0;
}
hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize,
short replication, tSize blocksize) {
(void)flags;
(void)bufferSize;
(void)replication;
(void)blocksize;
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return nullptr;
}
FileHandle *f = nullptr;
Status stat = fs->get_impl()->OpenFileForRead(path, &f);
if (!stat.ok()) {
return nullptr;
}
return new hdfsFile_internal(f);
}
int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return -1;
}
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return -1;
}
delete file;
return 0;
}
tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer,
tSize length) {
if (!fs) {
ReportError(ENODEV, "Cannot perform FS operations with null FS handle.");
return -1;
}
if (!file) {
ReportError(EBADF, "Cannot perform FS operations with null File handle.");
return -1;
}
return file->get_impl()->Pread(buffer, length, position);
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "hdfs_cpp.h"
#include <cstdint>
#include <cerrno>
#include <string>
#include <future>
#include <memory>
#include <thread>
#include <vector>
#include <set>
#include <hdfs/hdfs.h>
#include "libhdfspp/hdfs.h"
#include "libhdfspp/status.h"
#include "fs/filesystem.h"
#include "common/hdfs_public_api.h"
namespace hdfs {
ssize_t FileHandle::Pread(void *buf, size_t nbyte, off_t offset) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
/* wrap async call with promise/future to make it blocking */
size_t read_count = 0;
auto callback = [stat, &read_count](const Status &s, const std::string &dn,
size_t bytes) {
(void)dn;
stat->set_value(s);
read_count = bytes;
};
input_stream_->PositionRead(buf, nbyte, offset, std::set<std::string>(),
callback);
/* wait for async to finish */
auto s = future.get();
if (!s.ok()) {
return -1;
}
return (ssize_t)read_count;
}
bool FileHandle::IsOpenForRead() {
/* for now just check if InputStream exists */
if (!input_stream_) {
return false;
}
return true;
}
HadoopFileSystem::~HadoopFileSystem() {
/**
* Note: IoService must be stopped before getting rid of worker threads.
* Once worker threads are joined and deleted the service can be deleted.
**/
file_system_.reset(nullptr);
service_->Stop();
worker_threads_.clear();
service_.reset(nullptr);
}
Status HadoopFileSystem::Connect(const char *nn, tPort port,
unsigned int threads) {
/* IoService::New can return nullptr */
if (!service_) {
return Status::Error("Null IoService");
}
/* spawn background threads for asio delegation */
for (unsigned int i = 0; i < threads; i++) {
AddWorkerThread();
}
/* synchronized */
FileSystem *fs = nullptr;
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callback = [stat, &fs](const Status &s, FileSystem *f) {
fs = f;
stat->set_value(s);
};
/* dummy options object until this is hooked up to HDFS-9117 */
Options options_object;
FileSystem::New(service_.get(), options_object, nn, std::to_string(port),
callback);
/* block until promise is set */
auto s = future.get();
/* check and see if it worked */
if (!fs) {
service_->Stop();
worker_threads_.clear();
return s;
}
file_system_ = std::unique_ptr<FileSystem>(fs);
return s;
}
int HadoopFileSystem::AddWorkerThread() {
auto service_task = [](IoService *service) { service->Run(); };
worker_threads_.push_back(
WorkerPtr(new std::thread(service_task, service_.get())));
return worker_threads_.size();
}
Status HadoopFileSystem::OpenFileForRead(const std::string &path,
FileHandle **handle) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
/* wrap async FileSystem::Open with promise to make it a blocking call */
InputStream *input_stream = nullptr;
auto h = [stat, &input_stream](const Status &s, InputStream *is) {
stat->set_value(s);
input_stream = is;
};
file_system_->Open(path, h);
/* block until promise is set */
auto s = future.get();
if (!s.ok()) {
delete input_stream;
return s;
}
if (!input_stream) {
return s;
}
*handle = new FileHandle(input_stream);
return s;
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H
#define LIBHDFSPP_BINDINGS_HDFSCPP_H
#include <cstdint>
#include <thread>
#include <vector>
#include "libhdfspp/hdfs.h"
#include <hdfs/hdfs.h>
namespace hdfs {
/**
* Implement a very simple 'it just works' interface in C++
* that provides posix-like file operations + extra stuff for hadoop.
* Then provide very thin C wrappers over each method.
*/
class FileHandle {
public:
virtual ~FileHandle(){};
ssize_t Pread(void *buf, size_t nbyte, off_t offset);
bool IsOpenForRead();
private:
/* handle should only be created by fs */
friend class HadoopFileSystem;
FileHandle(InputStream *is) : input_stream_(is){};
std::unique_ptr<InputStream> input_stream_;
};
class HadoopFileSystem {
public:
HadoopFileSystem() : service_(IoService::New()) {}
virtual ~HadoopFileSystem();
/* attempt to connect to namenode, return false on failure */
Status Connect(const char *nn, tPort port, unsigned int threads = 1);
/* how many worker threads are servicing asio requests */
int WorkerThreadCount() { return worker_threads_.size(); }
/* add a new thread to handle asio requests, return number of threads in pool
*/
int AddWorkerThread();
Status OpenFileForRead(const std::string &path, FileHandle **handle);
private:
std::unique_ptr<IoService> service_;
/* std::thread needs to join before deletion */
struct WorkerDeleter {
void operator()(std::thread *t) {
t->join();
delete t;
}
};
typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
std::vector<WorkerPtr> worker_threads_;
std::unique_ptr<FileSystem> file_system_;
};
}
#endif