HDFS-7382. DataNode in secure mode may throw NullPointerException if client connects before DataNode registers itself with NameNode. Contributed by Chris Nauroth.
(cherry picked from commit 9ba8d8c7eb
)
This commit is contained in:
parent
0a5d95f705
commit
98b81c6624
|
@ -1267,6 +1267,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-7226. Fix TestDNFencing.testQueueingWithAppend. (Yongjun Zhang via jing9)
|
HDFS-7226. Fix TestDNFencing.testQueueingWithAppend. (Yongjun Zhang via jing9)
|
||||||
|
|
||||||
|
HDFS-7382. DataNode in secure mode may throw NullPointerException if client
|
||||||
|
connects before DataNode registers itself with NameNode. (cnauroth)
|
||||||
|
|
||||||
Release 2.5.2 - UNRELEASED
|
Release 2.5.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -94,12 +94,14 @@ public class SaslDataTransferServer {
|
||||||
* @param peer connection peer
|
* @param peer connection peer
|
||||||
* @param underlyingOut connection output stream
|
* @param underlyingOut connection output stream
|
||||||
* @param underlyingIn connection input stream
|
* @param underlyingIn connection input stream
|
||||||
|
* @param int xferPort data transfer port of DataNode accepting connection
|
||||||
* @param datanodeId ID of DataNode accepting connection
|
* @param datanodeId ID of DataNode accepting connection
|
||||||
* @return new pair of streams, wrapped after SASL negotiation
|
* @return new pair of streams, wrapped after SASL negotiation
|
||||||
* @throws IOException for any error
|
* @throws IOException for any error
|
||||||
*/
|
*/
|
||||||
public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
|
public IOStreamPair receive(Peer peer, OutputStream underlyingOut,
|
||||||
InputStream underlyingIn, DatanodeID datanodeId) throws IOException {
|
InputStream underlyingIn, int xferPort, DatanodeID datanodeId)
|
||||||
|
throws IOException {
|
||||||
if (dnConf.getEncryptDataTransfer()) {
|
if (dnConf.getEncryptDataTransfer()) {
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
|
"SASL server doing encrypted handshake for peer = {}, datanodeId = {}",
|
||||||
|
@ -110,16 +112,16 @@ public class SaslDataTransferServer {
|
||||||
"SASL server skipping handshake in unsecured configuration for "
|
"SASL server skipping handshake in unsecured configuration for "
|
||||||
+ "peer = {}, datanodeId = {}", peer, datanodeId);
|
+ "peer = {}, datanodeId = {}", peer, datanodeId);
|
||||||
return new IOStreamPair(underlyingIn, underlyingOut);
|
return new IOStreamPair(underlyingIn, underlyingOut);
|
||||||
} else if (datanodeId.getXferPort() < 1024) {
|
} else if (xferPort < 1024) {
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"SASL server skipping handshake in unsecured configuration for "
|
"SASL server skipping handshake in secured configuration for "
|
||||||
+ "peer = {}, datanodeId = {}", peer, datanodeId);
|
+ "peer = {}, datanodeId = {}", peer, datanodeId);
|
||||||
return new IOStreamPair(underlyingIn, underlyingOut);
|
return new IOStreamPair(underlyingIn, underlyingOut);
|
||||||
} else if (dnConf.getSaslPropsResolver() != null) {
|
} else if (dnConf.getSaslPropsResolver() != null) {
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"SASL server doing general handshake for peer = {}, datanodeId = {}",
|
"SASL server doing general handshake for peer = {}, datanodeId = {}",
|
||||||
peer, datanodeId);
|
peer, datanodeId);
|
||||||
return getSaslStreams(peer, underlyingOut, underlyingIn, datanodeId);
|
return getSaslStreams(peer, underlyingOut, underlyingIn);
|
||||||
} else if (dnConf.getIgnoreSecurePortsForTesting()) {
|
} else if (dnConf.getIgnoreSecurePortsForTesting()) {
|
||||||
// It's a secured cluster using non-privileged ports, but no SASL. The
|
// It's a secured cluster using non-privileged ports, but no SASL. The
|
||||||
// only way this can happen is if the DataNode has
|
// only way this can happen is if the DataNode has
|
||||||
|
@ -271,12 +273,11 @@ public class SaslDataTransferServer {
|
||||||
* @param peer connection peer
|
* @param peer connection peer
|
||||||
* @param underlyingOut connection output stream
|
* @param underlyingOut connection output stream
|
||||||
* @param underlyingIn connection input stream
|
* @param underlyingIn connection input stream
|
||||||
* @param datanodeId ID of DataNode accepting connection
|
|
||||||
* @return new pair of streams, wrapped after SASL negotiation
|
* @return new pair of streams, wrapped after SASL negotiation
|
||||||
* @throws IOException for any error
|
* @throws IOException for any error
|
||||||
*/
|
*/
|
||||||
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
|
private IOStreamPair getSaslStreams(Peer peer, OutputStream underlyingOut,
|
||||||
InputStream underlyingIn, final DatanodeID datanodeId) throws IOException {
|
InputStream underlyingIn) throws IOException {
|
||||||
if (peer.hasSecureChannel() ||
|
if (peer.hasSecureChannel() ||
|
||||||
dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
|
dnConf.getTrustedChannelResolver().isTrusted(getPeerAddress(peer))) {
|
||||||
return new IOStreamPair(underlyingIn, underlyingOut);
|
return new IOStreamPair(underlyingIn, underlyingOut);
|
||||||
|
|
|
@ -182,7 +182,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||||
InputStream input = socketIn;
|
InputStream input = socketIn;
|
||||||
try {
|
try {
|
||||||
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
|
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
|
||||||
socketIn, datanode.getDatanodeId());
|
socketIn, datanode.getXferAddress().getPort(),
|
||||||
|
datanode.getDatanodeId());
|
||||||
input = new BufferedInputStream(saslStreams.in,
|
input = new BufferedInputStream(saslStreams.in,
|
||||||
HdfsConstants.SMALL_BUFFER_SIZE);
|
HdfsConstants.SMALL_BUFFER_SIZE);
|
||||||
socketOut = saslStreams.out;
|
socketOut = saslStreams.out;
|
||||||
|
|
Loading…
Reference in New Issue