HDFS-9452. libhdfs++ Fix memory stomp in OpenFileForRead. Contributed by James Clampffer

This commit is contained in:
James 2015-11-30 11:11:52 -05:00 committed by James Clampffer
parent 87362b1c17
commit 584c2a204d
1 changed files with 34 additions and 36 deletions

View File

@ -26,6 +26,7 @@
#include <thread>
#include <vector>
#include <set>
#include <tuple>
#include <hdfs/hdfs.h>
#include "libhdfspp/hdfs.h"
@ -35,38 +36,35 @@
namespace hdfs {
FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){};
FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){}
Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future(stat->get_future());
auto callstate = std::make_shared<std::promise<std::tuple<Status, std::string, size_t>>>();
std::future<std::tuple<Status, std::string, size_t>> future(callstate->get_future());
/* wrap async call with promise/future to make it blocking */
size_t read_count = 0;
std::string contacted_datanode;
auto callback = [stat, &read_count, &contacted_datanode](
auto callback = [callstate](
const Status &s, const std::string &dn, size_t bytes) {
stat->set_value(s);
read_count = bytes;
contacted_datanode = dn;
callstate->set_value(std::make_tuple(s, dn, bytes));
};
input_stream_->PositionRead(buf, *nbyte, offset, callback);
/* wait for async to finish */
auto s = future.get();
auto returnstate = future.get();
auto stat = std::get<0>(returnstate);
if (!s.ok()) {
if (!stat.ok()) {
/* determine if DN gets marked bad */
if (InputStream::ShouldExclude(s)) {
if (InputStream::ShouldExclude(stat)) {
InputStreamImpl *impl =
static_cast<InputStreamImpl *>(input_stream_.get());
impl->bad_node_tracker_->AddBadNode(contacted_datanode);
impl->bad_node_tracker_->AddBadNode(std::get<1>(returnstate));
}
return s;
return stat;
}
*nbyte = (size_t)read_count;
*nbyte = std::get<2>(returnstate);
return Status::OK();
}
@ -152,13 +150,11 @@ Status HadoopFileSystem::Connect(const char *nn, tPort port,
AddWorkerThread();
}
/* synchronized */
FileSystem *fs = nullptr;
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem*>>>();
std::future<std::tuple<Status, FileSystem*>> future(callstate->get_future());
auto callback = [stat, &fs](const Status &s, FileSystem *f) {
fs = f;
stat->set_value(s);
auto callback = [callstate](const Status &s, FileSystem *f) {
callstate->set_value(std::make_tuple(s,f));
};
/* dummy options object until this is hooked up to HDFS-9117 */
@ -167,17 +163,19 @@ Status HadoopFileSystem::Connect(const char *nn, tPort port,
callback);
/* block until promise is set */
auto s = future.get();
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
FileSystem *fs = std::get<1>(returnstate);
/* check and see if it worked */
if (!fs) {
if (!stat.ok() || !fs) {
service_->Stop();
worker_threads_.clear();
return s;
return stat;
}
file_system_ = std::unique_ptr<FileSystem>(fs);
return s;
return stat;
}
int HadoopFileSystem::AddWorkerThread() {
@ -189,30 +187,30 @@ int HadoopFileSystem::AddWorkerThread() {
Status HadoopFileSystem::OpenFileForRead(const std::string &path,
FileHandle **handle) {
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
auto callstate = std::make_shared<std::promise<std::tuple<Status, InputStream*>>>();
std::future<std::tuple<Status, InputStream*>> future(callstate->get_future());
/* wrap async FileSystem::Open with promise to make it a blocking call */
InputStream *input_stream = nullptr;
auto h = [stat, &input_stream](const Status &s, InputStream *is) {
stat->set_value(s);
input_stream = is;
auto h = [callstate](const Status &s, InputStream *is) {
callstate->set_value(std::make_tuple(s, is));
};
file_system_->Open(path, h);
/* block until promise is set */
auto s = future.get();
auto returnstate = future.get();
Status stat = std::get<0>(returnstate);
InputStream *input_stream = std::get<1>(returnstate);
if (!s.ok()) {
if (!stat.ok()) {
delete input_stream;
return s;
return stat;
}
if (!input_stream) {
return s;
return stat;
}
*handle = new FileHandle(input_stream);
return s;
return stat;
}
}