HDFS-16472. Make HDFS setrep tool cross platform (#4130)

* The source files for hdfs_setrep use getopt for parsing the command-line arguments.
* getopt is a POSIX API that isn't available on Windows and thus isn't cross platform.
* We need to replace getopt with boost::program_options to make this tool cross platform (a minimal parsing sketch is included below for reference).
Gautham B A 2022-04-05 22:59:11 +05:30 committed by GitHub
parent 34b3275bf4
commit 4ef1d3eef9
11 changed files with 617 additions and 176 deletions
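
For reference, here is a minimal, standalone sketch (not part of the commit) of the boost::program_options pattern that the new hdfs-setrep sources below adopt in place of getopt: named options plus a positional_options_description that maps the two positional arguments, parsed into a variables_map. The demo main() and its messages are illustrative only.

#include <iostream>
#include <string>

#include <boost/program_options.hpp>

namespace po = boost::program_options;

int main(int argc, char *argv[]) {
  // Named options, including the -h/--help switch that the getopt loop used to handle.
  po::options_description desc("Options");
  desc.add_options()("help,h", "Display this help and exit")(
      "replication-factor", po::value<std::string>(), "The replication factor to set")(
      "path", po::value<std::string>(), "The path on which to set the replication factor");

  // Map the two positional command-line arguments, in order, onto the named options.
  po::positional_options_description pos_desc;
  pos_desc.add("replication-factor", 1);
  pos_desc.add("path", 1);

  // Parse, store, and notify -- this works identically on Linux and Windows.
  po::variables_map vm;
  po::store(po::command_line_parser(argc, argv)
                .options(desc)
                .positional(pos_desc)
                .run(),
            vm);
  po::notify(vm);

  if (vm.count("help") > 0) {
    std::cout << desc << std::endl;
    return 0;
  }
  if (vm.count("replication-factor") > 0 && vm.count("path") > 0) {
    std::cout << "Would set replication factor "
              << vm["replication-factor"].as<std::string>() << " on "
              << vm["path"].as<std::string>() << std::endl;
  }
  return 0;
}

Building such a sketch only requires linking against Boost.Program_options, which is what the new hdfs-setrep CMakeLists.txt below does for hdfs_setrep_lib.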


@@ -38,6 +38,7 @@ add_executable(hdfs_tool_tests
   hdfs-get-mock.cc
   hdfs-find-mock.cc
   hdfs-ls-mock.cc
+  hdfs-setrep-mock.cc
   main.cc)
 target_include_directories(hdfs_tool_tests PRIVATE
   ../tools
@@ -60,6 +61,7 @@ target_include_directories(hdfs_tool_tests PRIVATE
   ../../tools/hdfs-get
   ../../tools/hdfs-find
   ../../tools/hdfs-ls
+  ../../tools/hdfs-setrep
   ../../tools/hdfs-cat)
 target_link_libraries(hdfs_tool_tests PRIVATE
   gmock_main
@@ -81,5 +83,6 @@ target_link_libraries(hdfs_tool_tests PRIVATE
   hdfs_get_lib
   hdfs_find_lib
   hdfs_ls_lib
+  hdfs_setrep_lib
   hdfs_cat_lib)
 add_test(hdfs_tool_tests hdfs_tool_tests)


@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "hdfs-setrep-mock.h"
#include "hdfs-tool-tests.h"
namespace hdfs::tools::test {
SetrepMock::~SetrepMock() = default;
void SetrepMock::SetExpectations(
std::function<std::unique_ptr<SetrepMock>()> test_case,
const std::vector<std::string> &args) const {
// Get the pointer to the function that defines the test case
const auto test_case_func =
test_case.target<std::unique_ptr<SetrepMock> (*)()>();
ASSERT_NE(test_case_func, nullptr);
// Set the expected method calls and their corresponding arguments for each
// test case
if (*test_case_func == &CallHelp<SetrepMock>) {
EXPECT_CALL(*this, HandleHelp()).Times(1).WillOnce(testing::Return(true));
return;
}
if (*test_case_func == &PassPermissionsAndAPath<SetrepMock>) {
const auto number = args[0];
const auto path = args[1];
EXPECT_CALL(*this, HandlePath(path, number))
.Times(1)
.WillOnce(testing::Return(true));
}
}
} // namespace hdfs::tools::test


@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_SETREP_MOCK
#define LIBHDFSPP_TOOLS_HDFS_SETREP_MOCK
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <gmock/gmock.h>
#include "hdfs-setrep.h"
namespace hdfs::tools::test {
/**
* {@class SetrepMock} is an {@class Setrep} whereby it mocks the
* HandleHelp and HandlePath methods for testing their functionality.
*/
class SetrepMock : public hdfs::tools::Setrep {
public:
/**
* {@inheritdoc}
*/
SetrepMock(const int argc, char **argv) : Setrep(argc, argv) {}
// Abiding to the Rule of 5
SetrepMock(const SetrepMock &) = delete;
SetrepMock(SetrepMock &&) = delete;
SetrepMock &operator=(const SetrepMock &) = delete;
SetrepMock &operator=(SetrepMock &&) = delete;
~SetrepMock() override;
/**
* Defines the methods and the corresponding arguments that are expected
* to be called on this instance of {@link HdfsTool} for the given test case.
*
* @param test_case An {@link std::function} object that points to the
* function defining the test case
* @param args The arguments that are passed to this test case
*/
void SetExpectations(std::function<std::unique_ptr<SetrepMock>()> test_case,
const std::vector<std::string> &args = {}) const;
MOCK_METHOD(bool, HandleHelp, (), (const, override));
MOCK_METHOD(bool, HandlePath, (const std::string &, const std::string &),
(const, override));
};
} // namespace hdfs::tools::test
#endif


@@ -38,6 +38,7 @@
 #include "hdfs-move-to-local-mock.h"
 #include "hdfs-rename-snapshot-mock.h"
 #include "hdfs-rm-mock.h"
+#include "hdfs-setrep-mock.h"
 #include "hdfs-tool-test-fixtures.h"
 #include "hdfs-tool-tests.h"
@@ -156,6 +157,11 @@ INSTANTIATE_TEST_SUITE_P(
     PassMOptPermissionsAndAPath<hdfs::tools::test::FindMock>,
     PassNOptAndAPath<hdfs::tools::test::FindMock>));
+INSTANTIATE_TEST_SUITE_P(
+    HdfsSetrep, HdfsToolBasicTest,
+    testing::Values(CallHelp<hdfs::tools::test::SetrepMock>,
+                    PassPermissionsAndAPath<hdfs::tools::test::SetrepMock>));
 // Negative tests
 INSTANTIATE_TEST_SUITE_P(
     HdfsAllowSnapshot, HdfsToolNegativeTestThrows,
@@ -245,6 +251,20 @@ INSTANTIATE_TEST_SUITE_P(
     PassMOpt<hdfs::tools::test::FindMock>,
     PassNOpt<hdfs::tools::test::FindMock>));
+INSTANTIATE_TEST_SUITE_P(
+    HdfsChgrp, HdfsToolNegativeTestThrows,
+    testing::Values(PassNOptAndAPath<hdfs::tools::test::ChgrpMock>));
+INSTANTIATE_TEST_SUITE_P(
+    HdfsSetrep, HdfsToolNegativeTestThrows,
+    testing::Values(
+        Pass3Paths<hdfs::tools::test::SetrepMock>,
+        PassRecursiveOwnerAndAPath<hdfs::tools::test::SetrepMock>,
+        PassRecursive<hdfs::tools::test::SetrepMock>,
+        PassMPOptsPermissionsAndAPath<hdfs::tools::test::SetrepMock>,
+        PassMOpt<hdfs::tools::test::SetrepMock>,
+        PassNOpt<hdfs::tools::test::SetrepMock>));
 INSTANTIATE_TEST_SUITE_P(
     HdfsRm, HdfsToolNegativeTestNoThrow,
     testing::Values(PassRecursive<hdfs::tools::test::RmMock>));
@@ -302,5 +322,5 @@ INSTANTIATE_TEST_SUITE_P(
     testing::Values(PassAPath<hdfs::tools::test::ChgrpMock>));
 INSTANTIATE_TEST_SUITE_P(
-    HdfsChgrp, HdfsToolNegativeTestThrows,
-    testing::Values(PassNOptAndAPath<hdfs::tools::test::ChgrpMock>));
+    HdfsSetrep, HdfsToolNegativeTestNoThrow,
+    testing::Values(PassAPath<hdfs::tools::test::SetrepMock>));


@@ -64,8 +64,7 @@ add_subdirectory(hdfs-copy-to-local)
 add_subdirectory(hdfs-move-to-local)
-add_executable(hdfs_setrep hdfs_setrep.cc)
-target_link_libraries(hdfs_setrep tools_common hdfspp_static)
+add_subdirectory(hdfs-setrep)
 add_subdirectory(hdfs-allow-snapshot)


@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
add_library(hdfs_setrep_lib STATIC $<TARGET_OBJECTS:hdfs_tool_obj> hdfs-setrep.cc)
target_include_directories(hdfs_setrep_lib PRIVATE ../../tools ${Boost_INCLUDE_DIRS})
target_link_libraries(hdfs_setrep_lib PRIVATE Boost::boost Boost::program_options tools_common hdfspp_static)
add_executable(hdfs_setrep main.cc)
target_include_directories(hdfs_setrep PRIVATE ../../tools)
target_link_libraries(hdfs_setrep PRIVATE hdfs_setrep_lib)
install(TARGETS hdfs_setrep RUNTIME DESTINATION bin)


@@ -0,0 +1,220 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <future>
#include <iostream>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include "hdfs-setrep.h"
#include "internal/set-replication-state.h"
#include "tools_common.h"
namespace hdfs::tools {
Setrep::Setrep(const int argc, char **argv) : HdfsTool(argc, argv) {}
bool Setrep::Initialize() {
auto add_options = opt_desc_.add_options();
add_options("help,h",
"Changes the replication factor of a file at PATH. If PATH is a "
"directory then the command recursively changes the replication "
"factor of all files under the directory tree rooted at PATH.");
add_options(
"replication-factor", po::value<std::string>(),
"The replication factor to set for the given path and its children.");
add_options("path", po::value<std::string>(),
"The path for which the replication factor needs to be set.");
// We allow only one positional argument to be passed to this tool. An
// exception is thrown if multiple arguments are passed.
pos_opt_desc_.add("replication-factor", 1);
pos_opt_desc_.add("path", 1);
po::store(po::command_line_parser(argc_, argv_)
.options(opt_desc_)
.positional(pos_opt_desc_)
.run(),
opt_val_);
po::notify(opt_val_);
return true;
}
bool Setrep::ValidateConstraints() const {
// Only "help" is allowed as single argument.
if (argc_ == 2) {
return opt_val_.count("help");
}
// Rest of the cases must contain more than 2 arguments on the command line.
return argc_ > 2;
}
std::string Setrep::GetDescription() const {
std::stringstream desc;
desc << "Usage: hdfs_setrep [OPTION] NUM_REPLICAS PATH" << std::endl
<< std::endl
<< "Changes the replication factor of a file at PATH. If PATH is a "
"directory then the command"
<< std::endl
<< "recursively changes the replication factor of all files under the "
"directory tree rooted at PATH."
<< std::endl
<< std::endl
<< " -h display this help and exit" << std::endl
<< std::endl
<< "Examples:" << std::endl
<< "hdfs_setrep 5 hdfs://localhost.localdomain:8020/dir/file"
<< std::endl
<< "hdfs_setrep 3 /dir1/dir2" << std::endl;
return desc.str();
}
bool Setrep::Do() {
if (!Initialize()) {
std::cerr << "Unable to initialize HDFS setrep tool" << std::endl;
return false;
}
if (!ValidateConstraints()) {
std::cout << GetDescription();
return false;
}
if (opt_val_.count("help") > 0) {
return HandleHelp();
}
if (opt_val_.count("path") > 0 && opt_val_.count("replication-factor") > 0) {
const auto replication_factor =
opt_val_["replication-factor"].as<std::string>();
const auto path = opt_val_["path"].as<std::string>();
return HandlePath(path, replication_factor);
}
return false;
}
bool Setrep::HandleHelp() const {
std::cout << GetDescription();
return true;
}
bool Setrep::HandlePath(const std::string &path,
const std::string &replication_factor) const {
// Building a URI object from the given path.
auto uri = hdfs::parse_path_or_exit(path);
const auto fs = hdfs::doConnect(uri, true);
if (!fs) {
std::cerr << "Could not connect to the file system." << std::endl;
return false;
}
/*
* Wrap async FileSystem::SetReplication with promise to make it a blocking
* call.
*/
auto promise = std::make_shared<std::promise<hdfs::Status>>();
std::future future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) { promise->set_value(s); };
const auto replication = static_cast<uint16_t>(
std::strtol(replication_factor.c_str(), nullptr, 8));
/*
* Allocating shared state, which includes:
* replication to be set, handler to be called, request counter, and a boolean
* to keep track if find is done
*/
auto state =
std::make_shared<SetReplicationState>(replication, handler, 0, false);
/*
* Keep requesting more from Find until we process the entire listing. Call
* handler when Find is done and request counter is 0. Find guarantees that
* the handler will only be called once at a time so we do not need locking in
* handler_find.
*/
auto handler_find = [fs, state](const hdfs::Status &status_find,
const std::vector<hdfs::StatInfo> &stat_infos,
const bool has_more_results) -> bool {
/*
* For each result returned by Find we call async SetReplication with the
* handler below. SetReplication DOES NOT guarantee that the handler will
* only be called once at a time, so we DO need locking in
* handler_set_replication.
*/
auto handler_set_replication =
[state](const hdfs::Status &status_set_replication) {
std::lock_guard guard(state->lock);
// Decrement the counter once since we are done with this async call.
if (!status_set_replication.ok() && state->status.ok()) {
// We make sure we set state->status only on the first error.
state->status = status_set_replication;
}
state->request_counter--;
if (state->request_counter == 0 && state->find_is_done) {
state->handler(state->status); // Exit.
}
};
if (!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const &stat_info : stat_infos) {
// Launch an asynchronous call to SetReplication for every returned
// file.
if (stat_info.file_type == hdfs::StatInfo::IS_FILE) {
state->request_counter++;
fs->SetReplication(stat_info.full_path, state->replication,
handler_set_replication);
}
}
}
/*
* Lock this section because handlerSetReplication might be accessing the
* same shared variables simultaneously.
*/
std::lock_guard guard(state->lock);
if (!status_find.ok() && state->status.ok()) {
// We make sure we set state->status only on the first error.
state->status = status_find;
}
if (!has_more_results) {
state->find_is_done = true;
if (state->request_counter == 0) {
state->handler(state->status); // Exit.
}
return false;
}
return true;
};
// Asynchronous call to Find.
fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(),
handler_find);
// Block until promise is set.
const auto status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
return false;
}
return true;
}
} // namespace hdfs::tools


@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_SETREP
#define LIBHDFSPP_TOOLS_HDFS_SETREP
#include <string>
#include <boost/program_options.hpp>
#include "hdfs-tool.h"
namespace hdfs::tools {
/**
* {@class Setrep} is an {@class HdfsTool} that changes the replication factor
* of a file at a given path. If the path is a directory, then it recursively
* changes the replication factor of all files under the directory tree rooted
* at the given path.
*/
class Setrep : public HdfsTool {
public:
/**
* {@inheritdoc}
*/
Setrep(int argc, char **argv);
// Abiding to the Rule of 5
Setrep(const Setrep &) = default;
Setrep(Setrep &&) = default;
Setrep &operator=(const Setrep &) = delete;
Setrep &operator=(Setrep &&) = delete;
~Setrep() override = default;
/**
* {@inheritdoc}
*/
[[nodiscard]] std::string GetDescription() const override;
/**
* {@inheritdoc}
*/
[[nodiscard]] bool Do() override;
protected:
/**
* {@inheritdoc}
*/
[[nodiscard]] bool Initialize() override;
/**
* {@inheritdoc}
*/
[[nodiscard]] bool ValidateConstraints() const override;
/**
* {@inheritdoc}
*/
[[nodiscard]] bool HandleHelp() const override;
/**
* Handle the path argument that's passed to this tool.
*
* @param path The path to the directory for which we need setrep info.
* @param replication_factor The replication factor to set to given path and
* its children.
*
* @return A boolean indicating the result of this operation.
*/
[[nodiscard]] virtual bool
HandlePath(const std::string &path,
const std::string &replication_factor) const;
private:
/**
* A boost data-structure containing the description of positional arguments
* passed to the command-line.
*/
po::positional_options_description pos_opt_desc_;
};
} // namespace hdfs::tools
#endif


@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdlib>
#include <exception>
#include <iostream>
#include <google/protobuf/stubs/common.h>
#include "hdfs-setrep.h"
int main(int argc, char *argv[]) {
const auto result = std::atexit([]() -> void {
// Clean up static data on exit and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
});
if (result != 0) {
std::cerr << "Error: Unable to schedule clean-up tasks for HDFS setrep "
"tool, exiting"
<< std::endl;
std::exit(EXIT_FAILURE);
}
hdfs::tools::Setrep setrep(argc, argv);
auto success = false;
try {
success = setrep.Do();
} catch (const std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
}
if (!success) {
std::exit(EXIT_FAILURE);
}
return 0;
}


@@ -1,172 +0,0 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
#include <google/protobuf/stubs/common.h>
#include <unistd.h>
#include <future>
#include "tools_common.h"
void usage(){
std::cout << "Usage: hdfs_setrep [OPTION] NUM_REPLICAS PATH"
<< std::endl
<< std::endl << "Changes the replication factor of a file at PATH. If PATH is a directory then the command"
<< std::endl << "recursively changes the replication factor of all files under the directory tree rooted at PATH."
<< std::endl
<< std::endl << " -h display this help and exit"
<< std::endl
<< std::endl << "Examples:"
<< std::endl << "hdfs_setrep 5 hdfs://localhost.localdomain:8020/dir/file"
<< std::endl << "hdfs_setrep 3 /dir1/dir2"
<< std::endl;
}
struct SetReplicationState {
const uint16_t replication;
const std::function<void(const hdfs::Status &)> handler;
//The request counter is incremented once every time SetReplication async call is made
uint64_t request_counter;
//This boolean will be set when find returns the last result
bool find_is_done;
//Final status to be returned
hdfs::Status status;
//Shared variables will need protection with a lock
std::mutex lock;
SetReplicationState(const uint16_t replication_, const std::function<void(const hdfs::Status &)> & handler_,
uint64_t request_counter_, bool find_is_done_)
: replication(replication_),
handler(handler_),
request_counter(request_counter_),
find_is_done(find_is_done_),
status(),
lock() {
}
};
int main(int argc, char *argv[]) {
//We should have 3 or 4 parameters
if (argc < 3) {
usage();
exit(EXIT_FAILURE);
}
int input;
//Using GetOpt to read in the values
opterr = 0;
while ((input = getopt(argc, argv, "h")) != -1) {
switch (input)
{
case 'h':
usage();
exit(EXIT_SUCCESS);
case '?':
if (isprint(optopt))
std::cerr << "Unknown option `-" << (char) optopt << "'." << std::endl;
else
std::cerr << "Unknown option character `" << (char) optopt << "'." << std::endl;
usage();
exit(EXIT_FAILURE);
default:
exit(EXIT_FAILURE);
}
}
std::string repl = argv[optind];
std::string uri_path = argv[optind + 1];
//Building a URI object from the given uri_path
hdfs::URI uri = hdfs::parse_path_or_exit(uri_path);
std::shared_ptr<hdfs::FileSystem> fs = hdfs::doConnect(uri, true);
if (!fs) {
std::cerr << "Could not connect the file system. " << std::endl;
exit(EXIT_FAILURE);
}
/* wrap async FileSystem::SetReplication with promise to make it a blocking call */
std::shared_ptr<std::promise<hdfs::Status>> promise = std::make_shared<std::promise<hdfs::Status>>();
std::future<hdfs::Status> future(promise->get_future());
auto handler = [promise](const hdfs::Status &s) {
promise->set_value(s);
};
uint16_t replication = std::stoi(repl.c_str(), NULL, 8);
//Allocating shared state, which includes:
//replication to be set, handler to be called, request counter, and a boolean to keep track if find is done
std::shared_ptr<SetReplicationState> state = std::make_shared<SetReplicationState>(replication, handler, 0, false);
// Keep requesting more from Find until we process the entire listing. Call handler when Find is done and request counter is 0.
// Find guarantees that the handler will only be called once at a time so we do not need locking in handlerFind.
auto handlerFind = [fs, state](const hdfs::Status &status_find, const std::vector<hdfs::StatInfo> & stat_infos, bool has_more_results) -> bool {
//For each result returned by Find we call async SetReplication with the handler below.
//SetReplication DOES NOT guarantee that the handler will only be called once at a time, so we DO need locking in handlerSetReplication.
auto handlerSetReplication = [state](const hdfs::Status &status_set_replication) {
std::lock_guard<std::mutex> guard(state->lock);
//Decrement the counter once since we are done with this async call
if (!status_set_replication.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_set_replication;
}
state->request_counter--;
if(state->request_counter == 0 && state->find_is_done){
state->handler(state->status); //exit
}
};
if(!stat_infos.empty() && state->status.ok()) {
for (hdfs::StatInfo const& s : stat_infos) {
//Launch an asynchronous call to SetReplication for every returned file
if(s.file_type == hdfs::StatInfo::IS_FILE){
state->request_counter++;
fs->SetReplication(s.full_path, state->replication, handlerSetReplication);
}
}
}
//Lock this section because handlerSetReplication might be accessing the same
//shared variables simultaneously
std::lock_guard<std::mutex> guard(state->lock);
if (!status_find.ok() && state->status.ok()){
//We make sure we set state->status only on the first error.
state->status = status_find;
}
if(!has_more_results){
state->find_is_done = true;
if(state->request_counter == 0){
state->handler(state->status); //exit
}
return false;
}
return true;
};
//Asynchronous call to Find
fs->Find(uri.get_path(), "*", hdfs::FileSystem::GetDefaultFindMaxDepth(), handlerFind);
/* block until promise is set */
hdfs::Status status = future.get();
if (!status.ok()) {
std::cerr << "Error: " << status.ToString() << std::endl;
exit(EXIT_FAILURE);
}
// Clean up static data and prevent valgrind memory leaks
google::protobuf::ShutdownProtobufLibrary();
return 0;
}


@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBHDFSPP_TOOLS_HDFS_SET_REPLICATION_STATE
#define LIBHDFSPP_TOOLS_HDFS_SET_REPLICATION_STATE
#include <functional>
#include <mutex>
#include "hdfspp/hdfspp.h"
namespace hdfs::tools {
/**
* {@class SetReplicationState} helps in handling the intermediate results while
* running {@link Setrep}.
*/
struct SetReplicationState {
SetReplicationState(const uint16_t replication,
std::function<void(const hdfs::Status &)> handler,
const uint64_t request_counter, const bool find_is_done)
: replication{replication}, handler{std::move(handler)},
request_counter{request_counter}, find_is_done{find_is_done} {}
/**
* The replication factor.
*/
const uint16_t replication;
/**
* Handle the given {@link hdfs::Status}.
*/
const std::function<void(const hdfs::Status &)> handler;
/**
* The request counter is incremented once every time SetReplication async
* call is made.
*/
uint64_t request_counter;
/**
* This boolean will be set when find returns the last result.
*/
bool find_is_done;
/**
* Final status to be returned.
*/
hdfs::Status status;
/**
* Shared variables will need protection with a lock.
*/
std::mutex lock;
};
} // namespace hdfs::tools
#endif