HDFS-16407. Make hdfs_du tool cross platform (#3848)

This commit is contained in:
Gautham B A 2022-01-04 22:29:54 +05:30 committed by GitHub
parent 9eea0e28f2
commit c3006be516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 628 additions and 178 deletions

View File

@ -29,11 +29,13 @@ add_executable(hdfs_tool_tests
hdfs-tool-test-fixtures.cc
hdfs-tool-tests.cc
hdfs-df-mock.cc
hdfs-du-mock.cc
main.cc)
target_include_directories(hdfs_tool_tests PRIVATE
../tools
../../tools
../../tools/hdfs-df
../../tools/hdfs-du
../../tools/hdfs-allow-snapshot
../../tools/hdfs-disallow-snapshot
../../tools/hdfs-delete-snapshot
@ -46,6 +48,7 @@ target_include_directories(hdfs_tool_tests PRIVATE
target_link_libraries(hdfs_tool_tests PRIVATE
gmock_main
hdfs_df_lib
hdfs_du_lib
hdfs_allowSnapshot_lib
hdfs_disallowSnapshot_lib
hdfs_deleteSnapshot_lib

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "hdfs-du-mock.h"
#include "hdfs-tool-tests.h"
namespace hdfs::tools::test {
DuMock::~DuMock() = default;
void DuMock::SetExpectations(std::function<std::unique_ptr<DuMock>()> test_case,
const std::vector<std::string> &args) const {
// Get the pointer to the function that defines the test case
const auto test_case_func = test_case.target<std::unique_ptr<DuMock> (*)()>();
ASSERT_NE(test_case_func, nullptr);
// Set the expected method calls and their corresponding arguments for each
// test case
if (*test_case_func == &CallHelp<DuMock>) {
EXPECT_CALL(*this, HandleHelp()).Times(1).WillOnce(testing::Return(true));
return;
}
if (*test_case_func == &PassAPath<DuMock>) {
const auto arg1 = args[0];
EXPECT_CALL(*this, HandlePath(arg1, false))
.Times(1)
.WillOnce(testing::Return(true));
}
if (*test_case_func == &PassRecursivePath<DuMock>) {
const auto arg1 = args[0];
const auto arg2 = args[1];
ASSERT_EQ(arg1, "-R");
EXPECT_CALL(*this, HandlePath(arg2, true))
.Times(1)
.WillOnce(testing::Return(true));
}
if (*test_case_func == &PassRecursive<DuMock>) {
const auto arg1 = args[0];
ASSERT_EQ(arg1, "-R");
}
}
} // namespace hdfs::tools::test

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_DU_MOCK
#define LIBHDFSPP_TOOLS_HDFS_DU_MOCK
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include "hdfs-du.h"
namespace hdfs::tools::test {
/**
* {@class DuMock} is an {@class Du} whereby it mocks the
* HandleHelp and HandlePath methods for testing their functionality.
*/
class DuMock : public hdfs::tools::Du {
public:
/**
* {@inheritdoc}
*/
DuMock(const int argc, char **argv) : Du(argc, argv) {}
// Abiding to the Rule of 5
DuMock(const DuMock &) = delete;
DuMock(DuMock &&) = delete;
DuMock &operator=(const DuMock &) = delete;
DuMock &operator=(DuMock &&) = delete;
~DuMock() override;
/**
* Defines the methods and the corresponding arguments that are expected
* to be called on this instance of {@link HdfsTool} for the given test case.
*
* @param test_case An {@link std::function} object that points to the
* function defining the test case
* @param args The arguments that are passed to this test case
*/
void SetExpectations(std::function<std::unique_ptr<DuMock>()> test_case,
const std::vector<std::string> &args = {}) const;
MOCK_METHOD(bool, HandleHelp, (), (const, override));
MOCK_METHOD(bool, HandlePath, (const std::string &, const bool),
(const, override));
};
} // namespace hdfs::tools::test
#endif

View File

@ -28,6 +28,7 @@
#include "hdfs-delete-snapshot-mock.h"
#include "hdfs-df-mock.h"
#include "hdfs-disallow-snapshot-mock.h"
#include "hdfs-du-mock.h"
#include "hdfs-rename-snapshot-mock.h"
#include "hdfs-tool-test-fixtures.h"
#include "hdfs-tool-tests.h"
@ -67,6 +68,12 @@ INSTANTIATE_TEST_SUITE_P(HdfsDf, HdfsToolBasicTest,
testing::Values(PassAPath<hdfs::tools::test::DfMock>,
CallHelp<hdfs::tools::test::DfMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsDu, HdfsToolBasicTest,
testing::Values(PassAPath<hdfs::tools::test::DuMock>,
CallHelp<hdfs::tools::test::DuMock>,
PassRecursivePath<hdfs::tools::test::DuMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsDeleteSnapshot, HdfsToolBasicTest,
testing::Values(CallHelp<hdfs::tools::test::DeleteSnapshotMock>,
@ -114,6 +121,14 @@ INSTANTIATE_TEST_SUITE_P(
HdfsDf, HdfsToolNegativeTestThrows,
testing::Values(Pass2Paths<hdfs::tools::test::DfMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsDu, HdfsToolNegativeTestThrows,
testing::Values(Pass2Paths<hdfs::tools::test::DuMock>,
Pass3Paths<hdfs::tools::test::DuMock>,
PassNOptAndAPath<hdfs::tools::test::DuMock>,
PassOwnerAndAPath<hdfs::tools::test::DuMock>,
PassPermissionsAndAPath<hdfs::tools::test::DuMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsCat, HdfsToolNegativeTestThrows,
testing::Values(Pass2Paths<hdfs::tools::test::CatMock>));
@ -122,6 +137,10 @@ INSTANTIATE_TEST_SUITE_P(
HdfsDeleteSnapshot, HdfsToolNegativeTestNoThrow,
testing::Values(PassAPath<hdfs::tools::test::DeleteSnapshotMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsDu, HdfsToolNegativeTestNoThrow,
testing::Values(PassRecursive<hdfs::tools::test::DuMock>));
INSTANTIATE_TEST_SUITE_P(
HdfsChown, HdfsToolNegativeTestNoThrow,
testing::Values(PassAPath<hdfs::tools::test::ChownMock>));

View File

@ -44,6 +44,31 @@ template <class T> std::unique_ptr<T> PassAPath() {
return hdfs_tool;
}
template <class T> std::unique_ptr<T> PassRecursive() {
constexpr auto argc = 2;
static std::string exe("hdfs_tool_name");
static std::string arg1("-R");
static char *argv[] = {exe.data(), arg1.data()};
auto hdfs_tool = std::make_unique<T>(argc, argv);
hdfs_tool->SetExpectations(PassRecursive<T>, {arg1});
return hdfs_tool;
}
template <class T> std::unique_ptr<T> PassRecursivePath() {
constexpr auto argc = 3;
static std::string exe("hdfs_tool_name");
static std::string arg1("-R");
static std::string arg2("a/b/c");
static char *argv[] = {exe.data(), arg1.data(), arg2.data()};
auto hdfs_tool = std::make_unique<T>(argc, argv);
hdfs_tool->SetExpectations(PassRecursivePath<T>, {arg1, arg2});
return hdfs_tool;
}
template <class T> std::unique_ptr<T> CallHelp() {
constexpr auto argc = 2;
static std::string exe("hdfs_tool_name");

View File

@ -61,8 +61,7 @@ target_link_libraries(hdfs_count tools_common hdfspp_static)
add_subdirectory(hdfs-df)
add_executable(hdfs_du hdfs_du.cc)
target_link_libraries(hdfs_du tools_common hdfspp_static)
add_subdirectory(hdfs-du)
add_executable(hdfs_get hdfs_get.cc)
target_link_libraries(hdfs_get tools_common hdfspp_static)

View File

@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
add_library(hdfs_du_lib STATIC $<TARGET_OBJECTS:hdfs_tool_obj> hdfs-du.cc)
target_include_directories(hdfs_du_lib PRIVATE ../../tools ${Boost_INCLUDE_DIRS})
target_link_libraries(hdfs_du_lib PRIVATE Boost::boost Boost::program_options tools_common hdfspp_static)
add_executable(hdfs_du main.cc)
target_include_directories(hdfs_du PRIVATE ../../tools)
target_link_libraries(hdfs_du PRIVATE hdfs_du_lib)
install(TARGETS hdfs_du RUNTIME DESTINATION bin)

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <future>
#include <iostream>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include "hdfs-du.h"
#include "internal/get-content-summary-state.h"
#include "tools_common.h"
namespace hdfs::tools {
Du::Du(const int argc, char **argv) : HdfsTool(argc, argv) {}
bool Du::Initialize() {
auto add_options = opt_desc_.add_options();
add_options("help,h",
"Displays sizes of files and directories contained in the given "
"PATH or the length of a file in case PATH is just a file");
add_options("recursive,R", "Operate on files and directories recursively");
add_options("path", po::value<std::string>(),
"The path indicating the filesystem that needs to be du-ed");
// We allow only one positional argument to be passed to this tool. An
// exception is thrown if multiple arguments are passed.
pos_opt_desc_.add("path", 1);
po::store(po::command_line_parser(argc_, argv_)
.options(opt_desc_)
.positional(pos_opt_desc_)
.run(),
opt_val_);
po::notify(opt_val_);
return true;
}
std::string Du::GetDescription() const {
std::stringstream desc;
desc << "Usage: hdfs_du [OPTION] PATH" << std::endl
<< std::endl
<< "Displays sizes of files and directories contained in the given PATH"
<< std::endl
<< "or the length of a file in case PATH is just a file" << std::endl
<< std::endl
<< " -R operate on files and directories recursively"
<< std::endl
<< " -h display this help and exit" << std::endl
<< std::endl
<< "Examples:" << std::endl
<< "hdfs_du hdfs://localhost.localdomain:8020/dir/file" << std::endl
<< "hdfs_du -R /dir1/dir2" << std::endl;
return desc.str();
}
bool Du::Do() {
if (!Initialize()) {
std::cerr << "Unable to initialize HDFS du tool" << std::endl;
return false;
}
if (!ValidateConstraints()) {
std::cout << GetDescription();
return false;
}
if (opt_val_.count("help") > 0) {
return HandleHelp();
}
if (opt_val_.count("path") > 0) {
const auto path = opt_val_["path"].as<std::string>();
const auto recursive = opt_val_.count("recursive") > 0;
return HandlePath(path, recursive);
}
return false;
}
bool Du::HandleHelp() const {
std::cout << GetDescription();
return true;
}
bool Du::HandlePath(const std::string &path, const bool recursive) const {
// Building a URI object from the given path.
auto uri = hdfs::parse_path_or_exit(path);
const auto fs = hdfs::doConnect(uri, true);
if (!fs) {
std::cerr << "Could not connect to the file system." << std::endl;
return false;
}
/*
* Wrap async FileSystem::GetContentSummary with promise to make it a blocking
* call.
*/
const auto promise = std::make_shared<std::promise<hdfs::Status>>();
std::future future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) { promise->set_value(s); };
/*
* Allocating shared state, which includes: handler to be called, request
* counter, and a boolean to keep track if find is done.
*/
const auto state =
std::make_shared<GetContentSummaryState>(handler, 0, false);
/*
* Keep requesting more from Find until we process the entire listing. Call
* handler when Find is done and request counter is 0. Find guarantees that
* the handler will only be called once at a time so we do not need locking in
* handler_find.
*/
auto handler_find = [fs, state](const hdfs::Status &status_find,
const std::vector<hdfs::StatInfo> &stat_infos,
const bool has_more_results) -> bool {
/*
* For each result returned by Find we call async GetContentSummary with the
* handler below. GetContentSummary DOES NOT guarantee that the handler will
* only be called once at a time, so we DO need locking in
* handler_get_content_summary.
*/
auto handler_get_content_summary =
[state](const hdfs::Status &status_get_summary,
const hdfs::ContentSummary &si) {
std::lock_guard guard(state->lock);
std::cout << si.str_du() << std::endl;
// Decrement the counter once since we are done with this async call.
if (!status_get_summary.ok() && state->status.ok()) {
// We make sure we set state->status only on the first error.
state->status = status_get_summary;
}
state->request_counter--;
if (state->request_counter == 0 && state->find_is_done) {
state->handler(state->status); // exit
}
};
if (!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const &s : stat_infos) {
/*
* Launch an asynchronous call to GetContentSummary for every returned
* result.
*/
state->request_counter++;
fs->GetContentSummary(s.full_path, handler_get_content_summary);
}
}
/*
* Lock this section because handler_get_content_summary might be accessing
* the same shared variables simultaneously.
*/
std::lock_guard guard(state->lock);
if (!status_find.ok() && state->status.ok()) {
// We make sure we set state->status only on the first error.
state->status = status_find;
}
if (!has_more_results) {
state->find_is_done = true;
if (state->request_counter == 0) {
state->handler(state->status); // exit
}
return false;
}
return true;
};
// Asynchronous call to Find.
if (!recursive) {
fs->GetListing(uri.get_path(), handler_find);
} else {
fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(),
handler_find);
}
// Block until promise is set.
const auto status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
return false;
}
return true;
}
} // namespace hdfs::tools

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_DU
#define LIBHDFSPP_TOOLS_HDFS_DU
#include <string>
#include <boost/program_options.hpp>
#include "hdfs-tool.h"
namespace hdfs::tools {
/**
* {@class Du} is an {@class HdfsTool} that displays the size of the directories
* and files.
*/
class Du : public HdfsTool {
public:
/**
* {@inheritdoc}
*/
Du(int argc, char **argv);
// Abiding to the Rule of 5
Du(const Du &) = default;
Du(Du &&) = default;
Du &operator=(const Du &) = delete;
Du &operator=(Du &&) = delete;
~Du() override = default;
/**
* {@inheritdoc}
*/
[[nodiscard]] std::string GetDescription() const override;
/**
* {@inheritdoc}
*/
[[nodiscard]] bool Do() override;
protected:
/**
* {@inheritdoc}
*/
[[nodiscard]] bool Initialize() override;
/**
* {@inheritdoc}
*/
[[nodiscard]] bool ValidateConstraints() const override { return argc_ > 1; }
/**
* {@inheritdoc}
*/
[[nodiscard]] bool HandleHelp() const override;
/**
* Handle the path argument that's passed to this tool.
*
* @param path The path to the directory for which we need du info.
* @param recursive A boolean indicating whether du needs to be
* performed recursively for the given path.
*
* @return A boolean indicating the result of this operation.
*/
[[nodiscard]] virtual bool HandlePath(const std::string &path,
bool recursive) const;
private:
/**
* A boost data-structure containing the description of positional arguments
* passed to the command-line.
*/
po::positional_options_description pos_opt_desc_;
};
} // namespace hdfs::tools
#endif

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdlib>
#include <exception>
#include <iostream>
#include <google/protobuf/stubs/common.h>
#include "hdfs-du.h"
int main(int argc, char *argv[]) {
const auto result = std::atexit([]() -> void {
// Clean up static data on exit and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
});
if (result != 0) {
std::cerr
<< "Error: Unable to schedule clean-up tasks for HDFS df tool, exiting"
<< std::endl;
std::exit(EXIT_FAILURE);
}
hdfs::tools::Du du(argc, argv);
auto success = false;
try {
success = du.Do();
} catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
}
if (!success) {
std::exit(EXIT_FAILURE);
}
return 0;
}

View File

@ -1,176 +0,0 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_du [OPTION] PATH"
<< std::endl
<< std::endl << "Displays sizes of files and directories contained in the given PATH"
<< std::endl << "or the length of a file in case PATH is just a file"
<< std::endl
<< std::endl << " -R operate on files and directories recursively"
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_du hdfs://localhost.localdomain:8020/dir/file"
<< std::endl << "hdfs_du -R /dir1/dir2"
<< std::endl;
}
struct GetContentSummaryState {
const std::function<void(const hdfs::Status &)> handler;
//The request counter is incremented once every time GetContentSummary async call is made
uint64_t request_counter;
//This boolean will be set when find returns the last result
bool find_is_done;
//Final status to be returned
hdfs::Status status;
//Shared variables will need protection with a lock
std::mutex lock;
GetContentSummaryState(const std::function<void(const hdfs::Status &)> & handler_,
uint64_t request_counter_, bool find_is_done_)
: handler(handler_),
request_counter(request_counter_),
find_is_done(find_is_done_),
status(),
lock() {
}
};
int main(int argc, char *argv[]) {
//We should have at least 2 arguments
if (argc < 2) {
usage();
exit(EXIT_FAILURE);
}
bool recursive = false;
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "Rh")) != -1) {
switch (input)
{
case 'R':
recursive = true;
break;
case 'h':
usage();
exit(EXIT_SUCCESS);
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string uri_path = argv[optind];
//Building a URI object from the given uri_path
hdfs::URI uri = hdfs::parse_path_or_exit(uri_path);
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, true);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
/* wrap async FileSystem::GetContentSummary with promise to make it a blocking call */
std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
std::future<hdfs::Status> future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) {
promise->set_value(s);
};
//Allocating shared state, which includes:
//handler to be called, request counter, and a boolean to keep track if find is done
std::shared_ptr<GetContentSummaryState> state = std::make_shared<GetContentSummaryState>(handler, 0, false);
// Keep requesting more from Find until we process the entire listing. Call handler when Find is done and reques counter is 0.
// Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
//For each result returned by Find we call async GetContentSummary with the handler below.
//GetContentSummary DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerGetContentSummary.
auto handlerGetContentSummary = [state](const hdfs::Status &status_get_summary, const hdfs::ContentSummary &si) {
std::lock_guard<std::mutex> guard(state->lock);
std::cout << si.str_du() << std::endl;
//Decrement the counter once since we are done with this async call
if (!status_get_summary.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_get_summary;
}
state->request_counter--;
if(state->request_counter == 0 && state->find_is_done){
state->handler(state->status); //exit
}
};
if(!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const& s : stat_infos) {
//Launch an asynchronous call to GetContentSummary for every returned result
state->request_counter++;
fs->GetContentSummary(s.full_path, handlerGetContentSummary);
}
}
//Lock this section because handlerGetContentSummary might be accessing the same
//shared variables simultaneously
std::lock_guard<std::mutex> guard(state->lock);
if (!status_find.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_find;
}
if(!has_more_results){
state->find_is_done = true;
if(state->request_counter == 0){
state->handler(state->status); //exit
}
return false;
}
return true;
};
if(!recursive){
//Asynchronous call to Find
fs->GetListing(uri.get_path(), handlerFind);
} else {
//Asynchronous call to Find
fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
}
/* block until promise is set */
hdfs::Status status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_DU_GET_CONTENT_SUMMARY_STATE
#define LIBHDFSPP_TOOLS_HDFS_DU_GET_CONTENT_SUMMARY_STATE
#include <functional>
#include <mutex>
#include <utility>
#include "hdfspp/hdfspp.h"
namespace hdfs::tools {
/**
* The {@class GetContentSummaryState} is used to hold intermediate information
* during the execution of {@link hdfs::FileSystem#GetContentSummary}.
*/
struct GetContentSummaryState {
GetContentSummaryState(std::function<void(const hdfs::Status &)> handler,
const uint64_t request_counter,
const bool find_is_done)
: handler{std::move(handler)}, request_counter{request_counter},
find_is_done{find_is_done} {}
/**
* The handler that is used to update the status asynchronously.
*/
const std::function<void(const hdfs::Status &)> handler;
/**
* The request counter is incremented once every time GetContentSummary async
* call is made.
*/
uint64_t request_counter;
/**
* This boolean will be set when find returns the last result.
*/
bool find_is_done;
/**
* Final status to be returned.
*/
hdfs::Status status;
/**
* Shared variables will need protection with a lock.
*/
std::mutex lock;
};
} // namespace hdfs::tools
#endif