HDDS-372. There are three buffer copies in BlockOutputStream. Contributed by Shashikant Banerjee.

This commit is contained in:
Shashikant Banerjee 2019-04-09 10:27:58 +05:30
parent 69e3745b86
commit 2d4f6b6daa
13 changed files with 242 additions and 26 deletions

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@ -585,7 +586,7 @@ public class BlockOutputStream extends OutputStream {
} finally {
cleanup(false);
}
// TODO: Turn the below buffer empty check on whne Standalone pipeline
// TODO: Turn the below buffer empty check on when Standalone pipeline
// is removed in the write path in tests
// Preconditions.checkArgument(buffer.position() == 0);
// bufferPool.checkBufferPoolEmpty();
@ -676,9 +677,9 @@ public class BlockOutputStream extends OutputStream {
*/
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
int effectiveChunkSize = chunk.remaining();
ByteString data = ByteString.copyFrom(chunk);
ByteString data = ByteStringHelper.getByteString(chunk);
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
ChecksumData checksumData = checksum.computeChecksum(data);
ChecksumData checksumData = checksum.computeChecksum(chunk);
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
.setChunkName(DigestUtils.md5Hex(key) + "_stream_" + streamId +
"_chunk_" + ++chunkIndex)

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Helper class to perform Unsafe ByteString conversion from byteBuffer or byte
* array depending on the config "ozone.UnsafeByteOperations.enabled".
*/
public final class ByteStringHelper {
private static final AtomicBoolean INITIALIZED = new AtomicBoolean();
private static volatile boolean isUnsafeByteOperationsEnabled;
/**
* There is no need to instantiate this class.
*/
private ByteStringHelper() {
}
public static void init(boolean isUnsafeByteOperation) {
final boolean set = INITIALIZED.compareAndSet(false, true);
if (set) {
ByteStringHelper.isUnsafeByteOperationsEnabled =
isUnsafeByteOperation;
} else {
// already initialized, check values
Preconditions.checkState(isUnsafeByteOperationsEnabled
== isUnsafeByteOperation);
}
}
private static ByteString copyFrom(ByteBuffer buffer) {
final ByteString bytes = ByteString.copyFrom(buffer);
// flip the buffer so as to read the data starting from pos 0 again
buffer.flip();
return bytes;
}
public static ByteString getByteString(ByteBuffer buffer) {
return isUnsafeByteOperationsEnabled ?
UnsafeByteOperations.unsafeWrap(buffer) : copyFrom(buffer);
}
public static ByteString getByteString(byte[] bytes) {
return isUnsafeByteOperationsEnabled ?
UnsafeByteOperations.unsafeWrap(bytes) : ByteString.copyFrom(bytes);
}
}

View File

@ -355,7 +355,7 @@ public final class ContainerProtocolCalls {
KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
.build();
Checksum checksum = new Checksum();
ChecksumData checksumData = checksum.computeChecksum(data);
ChecksumData checksumData = checksum.computeChecksum(data, 0, data.length);
ChunkInfo chunk =
ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk")

View File

@ -94,6 +94,11 @@ public final class OzoneConfigKeys {
public static final String OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF =
"OFF";
public static final String OZONE_UNSAFEBYTEOPERATIONS_ENABLED =
"ozone.UnsafeByteOperations.enabled";
public static final boolean OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT
= true;
public static final String OZONE_CONTAINER_CACHE_SIZE =
"ozone.container.cache.size";
public static final int OZONE_CONTAINER_CACHE_DEFAULT = 1024;

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.common;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
@ -76,12 +78,13 @@ public class Checksum {
/**
* Computes checksum for give data.
* @param byteString input data in the form of ByteString.
* @param byteBuffer input data in the form of ByteString.
* @return ChecksumData computed for input data.
*/
public ChecksumData computeChecksum(ByteString byteString)
public ChecksumData computeChecksum(ByteBuffer byteBuffer)
throws OzoneChecksumException {
return computeChecksum(byteString.toByteArray());
return computeChecksum(byteBuffer.array(), byteBuffer.position(),
byteBuffer.limit());
}
/**
@ -91,6 +94,16 @@ public class Checksum {
*/
public ChecksumData computeChecksum(byte[] data)
throws OzoneChecksumException {
return computeChecksum(data, 0, data.length);
}
/**
* Computes checksum for give data.
* @param data input data in the form of byte array.
* @return ChecksumData computed for input data.
*/
public ChecksumData computeChecksum(byte[] data, int offset, int len)
throws OzoneChecksumException {
ChecksumData checksumData = new ChecksumData(this.checksumType, this
.bytesPerChecksum);
if (checksumType == ChecksumType.NONE) {
@ -120,7 +133,7 @@ public class Checksum {
// Compute number of checksums needs for given data length based on bytes
// per checksum.
int dataSize = data.length;
int dataSize = len - offset;
int numChecksums = (dataSize + bytesPerChecksum - 1) / bytesPerChecksum;
// Checksum is computed for each bytesPerChecksum number of bytes of data
@ -128,7 +141,7 @@ public class Checksum {
// remaining data with length less than bytesPerChecksum.
List<ByteString> checksumList = new ArrayList<>(numChecksums);
for (int index = 0; index < numChecksums; index++) {
checksumList.add(computeChecksumAtIndex(data, index));
checksumList.add(computeChecksumAtIndex(data, index, offset, len));
}
checksumData.setChecksums(checksumList);
@ -140,15 +153,19 @@ public class Checksum {
* and a max length of bytesPerChecksum.
* @param data input data
* @param index index to compute the offset from where data must be read
* @param start start pos of the array where the computation has to start
* @length length of array till which checksum needs to be computed
* @return computed checksum ByteString
* @throws OzoneChecksumException thrown when ChecksumType is not recognized
*/
private ByteString computeChecksumAtIndex(byte[] data, int index)
private ByteString computeChecksumAtIndex(byte[] data, int index, int start,
int length)
throws OzoneChecksumException {
int offset = index * bytesPerChecksum;
int offset = start + index * bytesPerChecksum;
int dataLength = length - start;
int len = bytesPerChecksum;
if ((offset + len) > data.length) {
len = data.length - offset;
if ((offset + len) > dataLength) {
len = dataLength - offset;
}
byte[] checksumBytes = null;
switch (checksumType) {
@ -236,7 +253,8 @@ public class Checksum {
int bytesPerChecksum = checksumData.getBytesPerChecksum();
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
ChecksumData computedChecksumData = checksum.computeChecksum(data);
ChecksumData computedChecksumData =
checksum.computeChecksum(data, 0, data.length);
return checksumData.verifyChecksumDataMatches(computedChecksumData);
}

View File

@ -386,6 +386,14 @@
assumed.
</description>
</property>
<property>
<name>ozone.UnsafeByteOperations.enabled</name>
<value>true</value>
<tag>OZONE, PERFORMANCE, CLIENT</tag>
<description>It specifies whether to use unsafe or safe buffer to byteString
copy.
</description>
</property>
<property>
<name>ozone.client.connection.timeout</name>
<value>5000ms</value>

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
@ -146,6 +147,10 @@ public class KeyValueHandler extends Handler {
// this handler lock is used for synchronizing createContainer Requests,
// so using a fair lock here.
containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
}
@VisibleForTesting

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkResponseProto;
import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.io.IOUtils;
@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
@ -315,7 +315,8 @@ public final class ChunkUtils {
ReadChunkResponseProto.Builder response =
ReadChunkResponseProto.newBuilder();
response.setChunkData(info.getProtoBufMessage());
response.setData(ByteString.copyFrom(data));
response.setData(
ByteStringHelper.getByteString(data));
response.setBlockID(msg.getReadChunk().getBlockID());
ContainerCommandResponseProto.Builder builder =

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil;
@ -215,6 +216,10 @@ public class RpcClient implements ClientProtocol, KeyProviderTokenIssuer {
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
dtService =
getOMProxyProvider().getProxy().getDelegationTokenService();
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
}
private InetSocketAddress getScmAddressForClient() throws IOException {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ByteStringHelper;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.io.IOUtils;
@ -158,6 +159,11 @@ public final class DistributedStorageHandler implements StorageHandler {
this.maxRetryCount =
conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
OZONE_CLIENT_MAX_RETRIES_DEFAULT);
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
}
@Override

View File

@ -23,18 +23,15 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests Freon, with MiniOzoneCluster and validate data.
*/
public class TestDataValidate {
public abstract class TestDataValidate {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static MiniOzoneCluster cluster = null;
/**
* Create a MiniDFSCluster for testing.
@ -42,9 +39,7 @@ public class TestDataValidate {
* Ozone is made active by setting OZONE_ENABLED = true
*
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
static void startCluster(OzoneConfiguration conf) throws Exception {
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5).build();
@ -54,8 +49,7 @@ public class TestDataValidate {
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
static void shutdownCluster() {
if (cluster != null) {
cluster.shutdown();
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* Tests Freon, with MiniOzoneCluster and validate data.
*/
public class TestDataValidateWithSafeByteOperations extends TestDataValidate {
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
false);
startCluster(conf);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
shutdownCluster();
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* Tests Freon, with MiniOzoneCluster and validate data.
*/
public class TestDataValidateWithUnsafeByteOperations extends TestDataValidate {
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
true);
startCluster(conf);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
shutdownCluster();
}
}