HDFS-8775. SASL support for data transfer protocol in libhdfspp. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-07-20 14:31:01 -07:00 committed by James Clampffer
parent 26d9f6cee3
commit bccc640648
11 changed files with 635 additions and 10 deletions

View File

@ -1 +1 @@
add_library(common base64.cc status.cc)
add_library(common base64.cc status.cc sasl_digest_md5.cc)

View File

@ -20,6 +20,8 @@
#include "common/util.h"
#include <asio/read.hpp>
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_COMMON_SASL_AUTHENTICATOR_H_
#define LIB_COMMON_SASL_AUTHENTICATOR_H_
#include "libhdfspp/status.h"
namespace hdfs {
class DigestMD5AuthenticatorTest_TestResponse_Test;
/**
* A specialized implementation of RFC 2831 for the HDFS
* DataTransferProtocol.
*
* The current lacks the following features:
* * Encoding the username, realm, and password in ISO-8859-1 when
* it is required by the RFC. They are always encoded in UTF-8.
* * Checking whether the challenges from the server are
* well-formed.
* * Specifying authzid, digest-uri and maximum buffer size.
* * Supporting QOP other than the auth level.
**/
class DigestMD5Authenticator {
public:
Status EvaluateResponse(const std::string &payload, std::string *result);
DigestMD5Authenticator(const std::string &username,
const std::string &password, bool mock_nonce = false);
private:
Status GenerateFirstResponse(std::string *result);
Status GenerateResponseValue(std::string *response_value);
Status ParseFirstChallenge(const std::string &payload);
static size_t NextToken(const std::string &payload, size_t off,
std::string *tok);
void GenerateCNonce();
std::string username_;
std::string password_;
std::string nonce_;
std::string cnonce_;
std::string realm_;
std::string qop_;
unsigned nonce_count_;
const bool TEST_mock_cnonce_;
friend class DigestMD5AuthenticatorTest_TestResponse_Test;
};
}
#endif

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "sasl_authenticator.h"
#include "common/util.h"
#include <openssl/rand.h>
#include <openssl/md5.h>
#include <iomanip>
#include <map>
#include <sstream>
namespace hdfs {
static std::string QuoteString(const std::string &src);
static std::string GetMD5Digest(const std::string &src);
static std::string BinaryToHex(const std::string &src);
static const char kDigestUri[] = "hdfs/0";
static const size_t kMaxBufferSize = 65536;
DigestMD5Authenticator::DigestMD5Authenticator(const std::string &username,
const std::string &password,
bool mock_nonce)
: username_(username), password_(password), nonce_count_(0),
TEST_mock_cnonce_(mock_nonce) {}
Status DigestMD5Authenticator::EvaluateResponse(const std::string &payload,
std::string *result) {
Status status = ParseFirstChallenge(payload);
if (status.ok()) {
status = GenerateFirstResponse(result);
}
return status;
}
size_t DigestMD5Authenticator::NextToken(const std::string &payload, size_t off,
std::string *tok) {
tok->clear();
if (off >= payload.size()) {
return std::string::npos;
}
char c = payload[off];
if (c == '=' || c == ',') {
*tok = c;
return off + 1;
}
int quote_count = 0;
for (; off < payload.size(); ++off) {
char c = payload[off];
if (c == '"') {
++quote_count;
if (quote_count == 2) {
return off + 1;
}
continue;
}
if (c == '=') {
if (quote_count) {
tok->append(&c, 1);
} else {
break;
}
} else if (('0' <= c && c <= '9') || ('a' <= c && c <= 'z') ||
('A' <= c && c <= 'Z') || c == '+' || c == '/' || c == '-' ||
c == '_' || c == '@') {
tok->append(&c, 1);
} else {
break;
}
}
return off;
}
void DigestMD5Authenticator::GenerateCNonce() {
if (!TEST_mock_cnonce_) {
char buf[8];
RAND_pseudo_bytes(reinterpret_cast<unsigned char *>(buf), sizeof(buf));
cnonce_ = Base64Encode(std::string(buf, sizeof(buf)));
}
}
Status DigestMD5Authenticator::ParseFirstChallenge(const std::string &payload) {
std::map<std::string, std::string> props;
std::string token;
enum {
kStateLVal,
kStateEqual,
kStateRVal,
kStateCommaOrEnd,
};
int state = kStateLVal;
std::string lval, rval;
size_t off = 0;
while (true) {
off = NextToken(payload, off, &token);
if (off == std::string::npos) {
break;
}
switch (state) {
case kStateLVal:
lval = token;
state = kStateEqual;
break;
case kStateEqual:
state = kStateRVal;
break;
case kStateRVal:
rval = token;
props[lval] = rval;
state = kStateCommaOrEnd;
break;
case kStateCommaOrEnd:
state = kStateLVal;
break;
}
}
if (props["algorithm"] != "md5-sess" || props["charset"] != "utf-8" ||
props.find("nonce") == props.end()) {
return Status::Error("Invalid challenge");
}
realm_ = props["realm"];
nonce_ = props["nonce"];
qop_ = props["qop"];
return Status::OK();
}
Status DigestMD5Authenticator::GenerateFirstResponse(std::string *result) {
// TODO: Support auth-int and auth-conf
// Handle cipher
if (qop_ != "auth") {
return Status::Unimplemented();
}
std::stringstream ss;
GenerateCNonce();
ss << "charset=utf-8,username=\"" << QuoteString(username_) << "\""
<< ",authzid=\"" << QuoteString(username_) << "\""
<< ",nonce=\"" << QuoteString(nonce_) << "\""
<< ",digest-uri=\"" << kDigestUri << "\""
<< ",maxbuf=" << kMaxBufferSize << ",cnonce=\"" << cnonce_ << "\"";
if (realm_.size()) {
ss << ",realm=\"" << QuoteString(realm_) << "\"";
}
ss << ",nc=" << std::hex << std::setw(8) << std::setfill('0')
<< ++nonce_count_;
std::string response_value;
GenerateResponseValue(&response_value);
ss << ",response=" << response_value;
*result = ss.str();
return result->size() > 4096 ? Status::Error("Response too big")
: Status::OK();
}
/**
* Generate the response value specified in S 2.1.2.1 in RFC2831.
**/
Status
DigestMD5Authenticator::GenerateResponseValue(std::string *response_value) {
std::stringstream begin_a1, a1_ss;
std::string a1, a2;
if (qop_ == "auth") {
a2 = std::string("AUTHENTICATE:") + kDigestUri;
} else {
a2 = std::string("AUTHENTICATE:") + kDigestUri +
":00000000000000000000000000000000";
}
begin_a1 << username_ << ":" << realm_ << ":" << password_;
a1_ss << GetMD5Digest(begin_a1.str()) << ":" << nonce_ << ":" << cnonce_
<< ":" << username_;
std::stringstream combine_ss;
combine_ss << BinaryToHex(GetMD5Digest(a1_ss.str())) << ":" << nonce_ << ":"
<< std::hex << std::setw(8) << std::setfill('0') << nonce_count_
<< ":" << cnonce_ << ":" << qop_ << ":"
<< BinaryToHex(GetMD5Digest(a2));
*response_value = BinaryToHex(GetMD5Digest(combine_ss.str()));
return Status::OK();
}
static std::string QuoteString(const std::string &src) {
std::string dst;
dst.resize(2 * src.size());
size_t j = 0;
for (size_t i = 0; i < src.size(); ++i) {
if (src[i] == '"') {
dst[j++] = '\\';
}
dst[j++] = src[i];
}
dst.resize(j);
return dst;
}
static std::string GetMD5Digest(const std::string &src) {
MD5_CTX ctx;
unsigned long long res[2];
MD5_Init(&ctx);
MD5_Update(&ctx, src.c_str(), src.size());
MD5_Final(reinterpret_cast<unsigned char *>(res), &ctx);
return std::string(reinterpret_cast<char *>(res), sizeof(res));
}
static std::string BinaryToHex(const std::string &src) {
std::stringstream ss;
ss << std::hex << std::setfill('0');
for (size_t i = 0; i < src.size(); ++i) {
unsigned c = (unsigned)(static_cast<unsigned char>(src[i]));
ss << std::setw(2) << c;
}
return ss.str();
}
}

