diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 4cb23ed418d..1271d997720 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -20,6 +20,7 @@ import java.io.FileInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -76,7 +77,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; @@ -652,10 +653,10 @@ ContainerCommandResponseProto handleWriteChunk( ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto); Preconditions.checkNotNull(chunkInfo); - byte[] data = null; + ByteBuffer data = null; if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || request.getWriteChunk().getStage() == Stage.COMBINED) { - data = request.getWriteChunk().getData().toByteArray(); + data = request.getWriteChunk().getData().asReadOnlyByteBuffer(); } chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, @@ -713,7 +714,7 @@ ContainerCommandResponseProto handlePutSmallFile( ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf( putSmallFileReq.getChunkInfo()); Preconditions.checkNotNull(chunkInfo); - byte[] data = putSmallFileReq.getData().toByteArray(); + ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer(); // chunks will be committed as a part of handling putSmallFile // here. There is no need to maintain this info in openContainerBlockMap. chunkManager.writeChunk( @@ -724,7 +725,7 @@ ContainerCommandResponseProto handlePutSmallFile( blockData.setChunks(chunks); // TODO: add bcsId as a part of putSmallFile transaction blockManager.putBlock(kvContainer, blockData); - metrics.incContainerBytesStats(Type.PutSmallFile, data.length); + metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity()); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 20598d9ec60..718f5ded6ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -73,15 +73,15 @@ private ChunkUtils() { * @throws StorageContainerException */ public static void writeData(File chunkFile, ChunkInfo chunkInfo, - byte[] data, VolumeIOStats volumeIOStats) throws - StorageContainerException, ExecutionException, InterruptedException, - NoSuchAlgorithmException { - + ByteBuffer data, VolumeIOStats volumeIOStats) + throws StorageContainerException, ExecutionException, + InterruptedException, NoSuchAlgorithmException { + int bufferSize = data.capacity(); Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); - if (data.length != chunkInfo.getLen()) { + if (bufferSize != chunkInfo.getLen()) { String err = String.format("data array does not match the length " + "specified. DataLen: %d Byte Array: %d", - chunkInfo.getLen(), data.length); + chunkInfo.getLen(), bufferSize); log.error(err); throw new StorageContainerException(err, INVALID_WRITE_SIZE); } @@ -103,16 +103,16 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, StandardOpenOption.SPARSE, StandardOpenOption.SYNC); lock = file.lock().get(); - int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); + int size = file.write(data, chunkInfo.getOffset()).get(); // Increment volumeIO stats here. volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart); volumeIOStats.incWriteOpCount(); volumeIOStats.incWriteBytes(size); - if (size != data.length) { + if (size != bufferSize) { log.error("Invalid write size found. Size:{} Expected: {} ", size, - data.length); + bufferSize); throw new StorageContainerException("Invalid write size found. " + - "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE); + "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE); } } catch (StorageContainerException ex) { throw ex; @@ -183,7 +183,8 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data, volumeIOStats.incReadOpCount(); volumeIOStats.incReadBytes(data.getLen()); if (data.getChecksum() != null && !data.getChecksum().isEmpty()) { - verifyChecksum(data, buf.array(), log); + buf.rewind(); + verifyChecksum(data, buf, log); } return buf; } catch (IOException e) { @@ -211,10 +212,11 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data, * @throws NoSuchAlgorithmException * @throws StorageContainerException */ - private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger - log) throws NoSuchAlgorithmException, StorageContainerException { + private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data, + Logger log) throws NoSuchAlgorithmException, StorageContainerException { MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); sha.update(data); + data.rewind(); if (!Hex.encodeHexString(sha.digest()).equals( chunkInfo.getChecksum())) { log.error("Checksum mismatch. Provided: {} , computed: {}", diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index 6fd8d5f5891..c630e1990e1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager { * @throws StorageContainerException */ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, - byte[] data, ContainerProtos.Stage stage) + ByteBuffer data, ContainerProtos.Stage stage) throws StorageContainerException { try { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 7134be1d34d..3b06585b12f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import java.nio.ByteBuffer; /** * Chunk Manager allows read, write, delete and listing of chunks in @@ -41,7 +42,7 @@ public interface ChunkManager { * @throws StorageContainerException */ void writeChunk(Container container, BlockID blockID, ChunkInfo info, - byte[] data, ContainerProtos.Stage stage) + ByteBuffer data, ContainerProtos.Stage stage) throws StorageContainerException; /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index 3c0876b7a74..9e3edf74fbb 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -39,6 +39,7 @@ import org.mockito.Mockito; import java.io.File; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.UUID; @@ -109,8 +110,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception { // As no chunks are written to the volume writeBytes should be 0 checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.WRITE_DATA); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA); // Now a chunk file is being written with Stage WRITE_DATA, so it should // create a temporary chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -126,8 +127,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception { checkWriteIOStats(data.length, 1); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMMIT_DATA); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA); checkWriteIOStats(data.length, 1); @@ -146,8 +147,8 @@ public void testWriteChunkIncorrectLength() throws Exception { long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.WRITE_DATA); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA); fail("testWriteChunkIncorrectLength failed"); } catch (StorageContainerException ex) { // As we got an exception, writeBytes should be 0. @@ -167,8 +168,8 @@ public void testWriteChunkStageCombinedData() throws Exception { // Initially chunks folder should be empty. assertTrue(chunksPath.listFiles().length == 0); checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); // Now a chunk file is being written with Stage COMBINED_DATA, so it should // create a chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -180,8 +181,8 @@ public void testWriteChunkStageCombinedData() throws Exception { @Test public void testReadChunk() throws Exception { checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); checkWriteIOStats(data.length, 1); checkReadIOStats(0, 0); byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, @@ -194,8 +195,8 @@ public void testReadChunk() throws Exception { @Test public void testDeleteChunk() throws Exception { File chunksPath = new File(keyValueContainerData.getChunksPath()); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); assertTrue(chunksPath.listFiles().length == 1); chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); assertTrue(chunksPath.listFiles().length == 0); @@ -204,8 +205,8 @@ public void testDeleteChunk() throws Exception { @Test public void testDeleteChunkUnsupportedRequest() throws Exception { try { - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); @@ -224,8 +225,8 @@ public void testWriteChunkChecksumMismatch() throws Exception { .getLocalID(), 0), 0, data.length); //Setting checksum to some value. chunkInfo.setChecksum("some garbage"); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); fail("testWriteChunkChecksumMismatch failed"); } catch (StorageContainerException ex) { GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex); @@ -252,8 +253,8 @@ public void testWriteAndReadChunkMultipleTimes() throws Exception { for (int i=0; i<100; i++) { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), i), 0, data.length); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, - ContainerProtos.Stage.COMBINED); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, + ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); } checkWriteIOStats(data.length*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index c2941ed3592..f6c498adeab 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -59,6 +59,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; @@ -330,7 +331,8 @@ private ChunkInfo writeChunkHelper(BlockID blockID) blockID.getLocalID(), 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); return info; } @@ -371,7 +373,8 @@ public void testWritReadManyChunks() throws IOException, ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); fileHashMap.put(fileName, info); } @@ -431,7 +434,8 @@ public void testPartialRead() throws Exception { blockID.getLocalID(), 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); byte[] readData = chunkManager.readChunk(container, blockID, info); assertTrue(Arrays.equals(data, readData)); @@ -463,11 +467,14 @@ public void testOverWrite() throws IOException, blockID.getLocalID(), 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); long bytesUsed = container.getContainerData().getBytesUsed(); Assert.assertEquals(datalen, bytesUsed); @@ -501,7 +508,8 @@ public void testMultipleWriteSingleRead() throws IOException, byte[] data = getData(datalen); oldSha.update(data); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); } // Request to read the whole data in a single go. @@ -532,7 +540,8 @@ public void testDeleteChunk() throws IOException, blockID.getLocalID(), 0, 0, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); chunkManager.deleteChunk(container, blockID, info); exception.expect(StorageContainerException.class); exception.expectMessage("Unable to find the chunk file."); @@ -646,7 +655,8 @@ public void testPutBlockWithLotsOfChunks() throws IOException, info = getChunk(blockID.getLocalID(), x, x * datalen, datalen); byte[] data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, data, COMBINED); + chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + COMBINED); totalSize += datalen; chunkList.add(info); }