HDDS-1120. Add a config to disable checksum verification during read.
This commit is contained in:
parent
490206e4b4
commit
106bdc6c04
|
@ -66,6 +66,7 @@ public class BlockInputStream extends InputStream implements Seekable {
|
|||
private long[] chunkOffset;
|
||||
private List<ByteBuffer> buffers;
|
||||
private int bufferIndex;
|
||||
private final boolean verifyChecksum;
|
||||
|
||||
/**
|
||||
* Creates a new BlockInputStream.
|
||||
|
@ -75,10 +76,12 @@ public class BlockInputStream extends InputStream implements Seekable {
|
|||
* @param xceiverClient client to perform container calls
|
||||
* @param chunks list of chunks to read
|
||||
* @param traceID container protocol call traceID
|
||||
* @param verifyChecksum verify checksum
|
||||
*/
|
||||
public BlockInputStream(
|
||||
BlockID blockID, XceiverClientManager xceiverClientManager,
|
||||
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
|
||||
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID,
|
||||
boolean verifyChecksum) {
|
||||
this.blockID = blockID;
|
||||
this.traceID = traceID;
|
||||
this.xceiverClientManager = xceiverClientManager;
|
||||
|
@ -91,6 +94,7 @@ public class BlockInputStream extends InputStream implements Seekable {
|
|||
initializeChunkOffset();
|
||||
this.buffers = null;
|
||||
this.bufferIndex = 0;
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
}
|
||||
|
||||
private void initializeChunkOffset() {
|
||||
|
@ -318,7 +322,9 @@ public class BlockInputStream extends InputStream implements Seekable {
|
|||
}
|
||||
ChecksumData checksumData =
|
||||
ChecksumData.getFromProtoBuf(chunkInfo.getChecksumData());
|
||||
Checksum.verifyChecksum(byteString, checksumData);
|
||||
if (verifyChecksum) {
|
||||
Checksum.verifyChecksum(byteString, checksumData);
|
||||
}
|
||||
break;
|
||||
} catch (IOException ioe) {
|
||||
// we will end up in this situation only if the checksum mismatch
|
||||
|
|
|
@ -362,6 +362,9 @@ public final class OzoneConfigKeys {
|
|||
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES =
|
||||
1024 * 1024;
|
||||
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
|
||||
public static final String OZONE_CLIENT_VERIFY_CHECKSUM =
|
||||
"ozone.client.verify.checksum";
|
||||
public static final boolean OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT = true;
|
||||
public static final String OZONE_ACL_AUTHORIZER_CLASS =
|
||||
"ozone.acl.authorizer.class";
|
||||
public static final String OZONE_ACL_AUTHORIZER_CLASS_DEFAULT =
|
||||
|
|
|
@ -1484,6 +1484,15 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.client.verify.checksum</name>
|
||||
<value>true</value>
|
||||
<tag>OZONE, CLIENT, MANAGEMENT</tag>
|
||||
<description>
|
||||
Ozone client to verify checksum of the checksum blocksize data.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.om.ratis.enable</name>
|
||||
<value>false</value>
|
||||
|
|
|
@ -265,7 +265,7 @@ public class KeyInputStream extends InputStream implements Seekable {
|
|||
XceiverClientManager xceiverClientManager,
|
||||
StorageContainerLocationProtocolClientSideTranslatorPB
|
||||
storageContainerLocationClient,
|
||||
String requestId) throws IOException {
|
||||
String requestId, boolean verifyChecksum) throws IOException {
|
||||
long length = 0;
|
||||
long containerKey;
|
||||
KeyInputStream groupInputStream = new KeyInputStream();
|
||||
|
@ -311,7 +311,7 @@ public class KeyInputStream extends InputStream implements Seekable {
|
|||
success = true;
|
||||
BlockInputStream inputStream = new BlockInputStream(
|
||||
omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
|
||||
chunks, requestId);
|
||||
chunks, requestId, verifyChecksum);
|
||||
groupInputStream.addStream(inputStream,
|
||||
omKeyLocationInfo.getLength());
|
||||
} finally {
|
||||
|
|
|
@ -112,6 +112,7 @@ public class RpcClient implements ClientProtocol {
|
|||
private final int chunkSize;
|
||||
private final ChecksumType checksumType;
|
||||
private final int bytesPerChecksum;
|
||||
private boolean verifyChecksum;
|
||||
private final UserGroupInformation ugi;
|
||||
private final OzoneAcl.OzoneACLRights userRights;
|
||||
private final OzoneAcl.OzoneACLRights groupRights;
|
||||
|
@ -198,6 +199,9 @@ public class RpcClient implements ClientProtocol {
|
|||
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
|
||||
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||
checksumType = ChecksumType.valueOf(checksumTypeStr);
|
||||
this.verifyChecksum =
|
||||
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
|
||||
}
|
||||
|
||||
private InetSocketAddress getScmAddressForClient() throws IOException {
|
||||
|
@ -648,7 +652,7 @@ public class RpcClient implements ClientProtocol {
|
|||
LengthInputStream lengthInputStream =
|
||||
KeyInputStream.getFromOmKeyInfo(
|
||||
keyInfo, xceiverClientManager, storageContainerLocationClient,
|
||||
requestId);
|
||||
requestId, verifyChecksum);
|
||||
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
|
||||
if (feInfo != null) {
|
||||
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.client.OzoneQuota;
|
||||
import org.apache.hadoop.hdds.client.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||
|
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.ozone.HddsDatanodeService;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.OzoneTestUtils;
|
||||
import org.apache.hadoop.ozone.client.BucketArgs;
|
||||
|
@ -763,6 +765,95 @@ public abstract class TestOzoneRpcClientAbstract {
|
|||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testReadKeyWithVerifyChecksumFlagEnable() throws Exception {
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
String bucketName = UUID.randomUUID().toString();
|
||||
String keyName = UUID.randomUUID().toString();
|
||||
|
||||
// Create and corrupt key
|
||||
createAndCorruptKey(volumeName, bucketName, keyName);
|
||||
|
||||
// read corrupt key with verify checksum enabled
|
||||
readCorruptedKey(volumeName, bucketName, keyName, true);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testReadKeyWithVerifyChecksumFlagDisable() throws Exception {
|
||||
String volumeName = UUID.randomUUID().toString();
|
||||
String bucketName = UUID.randomUUID().toString();
|
||||
String keyName = UUID.randomUUID().toString();
|
||||
|
||||
// Create and corrupt key
|
||||
createAndCorruptKey(volumeName, bucketName, keyName);
|
||||
|
||||
// read corrupt key with verify checksum enabled
|
||||
readCorruptedKey(volumeName, bucketName, keyName, false);
|
||||
|
||||
}
|
||||
|
||||
private void createAndCorruptKey(String volumeName, String bucketName,
|
||||
String keyName) throws IOException {
|
||||
String value = "sample value";
|
||||
store.createVolume(volumeName);
|
||||
OzoneVolume volume = store.getVolume(volumeName);
|
||||
volume.createBucket(bucketName);
|
||||
OzoneBucket bucket = volume.getBucket(bucketName);
|
||||
|
||||
// Write data into a key
|
||||
OzoneOutputStream out = bucket.createKey(keyName,
|
||||
value.getBytes().length, ReplicationType.RATIS,
|
||||
ReplicationFactor.ONE, new HashMap<>());
|
||||
out.write(value.getBytes());
|
||||
out.close();
|
||||
|
||||
// We need to find the location of the chunk file corresponding to the
|
||||
// data we just wrote.
|
||||
OzoneKey key = bucket.getKey(keyName);
|
||||
long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
|
||||
.getContainerID();
|
||||
|
||||
// Get the container by traversing the datanodes. Atleast one of the
|
||||
// datanode must have this container.
|
||||
Container container = null;
|
||||
for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
|
||||
container = hddsDatanode.getDatanodeStateMachine().getContainer()
|
||||
.getContainerSet().getContainer(containerID);
|
||||
if (container != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull("Container not found", container);
|
||||
corruptData(container, key);
|
||||
}
|
||||
|
||||
|
||||
private void readCorruptedKey(String volumeName, String bucketName,
|
||||
String keyName, boolean verifyChecksum) throws IOException {
|
||||
try {
|
||||
Configuration configuration = cluster.getConf();
|
||||
configuration.setBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
verifyChecksum);
|
||||
RpcClient client = new RpcClient(configuration);
|
||||
OzoneInputStream is = client.getKey(volumeName, bucketName, keyName);
|
||||
is.read(new byte[100]);
|
||||
is.close();
|
||||
if (verifyChecksum) {
|
||||
fail("Reading corrupted data should fail, as verify checksum is " +
|
||||
"enabled");
|
||||
}
|
||||
} catch (OzoneChecksumException e) {
|
||||
if (!verifyChecksum) {
|
||||
fail("Reading corrupted data should not fail, as verify checksum is " +
|
||||
"disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void readKey(OzoneBucket bucket, String keyName, String data)
|
||||
throws IOException {
|
||||
OzoneKey key = bucket.getKey(keyName);
|
||||
|
|
|
@ -88,6 +88,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
private final long blockSize;
|
||||
private final ChecksumType checksumType;
|
||||
private final int bytesPerChecksum;
|
||||
private final boolean verifyChecksum;
|
||||
|
||||
/**
|
||||
* Creates a new DistributedStorageHandler.
|
||||
|
@ -153,7 +154,9 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
|
||||
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||
this.checksumType = ChecksumType.valueOf(checksumTypeStr);
|
||||
|
||||
this.verifyChecksum =
|
||||
conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
|
||||
OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -479,7 +482,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
return KeyInputStream.getFromOmKeyInfo(
|
||||
keyInfo, xceiverClientManager, storageContainerLocationClient,
|
||||
args.getRequestID());
|
||||
args.getRequestID(), verifyChecksum);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -48,7 +48,8 @@ public class TestChunkStreams {
|
|||
for (int i = 0; i < 5; i++) {
|
||||
int tempOffset = offset;
|
||||
BlockInputStream in =
|
||||
new BlockInputStream(null, null, null, new ArrayList<>(), null) {
|
||||
new BlockInputStream(null, null, null, new ArrayList<>(), null,
|
||||
true) {
|
||||
private long pos = 0;
|
||||
private ByteArrayInputStream in =
|
||||
new ByteArrayInputStream(buf, tempOffset, 100);
|
||||
|
@ -104,7 +105,8 @@ public class TestChunkStreams {
|
|||
for (int i = 0; i < 5; i++) {
|
||||
int tempOffset = offset;
|
||||
BlockInputStream in =
|
||||
new BlockInputStream(null, null, null, new ArrayList<>(), null) {
|
||||
new BlockInputStream(null, null, null, new ArrayList<>(), null,
|
||||
true) {
|
||||
private long pos = 0;
|
||||
private ByteArrayInputStream in =
|
||||
new ByteArrayInputStream(buf, tempOffset, 100);
|
||||
|
|
Loading…
Reference in New Issue