View File

@ -16,5 +16,5 @@
# limitations under the License.
#
add_library(reader remote_block_reader.cc)
add_library(reader remote_block_reader.cc datatransfer.cc)
add_dependencies(reader proto)

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "datatransfer.h"
#include "libhdfspp/status.h"
namespace hdfs {
namespace DataTransferSaslStreamUtil {
static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS;
using hadoop::hdfs::DataTransferEncryptorMessageProto;
Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) {
using namespace hadoop::hdfs;
auto s = msg->status();
if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) {
payload->clear();
return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str());
} else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) {
payload->clear();
return Status::Error(msg->message().c_str());
} else {
*payload = msg->payload();
return Status::OK();
}
}
void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) {
msg->set_status(kSUCCESS);
msg->set_payload("");
}
}
}

View File

@ -15,8 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COMMON_DATA_TRANSFER_H_
#define COMMON_DATA_TRANSFER_H_
#ifndef LIB_READER_DATA_TRANSFER_H_
#define LIB_READER_DATA_TRANSFER_H_
#include "common/sasl_authenticator.h"
namespace hdfs {
@ -30,6 +32,32 @@ enum Operation {
kReadBlock = 81,
};
template <class Stream> class DataTransferSaslStream {
public:
DataTransferSaslStream(Stream *stream, const std::string &username,
const std::string &password)
: stream_(stream), authenticator_(username, password) {}
template <class Handler> void Handshake(const Handler &next);
template <class MutableBufferSequence, class ReadHandler>
void async_read_some(const MutableBufferSequence &buffers,
ReadHandler &&handler);
template <class ConstBufferSequence, class WriteHandler>
void async_write_some(const ConstBufferSequence &buffers,
WriteHandler &&handler);
private:
DataTransferSaslStream(const DataTransferSaslStream &) = delete;
DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete;
Stream *stream_;
DigestMD5Authenticator authenticator_;
struct ReadSaslMessage;
struct Authenticator;
};
}
#include "datatransfer_impl.h"
#endif

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_READER_DATATRANFER_IMPL_H_
#define LIB_READER_DATATRANFER_IMPL_H_
#include "datatransfer.pb.h"
#include "common/continuation/continuation.h"
#include "common/continuation/asio.h"
#include "common/continuation/protobuf.h"
#include <asio/read.hpp>
#include <asio/buffer.hpp>
namespace hdfs {
namespace DataTransferSaslStreamUtil {
Status
ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg,
std::string *payload);
void PrepareInitialHandshake(
::hadoop::hdfs::DataTransferEncryptorMessageProto *msg);
}
template <class Stream>
struct DataTransferSaslStream<Stream>::Authenticator
: continuation::Continuation {
Authenticator(DigestMD5Authenticator *authenticator,
const std::string *request,
hadoop::hdfs::DataTransferEncryptorMessageProto *msg)
: authenticator_(authenticator), request_(request), msg_(msg) {}
virtual void Run(const Next &next) override {
using namespace ::hadoop::hdfs;
std::string response;
Status status = authenticator_->EvaluateResponse(*request_, &response);
msg_->Clear();
if (status.ok()) {
// TODO: Handle encryption scheme
msg_->set_payload(response);
msg_->set_status(
DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
} else {
msg_->set_status(
DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR);
}
next(Status::OK());
}
private:
DigestMD5Authenticator *authenticator_;
const std::string *request_;
hadoop::hdfs::DataTransferEncryptorMessageProto *msg_;
};
template <class Stream>
struct DataTransferSaslStream<Stream>::ReadSaslMessage
: continuation::Continuation {
ReadSaslMessage(Stream *stream, std::string *data)
: stream_(stream), data_(data), read_pb_(stream, &resp_) {}
virtual void Run(const Next &next) override {
auto handler = [this, next](const Status &status) {
if (status.ok()) {
Status new_stat =
DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_);
next(new_stat);
} else {
next(status);
}
};
read_pb_.Run(handler);
}
private:
Stream *stream_;
std::string *data_;
hadoop::hdfs::DataTransferEncryptorMessageProto resp_;
continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_;
};
template <class Stream>
template <class Handler>
void DataTransferSaslStream<Stream>::Handshake(const Handler &next) {
using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
using ::hdfs::continuation::Write;
using ::hdfs::continuation::WriteDelimitedPBMessage;
static const int kMagicNumber = htonl(kDataTransferSasl);
static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer(
reinterpret_cast<const char *>(kMagicNumber), sizeof(kMagicNumber));
struct State {
DataTransferEncryptorMessageProto req0;
std::string resp0;
DataTransferEncryptorMessageProto req1;
std::string resp1;
Stream *stream;
};
auto m = continuation::Pipeline<State>::Create();
State *s = &m->state();
s->stream = stream_;
DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0);
m->Push(Write(stream_, kMagicNumberBuffer))
.Push(WriteDelimitedPBMessage(stream_, &s->req0))
.Push(new ReadSaslMessage(stream_, &s->resp0))
.Push(new Authenticator(&authenticator_, &s->resp0, &s->req1))
.Push(WriteDelimitedPBMessage(stream_, &s->req1))
.Push(new ReadSaslMessage(stream_, &s->resp1));
m->Run([next](const Status &status, const State &) { next(status); });
}
template <class Stream>
template <class MutableBufferSequence, class ReadHandler>
void DataTransferSaslStream<Stream>::async_read_some(
const MutableBufferSequence &buffers, ReadHandler &&handler) {
stream_->async_read_some(buffers, handler);
}
template <class Stream>
template <typename ConstBufferSequence, typename WriteHandler>
void DataTransferSaslStream<Stream>::async_write_some(
const ConstBufferSequence &buffers, WriteHandler &&handler) {
stream_->async_write_some(buffers, handler);
}
}
#endif

