HDFS-12931. Handle InvalidEncryptionKeyException during DistributedFileSystem#getFileChecksum. Contributed by Mukul Kumar Singh.

This commit is contained in:
Xiaoyu Yao 2018-01-03 14:54:20 -08:00
parent 4379113bda
commit 3ba985997d
2 changed files with 65 additions and 2 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
@ -303,8 +304,7 @@ final class FileChecksumHelper {
* Return true when sounds good to continue or retry, false when severe
* condition or totally failed.
*/
private boolean checksumBlock(
LocatedBlock locatedBlock) throws IOException {
private boolean checksumBlock(LocatedBlock locatedBlock) {
ExtendedBlock block = locatedBlock.getBlock();
if (getRemaining() < block.getNumBytes()) {
block.setNumBytes(getRemaining());
@ -334,6 +334,17 @@ final class FileChecksumHelper {
blockIdx--; // repeat at blockIdx-th block
setRefetchBlocks(true);
}
} catch (InvalidEncryptionKeyException iee) {
if (blockIdx > getLastRetriedIndex()) {
LOG.debug("Got invalid encryption key error in response to "
+ "OP_BLOCK_CHECKSUM for file {} for block {} from "
+ "datanode {}. Will retry " + "the block once.",
getSrc(), block, datanodes[j]);
setLastRetriedIndex(blockIdx);
done = true; // actually it's not done; but we'll retry
blockIdx--; // repeat at i-th block
getClient().clearDataEncryptionKey();
}
} catch (IOException ie) {
LOG.warn("src={}" + ", datanodes[{}]={}",
getSrc(), j, datanodes[j], ie);

View File

@ -59,6 +59,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.Assert;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -318,6 +319,57 @@ public class TestEncryptedTransfer {
assertEquals(checksum, fs.getFileChecksum(TEST_PATH));
}
@Test
public void testFileChecksumWithInvalidEncryptionKey()
throws IOException, InterruptedException, TimeoutException {
if (resolverClazz != null) {
// TestTrustedChannelResolver does not use encryption keys.
return;
}
setEncryptionConfigKeys();
cluster = new MiniDFSCluster.Builder(conf).build();
fs = getFileSystem(conf);
DFSClient client = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
DFSClient spyClient = Mockito.spy(client);
DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyClient);
writeTestDataToFile(fs);
FileChecksum checksum = fs.getFileChecksum(TEST_PATH);
BlockTokenSecretManager btsm = cluster.getNamesystem().getBlockManager()
.getBlockTokenSecretManager();
// Reduce key update interval and token life for testing.
btsm.setKeyUpdateIntervalForTesting(2 * 1000);
btsm.setTokenLifetime(2 * 1000);
btsm.clearAllKeysForTesting();
// Wait until the encryption key becomes invalid.
LOG.info("Wait until encryption keys become invalid...");
DataEncryptionKey encryptionKey = spyClient.getEncryptionKey();
List<DataNode> dataNodes = cluster.getDataNodes();
for (DataNode dn: dataNodes) {
GenericTestUtils.waitFor(
new Supplier<Boolean>() {
@Override
public Boolean get() {
return !dn.getBlockPoolTokenSecretManager().
get(encryptionKey.blockPoolId)
.hasKey(encryptionKey.keyId);
}
}, 100, 30*1000
);
}
LOG.info("The encryption key is invalid on all nodes now.");
fs.getFileChecksum(TEST_PATH);
// verify that InvalidEncryptionKeyException is handled properly
Assert.assertTrue(client.getEncryptionKey() == null);
Mockito.verify(spyClient, times(1)).clearDataEncryptionKey();
// Retry the operation after clearing the encryption key
FileChecksum verifyChecksum = fs.getFileChecksum(TEST_PATH);
Assert.assertEquals(checksum, verifyChecksum);
}
@Test
public void testLongLivedClientPipelineRecovery()
throws IOException, InterruptedException, TimeoutException {