diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 4c6cd7b1c8c..f3ba6564264 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -29,6 +29,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; @@ -76,7 +77,8 @@ public class BlockOutputStream extends OutputStream { private final BlockData.Builder containerBlockData; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; - private final Checksum checksum; + private final ContainerProtos.ChecksumType checksumType; + private final int bytesPerChecksum; private final String streamId; private int chunkIndex; private int chunkSize; @@ -121,14 +123,16 @@ public class BlockOutputStream extends OutputStream { * @param streamBufferFlushSize flush size * @param streamBufferMaxSize max size of the currentBuffer * @param watchTimeout watch timeout - * @param checksum checksum + * @param checksumType checksum type + * @param bytesPerChecksum Bytes per checksum */ @SuppressWarnings("parameternumber") public BlockOutputStream(BlockID blockID, String key, XceiverClientManager xceiverClientManager, Pipeline pipeline, String traceID, int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize, long watchTimeout, List bufferList, - Checksum checksum) throws IOException { + ChecksumType checksumType, int bytesPerChecksum) + throws IOException { this.blockID = blockID; this.key = key; this.traceID = traceID; @@ -146,7 +150,8 @@ public BlockOutputStream(BlockID blockID, String key, this.streamBufferMaxSize = streamBufferMaxSize; this.watchTimeout = watchTimeout; this.bufferList = bufferList; - this.checksum = checksum; + this.checksumType = checksumType; + this.bytesPerChecksum = bytesPerChecksum; // A single thread executor handle the responses of async requests responseExecutor = Executors.newSingleThreadExecutor(); @@ -585,6 +590,7 @@ private void checkOpen() throws IOException { private void writeChunkToContainer(ByteBuffer chunk) throws IOException { int effectiveChunkSize = chunk.remaining(); ByteString data = ByteString.copyFrom(chunk); + Checksum checksum = new Checksum(checksumType, bytesPerChecksum); ChecksumData checksumData = checksum.computeChecksum(data); ChunkInfo chunkInfo = ChunkInfo.newBuilder() .setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId + diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java index 56ad71b21b3..277753545df 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java @@ -38,6 +38,8 @@ /** * Class to compute and verify checksums for chunks. + * + * This class is not thread safe. */ public class Checksum { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java index 7ba17666dfa..5ae7e8bf7e2 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java @@ -23,11 +23,12 @@ import java.util.List; import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ChecksumType; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier; -import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -41,7 +42,8 @@ public final class BlockOutputStreamEntry extends OutputStream { private final String key; private final XceiverClientManager xceiverClientManager; private final Pipeline pipeline; - private final Checksum checksum; + private final ChecksumType checksumType; + private final int bytesPerChecksum; private final String requestId; private final int chunkSize; // total number of bytes that should be written to this stream @@ -60,7 +62,8 @@ private BlockOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, Pipeline pipeline, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, - long watchTimeout, List bufferList, Checksum checksum, + long watchTimeout, List bufferList, + ChecksumType checksumType, int bytesPerChecksum, Token token) { this.outputStream = null; this.blockID = blockID; @@ -76,7 +79,8 @@ private BlockOutputStreamEntry(BlockID blockID, String key, this.streamBufferMaxSize = streamBufferMaxSize; this.watchTimeout = watchTimeout; this.bufferList = bufferList; - this.checksum = checksum; + this.checksumType = checksumType; + this.bytesPerChecksum = bytesPerChecksum; } long getLength() { @@ -105,7 +109,8 @@ private void checkStream() throws IOException { this.outputStream = new BlockOutputStream(blockID, key, xceiverClientManager, pipeline, requestId, chunkSize, streamBufferFlushSize, - streamBufferMaxSize, watchTimeout, bufferList, checksum); + streamBufferMaxSize, watchTimeout, bufferList, checksumType, + bytesPerChecksum); } } @@ -198,10 +203,16 @@ public static class Builder { private long watchTimeout; private List bufferList; private Token token; - private Checksum checksum; + private ChecksumType checksumType; + private int bytesPerChecksum; - public Builder setChecksum(Checksum cs) { - this.checksum = cs; + public Builder setChecksumType(ChecksumType type) { + this.checksumType = type; + return this; + } + + public Builder setBytesPerChecksum(int bytes) { + this.bytesPerChecksum = bytes; return this; } @@ -270,7 +281,7 @@ public BlockOutputStreamEntry build() { return new BlockOutputStreamEntry(blockID, key, xceiverClientManager, pipeline, requestId, chunkSize, length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout, - bufferList, checksum, token); + bufferList, checksumType, bytesPerChecksum, token); } } @@ -294,10 +305,6 @@ public Pipeline getPipeline() { return pipeline; } - public Checksum getChecksum() { - return checksum; - } - public String getRequestId() { return requestId; } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 44972aedc0d..d74a240a78d 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -21,10 +21,12 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ChecksumType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.ozone.common.Checksum; +import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; @@ -79,7 +81,8 @@ public class KeyOutputStream extends OutputStream { private final long streamBufferMaxSize; private final long watchTimeout; private final long blockSize; - private final Checksum checksum; + private final int bytesPerChecksum; + private final ChecksumType checksumType; private List bufferList; private OmMultipartCommitUploadPartInfo commitUploadPartInfo; private FileEncryptionInfo feInfo; @@ -106,7 +109,10 @@ public KeyOutputStream() { bufferList.add(buffer); watchTimeout = 0; blockSize = 0; - this.checksum = new Checksum(); + this.checksumType = ChecksumType.valueOf( + OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); + this.bytesPerChecksum = OzoneConfigKeys + .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB } @VisibleForTesting @@ -142,7 +148,8 @@ public KeyOutputStream(OpenKeySession handler, OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, String requestId, ReplicationFactor factor, ReplicationType type, long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout, - Checksum checksum, String uploadID, int partNumber, boolean isMultipart) { + ChecksumType checksumType, int bytesPerChecksum, + String uploadID, int partNumber, boolean isMultipart) { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.omClient = omClient; @@ -165,7 +172,8 @@ public KeyOutputStream(OpenKeySession handler, this.streamBufferMaxSize = bufferMaxSize; this.blockSize = size; this.watchTimeout = watchTimeout; - this.checksum = checksum; + this.bytesPerChecksum = bytesPerChecksum; + this.checksumType = checksumType; Preconditions.checkState(chunkSize > 0); Preconditions.checkState(streamBufferFlushSize > 0); @@ -220,7 +228,8 @@ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) .setStreamBufferMaxSize(streamBufferMaxSize) .setWatchTimeout(watchTimeout) .setBufferList(bufferList) - .setChecksum(checksum) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) .setToken(subKeyInfo.getToken()); streamEntries.add(builder.build()); } @@ -573,7 +582,8 @@ public static class Builder { private long streamBufferMaxSize; private long blockSize; private long watchTimeout; - private Checksum checksum; + private ChecksumType checksumType; + private int bytesPerChecksum; private String multipartUploadID; private int multipartNumber; private boolean isMultipartKey; @@ -651,8 +661,13 @@ public Builder setWatchTimeout(long timeout) { return this; } - public Builder setChecksum(Checksum checksumObj){ - this.checksum = checksumObj; + public Builder setChecksumType(ChecksumType cType){ + this.checksumType = cType; + return this; + } + + public Builder setBytesPerChecksum(int bytes){ + this.bytesPerChecksum = bytes; return this; } @@ -664,8 +679,8 @@ public Builder setIsMultipartKey(boolean isMultipart) { public KeyOutputStream build() throws IOException { return new KeyOutputStream(openHandler, xceiverManager, scmClient, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, - streamBufferMaxSize, blockSize, watchTimeout, checksum, - multipartUploadID, multipartNumber, isMultipartKey); + streamBufferMaxSize, blockSize, watchTimeout, checksumType, + bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey); } } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index eb27b7b2652..fec05308110 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -48,7 +48,6 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.protocol.ClientProtocol; -import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; @@ -110,7 +109,8 @@ public class RpcClient implements ClientProtocol { ozoneManagerClient; private final XceiverClientManager xceiverClientManager; private final int chunkSize; - private final Checksum checksum; + private final ChecksumType checksumType; + private final int bytesPerChecksum; private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; @@ -189,22 +189,22 @@ public RpcClient(Configuration conf) throws IOException { OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT, StorageUnit.BYTES); - int checksumSize; if(configuredChecksumSize < OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { LOG.warn("The checksum size ({}) is not allowed to be less than the " + "minimum size ({}), resetting to the minimum size.", configuredChecksumSize, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); - checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; + bytesPerChecksum = + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; } else { - checksumSize = configuredChecksumSize; + bytesPerChecksum = configuredChecksumSize; } + String checksumTypeStr = conf.get( OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - ChecksumType checksumType = ChecksumType.valueOf(checksumTypeStr); - this.checksum = new Checksum(checksumType, checksumSize); + checksumType = ChecksumType.valueOf(checksumTypeStr); } private InetSocketAddress getScmAddressForClient() throws IOException { @@ -602,7 +602,8 @@ public OzoneOutputStream createKey( .setStreamBufferMaxSize(streamBufferMaxSize) .setWatchTimeout(watchTimeout) .setBlockSize(blockSize) - .setChecksum(checksum) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), @@ -863,7 +864,8 @@ public OzoneOutputStream createMultipartKey(String volumeName, .setStreamBufferMaxSize(streamBufferMaxSize) .setWatchTimeout(watchTimeout) .setBlockSize(blockSize) - .setChecksum(checksum) + .setBytesPerChecksum(bytesPerChecksum) + .setChecksumType(checksumType) .setMultipartNumber(partNumber) .setMultipartUploadID(uploadID) .setIsMultipartKey(true) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java index 4e77bfdb780..73a7963618a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java @@ -27,6 +27,9 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; @@ -681,6 +684,66 @@ public void testPutKeyRatisThreeNodes() } } + + @Test + public void testPutKeyRatisThreeNodesParallel() throws IOException, + InterruptedException { + String volumeName = UUID.randomUUID().toString(); + String bucketName = UUID.randomUUID().toString(); + long currentTime = Time.now(); + store.createVolume(volumeName); + OzoneVolume volume = store.getVolume(volumeName); + volume.createBucket(bucketName); + OzoneBucket bucket = volume.getBucket(bucketName); + + CountDownLatch latch = new CountDownLatch(2); + AtomicInteger failCount = new AtomicInteger(0); + + Runnable r = () -> { + try { + for (int i = 0; i < 5; i++) { + String keyName = UUID.randomUUID().toString(); + String data = generateData(5 * 1024 * 1024, + (byte) RandomUtils.nextLong()).toString(); + OzoneOutputStream out = bucket.createKey(keyName, + data.getBytes().length, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + out.write(data.getBytes()); + out.close(); + OzoneKey key = bucket.getKey(keyName); + Assert.assertEquals(keyName, key.getName()); + OzoneInputStream is = bucket.readKey(keyName); + byte[] fileContent = new byte[data.getBytes().length]; + is.read(fileContent); + is.close(); + Assert.assertTrue(verifyRatisReplication(volumeName, bucketName, + keyName, ReplicationType.RATIS, + ReplicationFactor.THREE)); + Assert.assertEquals(data, new String(fileContent)); + Assert.assertTrue(key.getCreationTime() >= currentTime); + Assert.assertTrue(key.getModificationTime() >= currentTime); + } + latch.countDown(); + } catch (IOException ex) { + latch.countDown(); + failCount.incrementAndGet(); + } + }; + + Thread thread1 = new Thread(r); + Thread thread2 = new Thread(r); + + thread1.start(); + thread2.start(); + + latch.await(600, TimeUnit.SECONDS); + + if (failCount.get() > 0) { + fail("testPutKeyRatisThreeNodesParallel failed"); + } + + } + private void readKey(OzoneBucket bucket, String keyName, String data) throws IOException { OzoneKey key = bucket.getKey(keyName); diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index b73f297054d..8197ce8a729 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -24,11 +24,11 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ChecksumType; import org.apache.hadoop.ozone.client.io.KeyInputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; -import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -86,7 +86,8 @@ public final class DistributedStorageHandler implements StorageHandler { private final long streamBufferMaxSize; private final long watchTimeout; private final long blockSize; - private final Checksum checksum; + private final ChecksumType checksumType; + private final int bytesPerChecksum; /** * Creates a new DistributedStorageHandler. @@ -136,23 +137,23 @@ public DistributedStorageHandler(OzoneConfiguration conf, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT, StorageUnit.BYTES); - int checksumSize; + if(configuredChecksumSize < OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) { LOG.warn("The checksum size ({}) is not allowed to be less than the " + "minimum size ({}), resetting to the minimum size.", configuredChecksumSize, OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE); - checksumSize = OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; + bytesPerChecksum = + OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE; } else { - checksumSize = configuredChecksumSize; + bytesPerChecksum = configuredChecksumSize; } String checksumTypeStr = conf.get( OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT); - ContainerProtos.ChecksumType checksumType = ContainerProtos.ChecksumType - .valueOf(checksumTypeStr); - this.checksum = new Checksum(checksumType, checksumSize); + this.checksumType = ChecksumType.valueOf(checksumTypeStr); + } @Override @@ -451,7 +452,8 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, .setStreamBufferMaxSize(streamBufferMaxSize) .setBlockSize(blockSize) .setWatchTimeout(watchTimeout) - .setChecksum(checksum) + .setChecksumType(checksumType) + .setBytesPerChecksum(bytesPerChecksum) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(),