HDDS-799. Avoid ByteString to byte array conversion cost by using ByteBuffer in Datanode. Contributed by Mukul Kumar Singh.

This commit is contained in:
Shashikant Banerjee 2018-11-05 23:43:22 +05:30
parent c8ca1747c0
commit 942693bddd
6 changed files with 62 additions and 47 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.keyvalue;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -76,7 +77,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import static org.apache.hadoop.hdds.HddsConfigKeys
.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
@ -652,10 +653,10 @@ public class KeyValueHandler extends Handler {
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
Preconditions.checkNotNull(chunkInfo);
byte[] data = null;
ByteBuffer data = null;
if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
request.getWriteChunk().getStage() == Stage.COMBINED) {
data = request.getWriteChunk().getData().toByteArray();
data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
}
chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
@ -713,7 +714,7 @@ public class KeyValueHandler extends Handler {
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
putSmallFileReq.getChunkInfo());
Preconditions.checkNotNull(chunkInfo);
byte[] data = putSmallFileReq.getData().toByteArray();
ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
// chunks will be committed as a part of handling putSmallFile
// here. There is no need to maintain this info in openContainerBlockMap.
chunkManager.writeChunk(
@ -724,7 +725,7 @@ public class KeyValueHandler extends Handler {
blockData.setChunks(chunks);
// TODO: add bcsId as a part of putSmallFile transaction
blockManager.putBlock(kvContainer, blockData);
metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);

View File

@ -73,15 +73,15 @@ public final class ChunkUtils {
* @throws StorageContainerException
*/
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
byte[] data, VolumeIOStats volumeIOStats) throws
StorageContainerException, ExecutionException, InterruptedException,
NoSuchAlgorithmException {
ByteBuffer data, VolumeIOStats volumeIOStats)
throws StorageContainerException, ExecutionException,
InterruptedException, NoSuchAlgorithmException {
int bufferSize = data.capacity();
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
if (data.length != chunkInfo.getLen()) {
if (bufferSize != chunkInfo.getLen()) {
String err = String.format("data array does not match the length " +
"specified. DataLen: %d Byte Array: %d",
chunkInfo.getLen(), data.length);
chunkInfo.getLen(), bufferSize);
log.error(err);
throw new StorageContainerException(err, INVALID_WRITE_SIZE);
}
@ -103,16 +103,16 @@ public final class ChunkUtils {
StandardOpenOption.SPARSE,
StandardOpenOption.SYNC);
lock = file.lock().get();
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
int size = file.write(data, chunkInfo.getOffset()).get();
// Increment volumeIO stats here.
volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
volumeIOStats.incWriteOpCount();
volumeIOStats.incWriteBytes(size);
if (size != data.length) {
if (size != bufferSize) {
log.error("Invalid write size found. Size:{} Expected: {} ", size,
data.length);
bufferSize);
throw new StorageContainerException("Invalid write size found. " +
"Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
"Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
}
} catch (StorageContainerException ex) {
throw ex;
@ -183,7 +183,8 @@ public final class ChunkUtils {
volumeIOStats.incReadOpCount();
volumeIOStats.incReadBytes(data.getLen());
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
verifyChecksum(data, buf.array(), log);
buf.rewind();
verifyChecksum(data, buf, log);
}
return buf;
} catch (IOException e) {
@ -211,10 +212,11 @@ public final class ChunkUtils {
* @throws NoSuchAlgorithmException
* @throws StorageContainerException
*/
private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
log) throws NoSuchAlgorithmException, StorageContainerException {
private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data,
Logger log) throws NoSuchAlgorithmException, StorageContainerException {
MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
sha.update(data);
data.rewind();
if (!Hex.encodeHexString(sha.digest()).equals(
chunkInfo.getChecksum())) {
log.error("Checksum mismatch. Provided: {} , computed: {}",

View File

@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager {
* @throws StorageContainerException
*/
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
ByteBuffer data, ContainerProtos.Stage stage)
throws StorageContainerException {
try {

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import java.nio.ByteBuffer;
/**
* Chunk Manager allows read, write, delete and listing of chunks in
@ -41,7 +42,7 @@ public interface ChunkManager {
* @throws StorageContainerException
*/
void writeChunk(Container container, BlockID blockID, ChunkInfo info,
byte[] data, ContainerProtos.Stage stage)
ByteBuffer data, ContainerProtos.Stage stage)
throws StorageContainerException;
/**

View File

@ -39,6 +39,7 @@ import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;
@ -109,8 +110,8 @@ public class TestChunkManagerImpl {
// As no chunks are written to the volume writeBytes should be 0
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
// Now a chunk file is being written with Stage WRITE_DATA, so it should
// create a temporary chunk file.
assertTrue(chunksPath.listFiles().length == 1);
@ -126,8 +127,8 @@ public class TestChunkManagerImpl {
checkWriteIOStats(data.length, 1);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMMIT_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA);
checkWriteIOStats(data.length, 1);
@ -146,8 +147,8 @@ public class TestChunkManagerImpl {
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.WRITE_DATA);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
fail("testWriteChunkIncorrectLength failed");
} catch (StorageContainerException ex) {
// As we got an exception, writeBytes should be 0.
@ -167,8 +168,8 @@ public class TestChunkManagerImpl {
// Initially chunks folder should be empty.
assertTrue(chunksPath.listFiles().length == 0);
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
// Now a chunk file is being written with Stage COMBINED_DATA, so it should
// create a chunk file.
assertTrue(chunksPath.listFiles().length == 1);
@ -180,8 +181,8 @@ public class TestChunkManagerImpl {
@Test
public void testReadChunk() throws Exception {
checkWriteIOStats(0, 0);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
checkWriteIOStats(data.length, 1);
checkReadIOStats(0, 0);
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
@ -194,8 +195,8 @@ public class TestChunkManagerImpl {
@Test
public void testDeleteChunk() throws Exception {
File chunksPath = new File(keyValueContainerData.getChunksPath());
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
assertTrue(chunksPath.listFiles().length == 1);
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
assertTrue(chunksPath.listFiles().length == 0);
@ -204,8 +205,8 @@ public class TestChunkManagerImpl {
@Test
public void testDeleteChunkUnsupportedRequest() throws Exception {
try {
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
long randomLength = 200L;
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), 0), 0, randomLength);
@ -224,8 +225,8 @@ public class TestChunkManagerImpl {
.getLocalID(), 0), 0, data.length);
//Setting checksum to some value.
chunkInfo.setChecksum("some garbage");
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
fail("testWriteChunkChecksumMismatch failed");
} catch (StorageContainerException ex) {
GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
@ -252,8 +253,8 @@ public class TestChunkManagerImpl {
for (int i=0; i<100; i++) {
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
.getLocalID(), i), 0, data.length);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
ContainerProtos.Stage.COMBINED);
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
}
checkWriteIOStats(data.length*100, 100);
assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);

View File

@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@ -330,7 +331,8 @@ public class TestContainerPersistence {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
return info;
}
@ -371,7 +373,8 @@ public class TestContainerPersistence {
ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
fileHashMap.put(fileName, info);
}
@ -431,7 +434,8 @@ public class TestContainerPersistence {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
byte[] readData = chunkManager.readChunk(container, blockID, info);
assertTrue(Arrays.equals(data, readData));
@ -463,11 +467,14 @@ public class TestContainerPersistence {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
long bytesUsed = container.getContainerData().getBytesUsed();
Assert.assertEquals(datalen, bytesUsed);
@ -501,7 +508,8 @@ public class TestContainerPersistence {
byte[] data = getData(datalen);
oldSha.update(data);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
}
// Request to read the whole data in a single go.
@ -532,7 +540,8 @@ public class TestContainerPersistence {
blockID.getLocalID(), 0, 0, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
chunkManager.deleteChunk(container, blockID, info);
exception.expect(StorageContainerException.class);
exception.expectMessage("Unable to find the chunk file.");
@ -646,7 +655,8 @@ public class TestContainerPersistence {
info = getChunk(blockID.getLocalID(), x, x * datalen, datalen);
byte[] data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data, COMBINED);
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
COMBINED);
totalSize += datalen;
chunkList.add(info);
}