HDDS-284. Checksum for ChunksData.
This commit is contained in:
parent
085f10e75d
commit
64a4b6b08b
|
@ -18,6 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdds.scm.storage;
|
package org.apache.hadoop.hdds.scm.storage;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
|
.StorageContainerException;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
|
import org.apache.hadoop.ozone.common.ChecksumData;
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.fs.Seekable;
|
import org.apache.hadoop.fs.Seekable;
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||||
|
@ -206,6 +210,9 @@ public class ChunkInputStream extends InputStream implements Seekable {
|
||||||
readChunkResponse = ContainerProtocolCalls
|
readChunkResponse = ContainerProtocolCalls
|
||||||
.readChunk(xceiverClient, chunkInfo, blockID, traceID);
|
.readChunk(xceiverClient, chunkInfo, blockID, traceID);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
if (e instanceof StorageContainerException) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
|
throw new IOException("Unexpected OzoneException: " + e.toString(), e);
|
||||||
}
|
}
|
||||||
ByteString byteString = readChunkResponse.getData();
|
ByteString byteString = readChunkResponse.getData();
|
||||||
|
@ -215,6 +222,10 @@ public class ChunkInputStream extends InputStream implements Seekable {
|
||||||
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
|
.format("Inconsistent read for chunk=%s len=%d bytesRead=%d",
|
||||||
chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
|
chunkInfo.getChunkName(), chunkInfo.getLen(), byteString.size()));
|
||||||
}
|
}
|
||||||
|
ChecksumData checksumData = ChecksumData.getFromProtoBuf(
|
||||||
|
chunkInfo.getChecksumData());
|
||||||
|
Checksum.verifyChecksum(byteString, checksumData);
|
||||||
|
|
||||||
buffers = byteString.asReadOnlyByteBufferList();
|
buffers = byteString.asReadOnlyByteBufferList();
|
||||||
bufferIndex = 0;
|
bufferIndex = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,9 @@ import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
|
import org.apache.hadoop.hdds.scm.XceiverClientAsyncReply;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
|
import org.apache.hadoop.ozone.common.ChecksumData;
|
||||||
|
import org.apache.hadoop.ozone.common.OzoneChecksumException;
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
import org.apache.hadoop.hdds.scm.XceiverClientManager;
|
||||||
|
@ -74,6 +77,7 @@ public class ChunkOutputStream extends OutputStream {
|
||||||
private final BlockData.Builder containerBlockData;
|
private final BlockData.Builder containerBlockData;
|
||||||
private XceiverClientManager xceiverClientManager;
|
private XceiverClientManager xceiverClientManager;
|
||||||
private XceiverClientSpi xceiverClient;
|
private XceiverClientSpi xceiverClient;
|
||||||
|
private final Checksum checksum;
|
||||||
private final String streamId;
|
private final String streamId;
|
||||||
private int chunkIndex;
|
private int chunkIndex;
|
||||||
private int chunkSize;
|
private int chunkSize;
|
||||||
|
@ -113,7 +117,8 @@ public class ChunkOutputStream extends OutputStream {
|
||||||
public ChunkOutputStream(BlockID blockID, String key,
|
public ChunkOutputStream(BlockID blockID, String key,
|
||||||
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
|
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
|
||||||
String traceID, int chunkSize, long streamBufferFlushSize,
|
String traceID, int chunkSize, long streamBufferFlushSize,
|
||||||
long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer) {
|
long streamBufferMaxSize, long watchTimeout, ByteBuffer buffer,
|
||||||
|
Checksum checksum) {
|
||||||
this.blockID = blockID;
|
this.blockID = blockID;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.traceID = traceID;
|
this.traceID = traceID;
|
||||||
|
@ -132,6 +137,7 @@ public class ChunkOutputStream extends OutputStream {
|
||||||
this.watchTimeout = watchTimeout;
|
this.watchTimeout = watchTimeout;
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
this.ioException = null;
|
this.ioException = null;
|
||||||
|
this.checksum = checksum;
|
||||||
|
|
||||||
// A single thread executor handle the responses of async requests
|
// A single thread executor handle the responses of async requests
|
||||||
responseExecutor = Executors.newSingleThreadExecutor();
|
responseExecutor = Executors.newSingleThreadExecutor();
|
||||||
|
@ -474,13 +480,20 @@ public class ChunkOutputStream extends OutputStream {
|
||||||
* information to be used later in putKey call.
|
* information to be used later in putKey call.
|
||||||
*
|
*
|
||||||
* @throws IOException if there is an I/O error while performing the call
|
* @throws IOException if there is an I/O error while performing the call
|
||||||
|
* @throws OzoneChecksumException if there is an error while computing
|
||||||
|
* checksum
|
||||||
*/
|
*/
|
||||||
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
|
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
|
||||||
int effectiveChunkSize = chunk.remaining();
|
int effectiveChunkSize = chunk.remaining();
|
||||||
ByteString data = ByteString.copyFrom(chunk);
|
ByteString data = ByteString.copyFrom(chunk);
|
||||||
ChunkInfo chunkInfo = ChunkInfo.newBuilder().setChunkName(
|
ChecksumData checksumData = checksum.computeChecksum(data);
|
||||||
DigestUtils.md5Hex(key) + "_stream_" + streamId + "_chunk_"
|
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
|
||||||
+ ++chunkIndex).setOffset(0).setLen(effectiveChunkSize).build();
|
.setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
|
||||||
|
"_chunk_" + ++chunkIndex)
|
||||||
|
.setOffset(0)
|
||||||
|
.setLen(effectiveChunkSize)
|
||||||
|
.setChecksumData(checksumData.getProtoBufMessage())
|
||||||
|
.build();
|
||||||
// generate a unique requestId
|
// generate a unique requestId
|
||||||
String requestId =
|
String requestId =
|
||||||
traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
|
traceID + ContainerProtos.Type.WriteChunk + chunkIndex + chunkInfo
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -322,6 +322,18 @@ public final class OzoneConfigKeys {
|
||||||
public static final String OZONE_CONTAINER_COPY_WORKDIR =
|
public static final String OZONE_CONTAINER_COPY_WORKDIR =
|
||||||
"hdds.datanode.replication.work.dir";
|
"hdds.datanode.replication.work.dir";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Config properties to set client side checksum properties.
|
||||||
|
*/
|
||||||
|
public static final String OZONE_CLIENT_CHECKSUM_TYPE =
|
||||||
|
"ozone.client.checksum.type";
|
||||||
|
public static final String OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT = "SHA256";
|
||||||
|
public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM =
|
||||||
|
"ozone.client.bytes.per.checksum";
|
||||||
|
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT =
|
||||||
|
1024 * 1024; // 1 MB
|
||||||
|
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* There is no need to instantiate this class.
|
* There is no need to instantiate this class.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,239 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.common;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import java.security.MessageDigest;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
|
||||||
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.util.PureJavaCrc32;
|
||||||
|
import org.apache.hadoop.util.PureJavaCrc32C;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to compute and verify checksums for chunks.
|
||||||
|
*/
|
||||||
|
public class Checksum {
|
||||||
|
|
||||||
|
public static final Logger LOG = LoggerFactory.getLogger(Checksum.class);
|
||||||
|
|
||||||
|
private final ChecksumType checksumType;
|
||||||
|
private final int bytesPerChecksum;
|
||||||
|
|
||||||
|
private PureJavaCrc32 crc32Checksum;
|
||||||
|
private PureJavaCrc32C crc32cChecksum;
|
||||||
|
private MessageDigest sha;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a Checksum object.
|
||||||
|
* @param type type of Checksum
|
||||||
|
* @param bytesPerChecksum number of bytes of data per checksum
|
||||||
|
*/
|
||||||
|
public Checksum(ChecksumType type, int bytesPerChecksum) {
|
||||||
|
this.checksumType = type;
|
||||||
|
this.bytesPerChecksum = bytesPerChecksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a Checksum object with default ChecksumType and default
|
||||||
|
* BytesPerChecksum.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public Checksum() {
|
||||||
|
this.checksumType = ChecksumType.valueOf(
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||||
|
this.bytesPerChecksum = OzoneConfigKeys
|
||||||
|
.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes checksum for give data.
|
||||||
|
* @param byteString input data in the form of ByteString.
|
||||||
|
* @return ChecksumData computed for input data.
|
||||||
|
*/
|
||||||
|
public ChecksumData computeChecksum(ByteString byteString)
|
||||||
|
throws OzoneChecksumException {
|
||||||
|
return computeChecksum(byteString.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes checksum for give data.
|
||||||
|
* @param data input data in the form of byte array.
|
||||||
|
* @return ChecksumData computed for input data.
|
||||||
|
*/
|
||||||
|
public ChecksumData computeChecksum(byte[] data)
|
||||||
|
throws OzoneChecksumException {
|
||||||
|
ChecksumData checksumData = new ChecksumData(this.checksumType, this
|
||||||
|
.bytesPerChecksum);
|
||||||
|
if (checksumType == ChecksumType.NONE) {
|
||||||
|
// Since type is set to NONE, we do not need to compute the checksums
|
||||||
|
return checksumData;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (checksumType) {
|
||||||
|
case CRC32:
|
||||||
|
crc32Checksum = new PureJavaCrc32();
|
||||||
|
break;
|
||||||
|
case CRC32C:
|
||||||
|
crc32cChecksum = new PureJavaCrc32C();
|
||||||
|
break;
|
||||||
|
case SHA256:
|
||||||
|
try {
|
||||||
|
sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new OzoneChecksumException(OzoneConsts.FILE_HASH, e);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case MD5:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new OzoneChecksumException(checksumType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute number of checksums needs for given data length based on bytes
|
||||||
|
// per checksum.
|
||||||
|
int dataSize = data.length;
|
||||||
|
int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
|
||||||
|
|
||||||
|
// Checksum is computed for each bytesPerChecksum number of bytes of data
|
||||||
|
// starting at offset 0. The last checksum might be computed for the
|
||||||
|
// remaining data with length less than bytesPerChecksum.
|
||||||
|
List<ByteString> checksumList = new ArrayList<>(numChecksums);
|
||||||
|
for (int index = 0; index < numChecksums; index++) {
|
||||||
|
checksumList.add(computeChecksumAtIndex(data, index));
|
||||||
|
}
|
||||||
|
checksumData.setChecksums(checksumList);
|
||||||
|
|
||||||
|
return checksumData;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes checksum based on checksumType for a data block at given index
|
||||||
|
* and a max length of bytesPerChecksum.
|
||||||
|
* @param data input data
|
||||||
|
* @param index index to compute the offset from where data must be read
|
||||||
|
* @return computed checksum ByteString
|
||||||
|
* @throws OzoneChecksumException thrown when ChecksumType is not recognized
|
||||||
|
*/
|
||||||
|
private ByteString computeChecksumAtIndex(byte[] data, int index)
|
||||||
|
throws OzoneChecksumException {
|
||||||
|
int offset = index * bytesPerChecksum;
|
||||||
|
int len = bytesPerChecksum;
|
||||||
|
if ((offset + len) > data.length) {
|
||||||
|
len = data.length - offset;
|
||||||
|
}
|
||||||
|
byte[] checksumBytes = null;
|
||||||
|
switch (checksumType) {
|
||||||
|
case CRC32:
|
||||||
|
checksumBytes = computeCRC32Checksum(data, offset, len);
|
||||||
|
break;
|
||||||
|
case CRC32C:
|
||||||
|
checksumBytes = computeCRC32CChecksum(data, offset, len);
|
||||||
|
break;
|
||||||
|
case SHA256:
|
||||||
|
checksumBytes = computeSHA256Checksum(data, offset, len);
|
||||||
|
break;
|
||||||
|
case MD5:
|
||||||
|
checksumBytes = computeMD5Checksum(data, offset, len);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new OzoneChecksumException(checksumType);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ByteString.copyFrom(checksumBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes CRC32 checksum.
|
||||||
|
*/
|
||||||
|
private byte[] computeCRC32Checksum(byte[] data, int offset, int len) {
|
||||||
|
crc32Checksum.reset();
|
||||||
|
crc32Checksum.update(data, offset, len);
|
||||||
|
return Longs.toByteArray(crc32Checksum.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes CRC32C checksum.
|
||||||
|
*/
|
||||||
|
private byte[] computeCRC32CChecksum(byte[] data, int offset, int len) {
|
||||||
|
crc32cChecksum.reset();
|
||||||
|
crc32cChecksum.update(data, offset, len);
|
||||||
|
return Longs.toByteArray(crc32cChecksum.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes SHA-256 checksum.
|
||||||
|
*/
|
||||||
|
private byte[] computeSHA256Checksum(byte[] data, int offset, int len) {
|
||||||
|
sha.reset();
|
||||||
|
sha.update(data, offset, len);
|
||||||
|
return sha.digest();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes MD5 checksum.
|
||||||
|
*/
|
||||||
|
private byte[] computeMD5Checksum(byte[] data, int offset, int len) {
|
||||||
|
MD5Hash md5out = MD5Hash.digest(data, offset, len);
|
||||||
|
return md5out.getDigest();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes the ChecksumData for the input data and verifies that it
|
||||||
|
* matches with that of the input checksumData.
|
||||||
|
* @param byteString input data
|
||||||
|
* @param checksumData checksumData to match with
|
||||||
|
* @throws OzoneChecksumException is thrown if checksums do not match
|
||||||
|
*/
|
||||||
|
public static boolean verifyChecksum(
|
||||||
|
ByteString byteString, ChecksumData checksumData)
|
||||||
|
throws OzoneChecksumException {
|
||||||
|
return verifyChecksum(byteString.toByteArray(), checksumData);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes the ChecksumData for the input data and verifies that it
|
||||||
|
* matches with that of the input checksumData.
|
||||||
|
* @param data input data
|
||||||
|
* @param checksumData checksumData to match with
|
||||||
|
* @throws OzoneChecksumException is thrown if checksums do not match
|
||||||
|
*/
|
||||||
|
public static boolean verifyChecksum(byte[] data, ChecksumData checksumData)
|
||||||
|
throws OzoneChecksumException {
|
||||||
|
ChecksumType checksumType = checksumData.getChecksumType();
|
||||||
|
if (checksumType == ChecksumType.NONE) {
|
||||||
|
// Checksum is set to NONE. No further verification is required.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int bytesPerChecksum = checksumData.getBytesPerChecksum();
|
||||||
|
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
|
||||||
|
ChecksumData computedChecksumData = checksum.computeChecksum(data);
|
||||||
|
|
||||||
|
return checksumData.verifyChecksumDataMatches(computedChecksumData);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,190 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.common;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.commons.lang3.builder.HashCodeBuilder;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ChecksumType;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java class that represents Checksum ProtoBuf class. This helper class allows
|
||||||
|
* us to convert to and from protobuf to normal java.
|
||||||
|
*/
|
||||||
|
public class ChecksumData {
|
||||||
|
|
||||||
|
private ChecksumType type;
|
||||||
|
// Checksum will be computed for every bytesPerChecksum number of bytes and
|
||||||
|
// stored sequentially in checksumList
|
||||||
|
private int bytesPerChecksum;
|
||||||
|
private List<ByteString> checksums;
|
||||||
|
|
||||||
|
public ChecksumData(ChecksumType checksumType, int bytesPerChecksum) {
|
||||||
|
this.type = checksumType;
|
||||||
|
this.bytesPerChecksum = bytesPerChecksum;
|
||||||
|
this.checksums = Lists.newArrayList();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter method for checksumType.
|
||||||
|
*/
|
||||||
|
public ChecksumType getChecksumType() {
|
||||||
|
return this.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter method for bytesPerChecksum.
|
||||||
|
*/
|
||||||
|
public int getBytesPerChecksum() {
|
||||||
|
return this.bytesPerChecksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Getter method for checksums.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<ByteString> getChecksums() {
|
||||||
|
return this.checksums;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setter method for checksums.
|
||||||
|
* @param checksumList list of checksums
|
||||||
|
*/
|
||||||
|
public void setChecksums(List<ByteString> checksumList) {
|
||||||
|
this.checksums.clear();
|
||||||
|
this.checksums.addAll(checksumList);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct the Checksum ProtoBuf message.
|
||||||
|
* @return Checksum ProtoBuf message
|
||||||
|
*/
|
||||||
|
public ContainerProtos.ChecksumData getProtoBufMessage() {
|
||||||
|
ContainerProtos.ChecksumData.Builder checksumProtoBuilder =
|
||||||
|
ContainerProtos.ChecksumData.newBuilder()
|
||||||
|
.setType(this.type)
|
||||||
|
.setBytesPerChecksum(this.bytesPerChecksum);
|
||||||
|
|
||||||
|
checksumProtoBuilder.addAllChecksums(checksums);
|
||||||
|
|
||||||
|
return checksumProtoBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs Checksum class object from the Checksum ProtoBuf message.
|
||||||
|
* @param checksumDataProto Checksum ProtoBuf message
|
||||||
|
* @return ChecksumData object representing the proto
|
||||||
|
*/
|
||||||
|
public static ChecksumData getFromProtoBuf(
|
||||||
|
ContainerProtos.ChecksumData checksumDataProto) {
|
||||||
|
Preconditions.checkNotNull(checksumDataProto);
|
||||||
|
|
||||||
|
ChecksumData checksumData = new ChecksumData(
|
||||||
|
checksumDataProto.getType(), checksumDataProto.getBytesPerChecksum());
|
||||||
|
|
||||||
|
if (checksumDataProto.getChecksumsCount() != 0) {
|
||||||
|
checksumData.setChecksums(checksumDataProto.getChecksumsList());
|
||||||
|
}
|
||||||
|
|
||||||
|
return checksumData;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that this ChecksumData matches with the input ChecksumData.
|
||||||
|
* @param that the ChecksumData to match with
|
||||||
|
* @return true if checksums match
|
||||||
|
* @throws OzoneChecksumException
|
||||||
|
*/
|
||||||
|
public boolean verifyChecksumDataMatches(ChecksumData that) throws
|
||||||
|
OzoneChecksumException {
|
||||||
|
|
||||||
|
// pre checks
|
||||||
|
if (this.checksums.size() == 0) {
|
||||||
|
throw new OzoneChecksumException("Original checksumData has no " +
|
||||||
|
"checksums");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (that.checksums.size() == 0) {
|
||||||
|
throw new OzoneChecksumException("Computed checksumData has no " +
|
||||||
|
"checksums");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.checksums.size() != that.checksums.size()) {
|
||||||
|
throw new OzoneChecksumException("Original and Computed checksumData's " +
|
||||||
|
"has different number of checksums");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that checksum matches at each index
|
||||||
|
for (int index = 0; index < this.checksums.size(); index++) {
|
||||||
|
if (!matchChecksumAtIndex(this.checksums.get(index),
|
||||||
|
that.checksums.get(index))) {
|
||||||
|
// checksum mismatch. throw exception.
|
||||||
|
throw new OzoneChecksumException(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean matchChecksumAtIndex(
|
||||||
|
ByteString expectedChecksumAtIndex, ByteString computedChecksumAtIndex) {
|
||||||
|
return expectedChecksumAtIndex.equals(computedChecksumAtIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (!(obj instanceof ChecksumData)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
ChecksumData that = (ChecksumData) obj;
|
||||||
|
|
||||||
|
if (!this.type.equals(that.getChecksumType())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (this.bytesPerChecksum != that.getBytesPerChecksum()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (this.checksums.size() != that.checksums.size()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match checksum at each index
|
||||||
|
for (int index = 0; index < this.checksums.size(); index++) {
|
||||||
|
if (!matchChecksumAtIndex(this.checksums.get(index),
|
||||||
|
that.checksums.get(index))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
HashCodeBuilder hc = new HashCodeBuilder();
|
||||||
|
hc.append(type);
|
||||||
|
hc.append(bytesPerChecksum);
|
||||||
|
hc.append(checksums.toArray());
|
||||||
|
return hc.toHashCode();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,66 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.common;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
|
||||||
|
/** Thrown for checksum errors. */
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class OzoneChecksumException extends IOException {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OzoneChecksumException to throw when checksum verfication fails.
|
||||||
|
* @param index checksum list index at which checksum match failed
|
||||||
|
*/
|
||||||
|
public OzoneChecksumException(int index) {
|
||||||
|
super(String.format("Checksum mismatch at index %d", index));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OzoneChecksumException to throw when unrecognized checksumType is given.
|
||||||
|
* @param unrecognizedChecksumType
|
||||||
|
*/
|
||||||
|
public OzoneChecksumException(
|
||||||
|
ContainerProtos.ChecksumType unrecognizedChecksumType) {
|
||||||
|
super(String.format("Unrecognized ChecksumType: %s",
|
||||||
|
unrecognizedChecksumType));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OzoneChecksumException to wrap around NoSuchAlgorithmException.
|
||||||
|
* @param algorithm name of algorithm
|
||||||
|
* @param ex original exception thrown
|
||||||
|
*/
|
||||||
|
public OzoneChecksumException(
|
||||||
|
String algorithm, NoSuchAlgorithmException ex) {
|
||||||
|
super(String.format("NoSuchAlgorithmException thrown while computing " +
|
||||||
|
"SHA-256 checksum using algorithm %s", algorithm), ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OzoneChecksumException to throw with custom message.
|
||||||
|
*/
|
||||||
|
public OzoneChecksumException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
}
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import org.apache.hadoop.ozone.common.ChecksumData;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java class that represents ChunkInfo ProtoBuf class. This helper class allows
|
* Java class that represents ChunkInfo ProtoBuf class. This helper class allows
|
||||||
|
@ -33,7 +34,7 @@ public class ChunkInfo {
|
||||||
private final String chunkName;
|
private final String chunkName;
|
||||||
private final long offset;
|
private final long offset;
|
||||||
private final long len;
|
private final long len;
|
||||||
private String checksum;
|
private ChecksumData checksumData;
|
||||||
private final Map<String, String> metadata;
|
private final Map<String, String> metadata;
|
||||||
|
|
||||||
|
|
||||||
|
@ -86,10 +87,9 @@ public class ChunkInfo {
|
||||||
info.getMetadata(x).getValue());
|
info.getMetadata(x).getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chunkInfo.setChecksumData(
|
||||||
|
ChecksumData.getFromProtoBuf(info.getChecksumData()));
|
||||||
|
|
||||||
if (info.hasChecksum()) {
|
|
||||||
chunkInfo.setChecksum(info.getChecksum());
|
|
||||||
}
|
|
||||||
return chunkInfo;
|
return chunkInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,9 +105,7 @@ public class ChunkInfo {
|
||||||
builder.setChunkName(this.getChunkName());
|
builder.setChunkName(this.getChunkName());
|
||||||
builder.setOffset(this.getOffset());
|
builder.setOffset(this.getOffset());
|
||||||
builder.setLen(this.getLen());
|
builder.setLen(this.getLen());
|
||||||
if (this.getChecksum() != null && !this.getChecksum().isEmpty()) {
|
builder.setChecksumData(this.checksumData.getProtoBufMessage());
|
||||||
builder.setChecksum(this.getChecksum());
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
for (Map.Entry<String, String> entry : metadata.entrySet()) {
|
||||||
ContainerProtos.KeyValue.Builder keyValBuilder =
|
ContainerProtos.KeyValue.Builder keyValBuilder =
|
||||||
|
@ -147,21 +145,17 @@ public class ChunkInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the SHA256 value of this chunk.
|
* Returns the checksumData of this chunk.
|
||||||
*
|
|
||||||
* @return - Hash String
|
|
||||||
*/
|
*/
|
||||||
public String getChecksum() {
|
public ChecksumData getChecksumData() {
|
||||||
return checksum;
|
return checksumData;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the Hash value of this chunk.
|
* Sets the checksums of this chunk.
|
||||||
*
|
|
||||||
* @param checksum - Hash String.
|
|
||||||
*/
|
*/
|
||||||
public void setChecksum(String checksum) {
|
public void setChecksumData(ChecksumData cData) {
|
||||||
this.checksum = checksum;
|
this.checksumData = cData;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -355,8 +355,22 @@ message ChunkInfo {
|
||||||
required string chunkName = 1;
|
required string chunkName = 1;
|
||||||
required uint64 offset = 2;
|
required uint64 offset = 2;
|
||||||
required uint64 len = 3;
|
required uint64 len = 3;
|
||||||
optional string checksum = 4;
|
repeated KeyValue metadata = 4;
|
||||||
repeated KeyValue metadata = 5;
|
required ChecksumData checksumData =5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ChecksumData {
|
||||||
|
required ChecksumType type = 1;
|
||||||
|
required uint32 bytesPerChecksum = 2;
|
||||||
|
repeated bytes checksums = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ChecksumType {
|
||||||
|
NONE = 1;
|
||||||
|
CRC32 = 2;
|
||||||
|
CRC32C = 3;
|
||||||
|
SHA256 = 4;
|
||||||
|
MD5 = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Stage {
|
enum Stage {
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.common;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link Checksum} class.
|
||||||
|
*/
|
||||||
|
public class TestChecksum {
|
||||||
|
|
||||||
|
private static final int BYTES_PER_CHECKSUM = 10;
|
||||||
|
private static final ContainerProtos.ChecksumType CHECKSUM_TYPE_DEFAULT =
|
||||||
|
ContainerProtos.ChecksumType.SHA256;
|
||||||
|
|
||||||
|
private Checksum getChecksum(ContainerProtos.ChecksumType type) {
|
||||||
|
if (type == null) {
|
||||||
|
type = CHECKSUM_TYPE_DEFAULT;
|
||||||
|
}
|
||||||
|
return new Checksum(type, BYTES_PER_CHECKSUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests {@link Checksum#verifyChecksum(byte[], ChecksumData)}.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testVerifyChecksum() throws Exception {
|
||||||
|
Checksum checksum = getChecksum(null);
|
||||||
|
int dataLen = 55;
|
||||||
|
byte[] data = RandomStringUtils.randomAlphabetic(dataLen).getBytes();
|
||||||
|
|
||||||
|
ChecksumData checksumData = checksum.computeChecksum(data);
|
||||||
|
|
||||||
|
// A checksum is calculate for each bytesPerChecksum number of bytes in
|
||||||
|
// the data. Since that value is 10 here and the data length is 55, we
|
||||||
|
// should have 6 checksums in checksumData.
|
||||||
|
Assert.assertEquals(6, checksumData.getChecksums().size());
|
||||||
|
|
||||||
|
// Checksum verification should pass
|
||||||
|
Assert.assertTrue("Checksum mismatch",
|
||||||
|
Checksum.verifyChecksum(data, checksumData));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that if data is modified, then the checksums should not match.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testIncorrectChecksum() throws Exception {
|
||||||
|
Checksum checksum = getChecksum(null);
|
||||||
|
byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes();
|
||||||
|
ChecksumData originalChecksumData = checksum.computeChecksum(data);
|
||||||
|
|
||||||
|
// Change the data and check if new checksum matches the original checksum.
|
||||||
|
// Modifying one byte of data should be enough for the checksum data to
|
||||||
|
// mismatch
|
||||||
|
data[50] = (byte) (data[50]+1);
|
||||||
|
ChecksumData newChecksumData = checksum.computeChecksum(data);
|
||||||
|
Assert.assertNotEquals("Checksums should not match for different data",
|
||||||
|
originalChecksumData, newChecksumData);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that checksum calculated using two different checksumTypes should
|
||||||
|
* not match.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testChecksumMismatchForDifferentChecksumTypes() throws Exception {
|
||||||
|
byte[] data = RandomStringUtils.randomAlphabetic(55).getBytes();
|
||||||
|
|
||||||
|
// Checksum1 of type SHA-256
|
||||||
|
Checksum checksum1 = getChecksum(null);
|
||||||
|
ChecksumData checksumData1 = checksum1.computeChecksum(data);
|
||||||
|
|
||||||
|
// Checksum2 of type CRC32
|
||||||
|
Checksum checksum2 = getChecksum(ContainerProtos.ChecksumType.CRC32);
|
||||||
|
ChecksumData checksumData2 = checksum2.computeChecksum(data);
|
||||||
|
|
||||||
|
// The two checksums should not match as they have different types
|
||||||
|
Assert.assertNotEquals(
|
||||||
|
"Checksums should not match for different checksum types",
|
||||||
|
checksum1, checksum2);
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,8 +19,6 @@
|
||||||
package org.apache.hadoop.ozone.container.keyvalue.helpers;
|
package org.apache.hadoop.ozone.container.keyvalue.helpers;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.codec.binary.Hex;
|
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandRequestProto;
|
.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
@ -47,7 +45,6 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.AsynchronousFileChannel;
|
import java.nio.channels.AsynchronousFileChannel;
|
||||||
import java.nio.channels.FileLock;
|
import java.nio.channels.FileLock;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.security.MessageDigest;
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
@ -90,11 +87,6 @@ public final class ChunkUtils {
|
||||||
FileLock lock = null;
|
FileLock lock = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (chunkInfo.getChecksum() != null &&
|
|
||||||
!chunkInfo.getChecksum().isEmpty()) {
|
|
||||||
verifyChecksum(chunkInfo, data, log);
|
|
||||||
}
|
|
||||||
|
|
||||||
long writeTimeStart = Time.monotonicNow();
|
long writeTimeStart = Time.monotonicNow();
|
||||||
file =
|
file =
|
||||||
AsynchronousFileChannel.open(chunkFile.toPath(),
|
AsynchronousFileChannel.open(chunkFile.toPath(),
|
||||||
|
@ -154,10 +146,8 @@ public final class ChunkUtils {
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
||||||
VolumeIOStats volumeIOStats)
|
VolumeIOStats volumeIOStats) throws StorageContainerException,
|
||||||
throws
|
ExecutionException, InterruptedException {
|
||||||
StorageContainerException, ExecutionException, InterruptedException,
|
|
||||||
NoSuchAlgorithmException {
|
|
||||||
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
|
||||||
if (!chunkFile.exists()) {
|
if (!chunkFile.exists()) {
|
||||||
|
@ -184,10 +174,7 @@ public final class ChunkUtils {
|
||||||
volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
|
volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
|
||||||
volumeIOStats.incReadOpCount();
|
volumeIOStats.incReadOpCount();
|
||||||
volumeIOStats.incReadBytes(data.getLen());
|
volumeIOStats.incReadBytes(data.getLen());
|
||||||
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
|
|
||||||
buf.rewind();
|
|
||||||
verifyChecksum(data, buf, log);
|
|
||||||
}
|
|
||||||
return buf;
|
return buf;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new StorageContainerException(e, IO_EXCEPTION);
|
throw new StorageContainerException(e, IO_EXCEPTION);
|
||||||
|
@ -205,30 +192,6 @@ public final class ChunkUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Verifies the checksum of a chunk against the data buffer.
|
|
||||||
*
|
|
||||||
* @param chunkInfo - Chunk Info.
|
|
||||||
* @param data - data buffer
|
|
||||||
* @param log - log
|
|
||||||
* @throws NoSuchAlgorithmException
|
|
||||||
* @throws StorageContainerException
|
|
||||||
*/
|
|
||||||
private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data,
|
|
||||||
Logger log) throws NoSuchAlgorithmException, StorageContainerException {
|
|
||||||
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
|
||||||
sha.update(data);
|
|
||||||
data.rewind();
|
|
||||||
if (!Hex.encodeHexString(sha.digest()).equals(
|
|
||||||
chunkInfo.getChecksum())) {
|
|
||||||
log.error("Checksum mismatch. Provided: {} , computed: {}",
|
|
||||||
chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
|
|
||||||
throw new StorageContainerException("Checksum mismatch. Provided: " +
|
|
||||||
chunkInfo.getChecksum() + " , computed: " +
|
|
||||||
DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validates chunk data and returns a file object to Chunk File that we are
|
* Validates chunk data and returns a file object to Chunk File that we are
|
||||||
* expected to write data to.
|
* expected to write data to.
|
||||||
|
|
|
@ -200,10 +200,6 @@ public class ChunkManagerImpl implements ChunkManager {
|
||||||
containerData.incrReadBytes(length);
|
containerData.incrReadBytes(length);
|
||||||
return data.array();
|
return data.array();
|
||||||
}
|
}
|
||||||
} catch(NoSuchAlgorithmException ex) {
|
|
||||||
LOG.error("read data failed. error: {}", ex);
|
|
||||||
throw new StorageContainerException("Internal error: ",
|
|
||||||
ex, NO_SUCH_ALGORITHM);
|
|
||||||
} catch (ExecutionException ex) {
|
} catch (ExecutionException ex) {
|
||||||
LOG.error("read data failed. error: {}", ex);
|
LOG.error("read data failed. error: {}", ex);
|
||||||
throw new StorageContainerException("Internal error: ",
|
throw new StorageContainerException("Internal error: ",
|
||||||
|
|
|
@ -221,22 +221,6 @@ public class TestChunkManagerImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWriteChunkChecksumMismatch() throws Exception {
|
|
||||||
try {
|
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
|
||||||
.getLocalID(), 0), 0, data.length);
|
|
||||||
//Setting checksum to some value.
|
|
||||||
chunkInfo.setChecksum("some garbage");
|
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
|
||||||
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
|
|
||||||
fail("testWriteChunkChecksumMismatch failed");
|
|
||||||
} catch (StorageContainerException ex) {
|
|
||||||
GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
|
|
||||||
assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, ex.getResult());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadChunkFileNotExists() throws Exception {
|
public void testReadChunkFileNotExists() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
|
||||||
import org.apache.hadoop.hdds.client.BlockID;
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||||
|
@ -83,6 +84,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
private final long watchTimeout;
|
private final long watchTimeout;
|
||||||
private final long blockSize;
|
private final long blockSize;
|
||||||
private ByteBuffer buffer;
|
private ByteBuffer buffer;
|
||||||
|
private final Checksum checksum;
|
||||||
/**
|
/**
|
||||||
* A constructor for testing purpose only.
|
* A constructor for testing purpose only.
|
||||||
*/
|
*/
|
||||||
|
@ -102,6 +104,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
buffer = ByteBuffer.allocate(1);
|
buffer = ByteBuffer.allocate(1);
|
||||||
watchTimeout = 0;
|
watchTimeout = 0;
|
||||||
blockSize = 0;
|
blockSize = 0;
|
||||||
|
this.checksum = new Checksum();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -113,7 +116,8 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void addStream(OutputStream outputStream, long length) {
|
public void addStream(OutputStream outputStream, long length) {
|
||||||
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
|
streamEntries.add(
|
||||||
|
new ChunkOutputStreamEntry(outputStream, length, checksum));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -145,7 +149,8 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
|
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
|
||||||
OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
|
OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize,
|
||||||
String requestId, ReplicationFactor factor, ReplicationType type,
|
String requestId, ReplicationFactor factor, ReplicationType type,
|
||||||
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout) {
|
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
|
||||||
|
Checksum checksum) {
|
||||||
this.streamEntries = new ArrayList<>();
|
this.streamEntries = new ArrayList<>();
|
||||||
this.currentStreamIndex = 0;
|
this.currentStreamIndex = 0;
|
||||||
this.omClient = omClient;
|
this.omClient = omClient;
|
||||||
|
@ -163,6 +168,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
this.streamBufferMaxSize = bufferMaxSize;
|
this.streamBufferMaxSize = bufferMaxSize;
|
||||||
this.blockSize = size;
|
this.blockSize = size;
|
||||||
this.watchTimeout = watchTimeout;
|
this.watchTimeout = watchTimeout;
|
||||||
|
this.checksum = checksum;
|
||||||
|
|
||||||
Preconditions.checkState(chunkSize > 0);
|
Preconditions.checkState(chunkSize > 0);
|
||||||
Preconditions.checkState(streamBufferFlushSize > 0);
|
Preconditions.checkState(streamBufferFlushSize > 0);
|
||||||
|
@ -216,7 +222,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
|
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
|
||||||
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
|
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
|
||||||
chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
|
chunkSize, subKeyInfo.getLength(), streamBufferFlushSize,
|
||||||
streamBufferMaxSize, watchTimeout, buffer));
|
streamBufferMaxSize, watchTimeout, buffer, checksum));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -534,6 +540,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
private long streamBufferMaxSize;
|
private long streamBufferMaxSize;
|
||||||
private long blockSize;
|
private long blockSize;
|
||||||
private long watchTimeout;
|
private long watchTimeout;
|
||||||
|
private Checksum checksum;
|
||||||
|
|
||||||
public Builder setHandler(OpenKeySession handler) {
|
public Builder setHandler(OpenKeySession handler) {
|
||||||
this.openHandler = handler;
|
this.openHandler = handler;
|
||||||
|
@ -597,10 +604,15 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setChecksum(Checksum checksumObj){
|
||||||
|
this.checksum = checksumObj;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public ChunkGroupOutputStream build() throws IOException {
|
public ChunkGroupOutputStream build() throws IOException {
|
||||||
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
|
return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient,
|
||||||
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
|
||||||
streamBufferMaxSize, blockSize, watchTimeout);
|
streamBufferMaxSize, blockSize, watchTimeout, checksum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -610,6 +622,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
private final String key;
|
private final String key;
|
||||||
private final XceiverClientManager xceiverClientManager;
|
private final XceiverClientManager xceiverClientManager;
|
||||||
private final XceiverClientSpi xceiverClient;
|
private final XceiverClientSpi xceiverClient;
|
||||||
|
private final Checksum checksum;
|
||||||
private final String requestId;
|
private final String requestId;
|
||||||
private final int chunkSize;
|
private final int chunkSize;
|
||||||
// total number of bytes that should be written to this stream
|
// total number of bytes that should be written to this stream
|
||||||
|
@ -626,7 +639,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
XceiverClientManager xceiverClientManager,
|
XceiverClientManager xceiverClientManager,
|
||||||
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
|
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
|
||||||
long length, long streamBufferFlushSize, long streamBufferMaxSize,
|
long length, long streamBufferFlushSize, long streamBufferMaxSize,
|
||||||
long watchTimeout, ByteBuffer buffer) {
|
long watchTimeout, ByteBuffer buffer, Checksum checksum) {
|
||||||
this.outputStream = null;
|
this.outputStream = null;
|
||||||
this.blockID = blockID;
|
this.blockID = blockID;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
|
@ -641,6 +654,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
this.streamBufferMaxSize = streamBufferMaxSize;
|
this.streamBufferMaxSize = streamBufferMaxSize;
|
||||||
this.watchTimeout = watchTimeout;
|
this.watchTimeout = watchTimeout;
|
||||||
this.buffer = buffer;
|
this.buffer = buffer;
|
||||||
|
this.checksum = checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -648,7 +662,8 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
* @param outputStream a existing writable output stream
|
* @param outputStream a existing writable output stream
|
||||||
* @param length the length of data to write to the stream
|
* @param length the length of data to write to the stream
|
||||||
*/
|
*/
|
||||||
ChunkOutputStreamEntry(OutputStream outputStream, long length) {
|
ChunkOutputStreamEntry(OutputStream outputStream, long length,
|
||||||
|
Checksum checksum) {
|
||||||
this.outputStream = outputStream;
|
this.outputStream = outputStream;
|
||||||
this.blockID = null;
|
this.blockID = null;
|
||||||
this.key = null;
|
this.key = null;
|
||||||
|
@ -663,6 +678,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
streamBufferMaxSize = 0;
|
streamBufferMaxSize = 0;
|
||||||
buffer = null;
|
buffer = null;
|
||||||
watchTimeout = 0;
|
watchTimeout = 0;
|
||||||
|
this.checksum = checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
long getLength() {
|
long getLength() {
|
||||||
|
@ -678,7 +694,7 @@ public class ChunkGroupOutputStream extends OutputStream {
|
||||||
this.outputStream =
|
this.outputStream =
|
||||||
new ChunkOutputStream(blockID, key, xceiverClientManager,
|
new ChunkOutputStream(blockID, key, xceiverClientManager,
|
||||||
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
|
xceiverClient, requestId, chunkSize, streamBufferFlushSize,
|
||||||
streamBufferMaxSize, watchTimeout, buffer);
|
streamBufferMaxSize, watchTimeout, buffer, checksum);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.StorageUnit;
|
import org.apache.hadoop.conf.StorageUnit;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ChecksumType;
|
||||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
|
@ -42,6 +44,7 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||||
|
@ -92,6 +95,7 @@ public class RpcClient implements ClientProtocol {
|
||||||
ozoneManagerClient;
|
ozoneManagerClient;
|
||||||
private final XceiverClientManager xceiverClientManager;
|
private final XceiverClientManager xceiverClientManager;
|
||||||
private final int chunkSize;
|
private final int chunkSize;
|
||||||
|
private final Checksum checksum;
|
||||||
private final UserGroupInformation ugi;
|
private final UserGroupInformation ugi;
|
||||||
private final OzoneAcl.OzoneACLRights userRights;
|
private final OzoneAcl.OzoneACLRights userRights;
|
||||||
private final OzoneAcl.OzoneACLRights groupRights;
|
private final OzoneAcl.OzoneACLRights groupRights;
|
||||||
|
@ -166,6 +170,26 @@ public class RpcClient implements ClientProtocol {
|
||||||
conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
|
conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
|
||||||
OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
|
OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
int configuredChecksumSize = conf.getInt(
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT);
|
||||||
|
int checksumSize;
|
||||||
|
if(configuredChecksumSize <
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
|
||||||
|
LOG.warn("The checksum size ({}) is not allowed to be less than the " +
|
||||||
|
"minimum size ({}), resetting to the minimum size.",
|
||||||
|
configuredChecksumSize,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
|
||||||
|
checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
|
||||||
|
} else {
|
||||||
|
checksumSize = configuredChecksumSize;
|
||||||
|
}
|
||||||
|
String checksumTypeStr = conf.get(
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||||
|
ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr);
|
||||||
|
this.checksum = new Checksum(checksumType, checksumSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private InetSocketAddress getScmAddressForClient() throws IOException {
|
private InetSocketAddress getScmAddressForClient() throws IOException {
|
||||||
|
@ -489,6 +513,7 @@ public class RpcClient implements ClientProtocol {
|
||||||
.setStreamBufferMaxSize(streamBufferMaxSize)
|
.setStreamBufferMaxSize(streamBufferMaxSize)
|
||||||
.setWatchTimeout(watchTimeout)
|
.setWatchTimeout(watchTimeout)
|
||||||
.setBlockSize(blockSize)
|
.setBlockSize(blockSize)
|
||||||
|
.setChecksum(checksum)
|
||||||
.build();
|
.build();
|
||||||
groupOutputStream.addPreallocateBlocks(
|
groupOutputStream.addPreallocateBlocks(
|
||||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.client.rpc;
|
package org.apache.hadoop.ozone.client.rpc;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.RandomStringUtils;
|
import org.apache.commons.lang3.RandomStringUtils;
|
||||||
import org.apache.commons.lang3.RandomUtils;
|
import org.apache.commons.lang3.RandomUtils;
|
||||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||||
|
@ -39,9 +40,13 @@ import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
|
import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
|
import org.apache.hadoop.ozone.common.OzoneChecksumException;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueBlockIterator;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||||
|
import org.apache.hadoop.ozone.container.keyvalue.helpers
|
||||||
|
.KeyValueContainerLocationUtil;
|
||||||
import org.apache.hadoop.ozone.om.OzoneManager;
|
import org.apache.hadoop.ozone.om.OzoneManager;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||||
|
@ -87,6 +92,8 @@ public class TestOzoneRpcClient {
|
||||||
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
||||||
storageContainerLocationClient;
|
storageContainerLocationClient;
|
||||||
|
|
||||||
|
private static final String SCM_ID = UUID.randomUUID().toString();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a MiniOzoneCluster for testing.
|
* Create a MiniOzoneCluster for testing.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -98,7 +105,10 @@ public class TestOzoneRpcClient {
|
||||||
public static void init() throws Exception {
|
public static void init() throws Exception {
|
||||||
OzoneConfiguration conf = new OzoneConfiguration();
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
|
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
|
||||||
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10).build();
|
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||||
|
.setNumDatanodes(10)
|
||||||
|
.setScmId(SCM_ID)
|
||||||
|
.build();
|
||||||
cluster.waitForClusterToBeReady();
|
cluster.waitForClusterToBeReady();
|
||||||
ozClient = OzoneClientFactory.getRpcClient(conf);
|
ozClient = OzoneClientFactory.getRpcClient(conf);
|
||||||
store = ozClient.getObjectStore();
|
store = ozClient.getObjectStore();
|
||||||
|
@ -821,6 +831,92 @@ public class TestOzoneRpcClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests reading a corrputed chunk file throws checksum exception.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testReadKeyWithCorruptedData() throws IOException {
|
||||||
|
String volumeName = UUID.randomUUID().toString();
|
||||||
|
String bucketName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
String value = "sample value";
|
||||||
|
store.createVolume(volumeName);
|
||||||
|
OzoneVolume volume = store.getVolume(volumeName);
|
||||||
|
volume.createBucket(bucketName);
|
||||||
|
OzoneBucket bucket = volume.getBucket(bucketName);
|
||||||
|
String keyName = UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
// Write data into a key
|
||||||
|
OzoneOutputStream out = bucket.createKey(keyName,
|
||||||
|
value.getBytes().length, ReplicationType.STAND_ALONE,
|
||||||
|
ReplicationFactor.ONE);
|
||||||
|
out.write(value.getBytes());
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// We need to find the location of the chunk file corresponding to the
|
||||||
|
// data we just wrote.
|
||||||
|
OzoneKey key = bucket.getKey(keyName);
|
||||||
|
long containerID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
|
||||||
|
.getContainerID();
|
||||||
|
long localID = ((OzoneKeyDetails) key).getOzoneKeyLocations().get(0)
|
||||||
|
.getLocalID();
|
||||||
|
|
||||||
|
// Get the container by traversing the datanodes. Atleast one of the
|
||||||
|
// datanode must have this container.
|
||||||
|
Container container = null;
|
||||||
|
for (HddsDatanodeService hddsDatanode : cluster.getHddsDatanodes()) {
|
||||||
|
container = hddsDatanode.getDatanodeStateMachine().getContainer()
|
||||||
|
.getContainerSet().getContainer(containerID);
|
||||||
|
if (container != null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertNotNull("Container not found", container);
|
||||||
|
|
||||||
|
// From the containerData, get the block iterator for all the blocks in
|
||||||
|
// the container.
|
||||||
|
KeyValueContainerData containerData =
|
||||||
|
(KeyValueContainerData) container.getContainerData();
|
||||||
|
String containerPath = new File(containerData.getMetadataPath())
|
||||||
|
.getParent();
|
||||||
|
KeyValueBlockIterator keyValueBlockIterator = new KeyValueBlockIterator(
|
||||||
|
containerID, new File(containerPath));
|
||||||
|
|
||||||
|
// Find the block corresponding to the key we put. We use the localID of
|
||||||
|
// the BlockData to identify out key.
|
||||||
|
BlockData blockData = null;
|
||||||
|
while (keyValueBlockIterator.hasNext()) {
|
||||||
|
blockData = keyValueBlockIterator.nextBlock();
|
||||||
|
if (blockData.getBlockID().getLocalID() == localID) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertNotNull("Block not found", blockData);
|
||||||
|
|
||||||
|
// Get the location of the chunk file
|
||||||
|
String chunkName = blockData.getChunks().get(0).getChunkName();
|
||||||
|
String containreBaseDir = container.getContainerData().getVolume()
|
||||||
|
.getHddsRootDir().getPath();
|
||||||
|
File chunksLocationPath = KeyValueContainerLocationUtil
|
||||||
|
.getChunksLocationPath(containreBaseDir, SCM_ID, containerID);
|
||||||
|
File chunkFile = new File(chunksLocationPath, chunkName);
|
||||||
|
|
||||||
|
// Corrupt the contents of the chunk file
|
||||||
|
String newData = new String("corrupted data");
|
||||||
|
FileUtils.writeByteArrayToFile(chunkFile, newData.getBytes());
|
||||||
|
|
||||||
|
// Try reading the key. Since the chunk file is corrupted, it should
|
||||||
|
// throw a checksum mismatch exception.
|
||||||
|
try {
|
||||||
|
OzoneInputStream is = bucket.readKey(keyName);
|
||||||
|
is.read(new byte[100]);
|
||||||
|
Assert.fail("Reading corrupted data should fail.");
|
||||||
|
} catch (OzoneChecksumException e) {
|
||||||
|
GenericTestUtils.assertExceptionContains("Checksum mismatch", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteKey()
|
public void testDeleteKey()
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.ozone.container;
|
package org.apache.hadoop.ozone.container;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import java.security.MessageDigest;
|
||||||
import org.apache.hadoop.conf.StorageUnit;
|
import org.apache.hadoop.conf.StorageUnit;
|
||||||
import org.apache.hadoop.hdds.HddsUtils;
|
import org.apache.hadoop.hdds.HddsUtils;
|
||||||
import org.apache.hadoop.hdds.client.ReplicationType;
|
import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
|
@ -26,13 +27,15 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||||
import org.apache.hadoop.ozone.HddsDatanodeService;
|
import org.apache.hadoop.ozone.HddsDatanodeService;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.client.ObjectStore;
|
import org.apache.hadoop.ozone.client.ObjectStore;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
|
||||||
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
|
import org.apache.hadoop.ozone.common.OzoneChecksumException;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.commons.codec.binary.Hex;
|
|
||||||
import org.apache.hadoop.hdds.client.BlockID;
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
@ -42,7 +45,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandResponseProto;
|
.ContainerCommandResponseProto;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
|
@ -52,7 +54,6 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.security.MessageDigest;
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
@ -180,10 +181,9 @@ public final class ContainerTestHelper {
|
||||||
* @throws NoSuchAlgorithmException
|
* @throws NoSuchAlgorithmException
|
||||||
*/
|
*/
|
||||||
public static void setDataChecksum(ChunkInfo info, byte[] data)
|
public static void setDataChecksum(ChunkInfo info, byte[] data)
|
||||||
throws NoSuchAlgorithmException {
|
throws OzoneChecksumException {
|
||||||
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
Checksum checksum = new Checksum();
|
||||||
sha.update(data);
|
info.setChecksumData(checksum.computeChecksum(data));
|
||||||
info.setChecksum(Hex.encodeHexString(sha.digest()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -197,8 +197,7 @@ public final class ContainerTestHelper {
|
||||||
* @throws NoSuchAlgorithmException
|
* @throws NoSuchAlgorithmException
|
||||||
*/
|
*/
|
||||||
public static ContainerCommandRequestProto getWriteChunkRequest(
|
public static ContainerCommandRequestProto getWriteChunkRequest(
|
||||||
Pipeline pipeline, BlockID blockID, int datalen)
|
Pipeline pipeline, BlockID blockID, int datalen) throws IOException {
|
||||||
throws IOException, NoSuchAlgorithmException {
|
|
||||||
LOG.trace("writeChunk {} (blockID={}) to pipeline=",
|
LOG.trace("writeChunk {} (blockID={}) to pipeline=",
|
||||||
datalen, blockID, pipeline);
|
datalen, blockID, pipeline);
|
||||||
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
|
ContainerProtos.WriteChunkRequestProto.Builder writeRequest =
|
||||||
|
|
|
@ -140,7 +140,8 @@ public class TestBlockDeletingService {
|
||||||
.setChunkName(chunk.getAbsolutePath())
|
.setChunkName(chunk.getAbsolutePath())
|
||||||
.setLen(0)
|
.setLen(0)
|
||||||
.setOffset(0)
|
.setOffset(0)
|
||||||
.setChecksum("")
|
.setChecksumData(
|
||||||
|
ContainerProtos.ChecksumData.getDefaultInstance())
|
||||||
.build();
|
.build();
|
||||||
chunks.add(info);
|
chunks.add(info);
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
|
import org.apache.hadoop.ozone.common.ChecksumData;
|
||||||
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||||
|
@ -320,8 +322,7 @@ public class TestContainerPersistence {
|
||||||
Assert.assertTrue(testMap.isEmpty());
|
Assert.assertTrue(testMap.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChunkInfo writeChunkHelper(BlockID blockID)
|
private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException {
|
||||||
throws IOException, NoSuchAlgorithmException {
|
|
||||||
final int datalen = 1024;
|
final int datalen = 1024;
|
||||||
long testContainerID = blockID.getContainerID();
|
long testContainerID = blockID.getContainerID();
|
||||||
Container container = containerSet.getContainer(testContainerID);
|
Container container = containerSet.getContainer(testContainerID);
|
||||||
|
@ -360,8 +361,7 @@ public class TestContainerPersistence {
|
||||||
* @throws NoSuchAlgorithmException
|
* @throws NoSuchAlgorithmException
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testWritReadManyChunks() throws IOException,
|
public void testWritReadManyChunks() throws IOException {
|
||||||
NoSuchAlgorithmException {
|
|
||||||
final int datalen = 1024;
|
final int datalen = 1024;
|
||||||
final int chunkCount = 1024;
|
final int chunkCount = 1024;
|
||||||
|
|
||||||
|
@ -386,32 +386,29 @@ public class TestContainerPersistence {
|
||||||
Path dataDir = Paths.get(cNewData.getChunksPath());
|
Path dataDir = Paths.get(cNewData.getChunksPath());
|
||||||
|
|
||||||
String globFormat = String.format("%s.data.*", blockID.getLocalID());
|
String globFormat = String.format("%s.data.*", blockID.getLocalID());
|
||||||
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
|
||||||
|
|
||||||
// Read chunk via file system and verify.
|
// Read chunk via file system and verify.
|
||||||
int count = 0;
|
int count = 0;
|
||||||
try (DirectoryStream<Path> stream =
|
try (DirectoryStream<Path> stream =
|
||||||
Files.newDirectoryStream(dataDir, globFormat)) {
|
Files.newDirectoryStream(dataDir, globFormat)) {
|
||||||
|
Checksum checksum = new Checksum();
|
||||||
|
|
||||||
for (Path fname : stream) {
|
for (Path fname : stream) {
|
||||||
sha.update(FileUtils.readFileToByteArray(fname.toFile()));
|
ChecksumData checksumData = checksum
|
||||||
String val = Hex.encodeHexString(sha.digest());
|
.computeChecksum(FileUtils.readFileToByteArray(fname.toFile()));
|
||||||
Assert.assertEquals(fileHashMap.get(fname.getFileName().toString())
|
Assert.assertEquals(fileHashMap.get(fname.getFileName().toString())
|
||||||
.getChecksum(), val);
|
.getChecksumData(), checksumData);
|
||||||
count++;
|
count++;
|
||||||
sha.reset();
|
|
||||||
}
|
}
|
||||||
Assert.assertEquals(chunkCount, count);
|
Assert.assertEquals(chunkCount, count);
|
||||||
|
|
||||||
// Read chunk via ReadChunk call.
|
// Read chunk via ReadChunk call.
|
||||||
sha.reset();
|
|
||||||
for (int x = 0; x < chunkCount; x++) {
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
||||||
ChunkInfo info = fileHashMap.get(fileName);
|
ChunkInfo info = fileHashMap.get(fileName);
|
||||||
byte[] data = chunkManager.readChunk(container, blockID, info);
|
byte[] data = chunkManager.readChunk(container, blockID, info);
|
||||||
sha.update(data);
|
ChecksumData checksumData = checksum.computeChecksum(data);
|
||||||
Assert.assertEquals(Hex.encodeHexString(sha.digest()),
|
Assert.assertEquals(info.getChecksumData(), checksumData);
|
||||||
info.getChecksum());
|
|
||||||
sha.reset();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -571,7 +568,7 @@ public class TestContainerPersistence {
|
||||||
getBlock(container, blockData.getBlockID());
|
getBlock(container, blockData.getBlockID());
|
||||||
ChunkInfo readChunk =
|
ChunkInfo readChunk =
|
||||||
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
|
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
|
||||||
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
|
Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -629,7 +626,7 @@ public class TestContainerPersistence {
|
||||||
getBlock(container, blockData.getBlockID());
|
getBlock(container, blockData.getBlockID());
|
||||||
ChunkInfo readChunk =
|
ChunkInfo readChunk =
|
||||||
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
|
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(0));
|
||||||
Assert.assertEquals(info.getChecksum(), readChunk.getChecksum());
|
Assert.assertEquals(info.getChecksumData(), readChunk.getChecksumData());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -684,7 +681,8 @@ public class TestContainerPersistence {
|
||||||
ChunkInfo readChunk =
|
ChunkInfo readChunk =
|
||||||
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
|
ChunkInfo.getFromProtoBuf(readBlockData.getChunks().get(readBlockData
|
||||||
.getChunks().size() - 1));
|
.getChunks().size() - 1));
|
||||||
Assert.assertEquals(lastChunk.getChecksum(), readChunk.getChecksum());
|
Assert.assertEquals(
|
||||||
|
lastChunk.getChecksumData(), readChunk.getChecksumData());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,7 +24,9 @@ import org.apache.hadoop.hdds.client.ReplicationType;
|
||||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.ozone.client.io.LengthInputStream;
|
import org.apache.hadoop.ozone.client.io.LengthInputStream;
|
||||||
|
import org.apache.hadoop.ozone.common.Checksum;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||||
|
@ -84,6 +86,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
private final long streamBufferMaxSize;
|
private final long streamBufferMaxSize;
|
||||||
private final long watchTimeout;
|
private final long watchTimeout;
|
||||||
private final long blockSize;
|
private final long blockSize;
|
||||||
|
private final Checksum checksum;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new DistributedStorageHandler.
|
* Creates a new DistributedStorageHandler.
|
||||||
|
@ -128,6 +131,27 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
|
conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
|
||||||
OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
|
OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
|
||||||
TimeUnit.MILLISECONDS);
|
TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
int configuredChecksumSize = conf.getInt(
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT);
|
||||||
|
int checksumSize;
|
||||||
|
if(configuredChecksumSize <
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
|
||||||
|
LOG.warn("The checksum size ({}) is not allowed to be less than the " +
|
||||||
|
"minimum size ({}), resetting to the minimum size.",
|
||||||
|
configuredChecksumSize,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
|
||||||
|
checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
|
||||||
|
} else {
|
||||||
|
checksumSize = configuredChecksumSize;
|
||||||
|
}
|
||||||
|
String checksumTypeStr = conf.get(
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
|
||||||
|
OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
|
||||||
|
ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType
|
||||||
|
.valueOf(checksumTypeStr);
|
||||||
|
this.checksum = new Checksum(checksumType, checksumSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -426,6 +450,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
.setStreamBufferMaxSize(streamBufferMaxSize)
|
.setStreamBufferMaxSize(streamBufferMaxSize)
|
||||||
.setBlockSize(blockSize)
|
.setBlockSize(blockSize)
|
||||||
.setWatchTimeout(watchTimeout)
|
.setWatchTimeout(watchTimeout)
|
||||||
|
.setChecksum(checksum)
|
||||||
.build();
|
.build();
|
||||||
groupOutputStream.addPreallocateBlocks(
|
groupOutputStream.addPreallocateBlocks(
|
||||||
openKey.getKeyInfo().getLatestVersionLocations(),
|
openKey.getKeyInfo().getLatestVersionLocations(),
|
||||||
|
|
Loading…
Reference in New Issue