View File

@ -17,6 +17,11 @@
#
add_library(test_common OBJECT mock_connection.cc)
add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>)
target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} gmock_main)
target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main)
add_test(remote_block_reader remote_block_reader_test)
add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc)
target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main)
add_test(sasl_digest_md5 sasl_digest_md5_test)

View File

@ -32,6 +32,7 @@ using namespace hdfs;
using ::hadoop::common::TokenProto;
using ::hadoop::hdfs::BlockOpResponseProto;
using ::hadoop::hdfs::ChecksumProto;
using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
using ::hadoop::hdfs::ExtendedBlockProto;
using ::hadoop::hdfs::PacketHeaderProto;
using ::hadoop::hdfs::ReadOpChecksumInfoProto;
@ -90,13 +91,14 @@ ProducePacket(const std::string &data, const std::string &checksum,
return std::make_pair(error_code(), std::move(payload));
}
static std::shared_ptr<RemoteBlockReader<MockDNConnection>>
ReadContent(MockDNConnection *conn, TokenProto *token,
template<class Stream = MockDNConnection>
static std::shared_ptr<RemoteBlockReader<Stream>>
ReadContent(Stream *conn, TokenProto *token,
const ExtendedBlockProto &block, uint64_t length, uint64_t offset,
const mutable_buffers_1 &buf, Status *status, size_t *transferred) {
BlockReaderOptions options;
auto reader =
std::make_shared<RemoteBlockReader<MockDNConnection>>(options, conn);
std::make_shared<RemoteBlockReader<Stream>>(options, conn);
Status result;
reader->async_connect(
"libhdfs++", token, &block, length, offset,
@ -121,7 +123,6 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
EXPECT_CALL(conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
@ -205,6 +206,49 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
});
}
TEST(RemoteBlockReaderTest, TestSaslConnection) {
static const size_t kChunkSize = 512;
static const string kChunkData(kChunkSize, 'a');
static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
"qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\","
"charset=utf-8,algorithm=md5-sess";
MockDNConnection conn;
BlockOpResponseProto block_op_resp;
block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1;
sasl_resp0.set_status(
::hadoop::hdfs::
DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
sasl_resp0.set_payload(kAuthPayload);
sasl_resp1.set_status(
::hadoop::hdfs::
DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS);
EXPECT_CALL(conn, Produce())
.WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0))))
.WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1))))
.WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
.WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar");
ExtendedBlockProto block;
std::string data(kChunkSize, 0);
size_t transferred = 0;
Status stat;
sasl_conn.Handshake([&stat](const Status &s) {
stat = s;
});
ASSERT_TRUE(stat.ok());
ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0,
buffer(const_cast<char *>(data.c_str()), data.size()), &stat,
&transferred);
ASSERT_TRUE(stat.ok());
ASSERT_EQ(kChunkSize, transferred);
ASSERT_EQ(kChunkData, data);
}
int main(int argc, char *argv[]) {
// The following line must be executed to initialize Google Mock
// (and Google Test) before running the tests.

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/sasl_authenticator.h"
#include <gtest/gtest.h>
namespace hdfs {
/**
* Testing whether the authenticator generates the MD5 digest correctly.
**/
TEST(DigestMD5AuthenticatorTest, TestResponse) {
const std::string username = "igFLnEx4OIx5PZWHAAAABGhtYWkAAAAoQlAtMTM3MDQ2OTk"
"zLTE5Mi4xNjguMS4yMjctMTQyNDIyMDM4MTM2M4xAAAABAQ"
"RSRUFE";
const std::string password = "K5IFUibAynVVrApeCXLrBk9Sro8=";
DigestMD5Authenticator auth(username, password, true);
auth.cnonce_ = "KQlJwBDTseCHpAkFLZls4WcAktp6r5wTzje5feLY";
std::string result;
Status status =
auth.EvaluateResponse("realm=\"0\",nonce=\"+GAWc+O6yEAWpew/"
"qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\",charset="
"utf-8,algorithm=md5-sess",
&result);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") !=
std::string::npos);
}
}