diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 5130253641f..01964badf3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -545,10 +545,12 @@ public class KeyValueHandler extends Handler { .getChunkData()); Preconditions.checkNotNull(chunkInfo); - boolean isReadFromTmpFile = dispatcherContext == null ? false : - dispatcherContext.isReadFromTmpFile(); + if (dispatcherContext == null) { + dispatcherContext = new DispatcherContext.Builder().build(); + } + data = chunkManager - .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile); + .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext); metrics.incContainerBytesStats(Type.ReadChunk, data.length); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -619,15 +621,17 @@ public class KeyValueHandler extends Handler { Preconditions.checkNotNull(chunkInfo); ByteBuffer data = null; - WriteChunkStage stage = - dispatcherContext == null ? WriteChunkStage.COMBINED : - dispatcherContext.getStage(); + if (dispatcherContext == null) { + dispatcherContext = new DispatcherContext.Builder().build(); + } + WriteChunkStage stage = dispatcherContext.getStage(); if (stage == WriteChunkStage.WRITE_DATA || stage == WriteChunkStage.COMBINED) { data = request.getWriteChunk().getData().asReadOnlyByteBuffer(); } - chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage); + chunkManager + .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); // We should increment stats after writeChunk if (stage == WriteChunkStage.WRITE_DATA|| @@ -677,19 +681,19 @@ public class KeyValueHandler extends Handler { putSmallFileReq.getChunkInfo()); Preconditions.checkNotNull(chunkInfo); ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer(); - WriteChunkStage stage = - dispatcherContext == null ? WriteChunkStage.COMBINED : - dispatcherContext.getStage(); + if (dispatcherContext == null) { + dispatcherContext = new DispatcherContext.Builder().build(); + } + // chunks will be committed as a part of handling putSmallFile // here. There is no need to maintain this info in openContainerBlockMap. - chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage); + chunkManager + .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext); List chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); blockData.setChunks(chunks); - long bcsId = - dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); - blockData.setBlockCommitSequenceId(bcsId); + blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex()); blockManager.putBlock(kvContainer, blockData); metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity()); @@ -728,11 +732,13 @@ public class KeyValueHandler extends Handler { ContainerProtos.ChunkInfo chunkInfo = null; ByteString dataBuf = ByteString.EMPTY; + DispatcherContext dispatcherContext = + new DispatcherContext.Builder().build(); for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { // if the block is committed, all chunks must have been committed. // Tmp chunk files won't exist here. byte[] data = chunkManager.readChunk(kvContainer, blockID, - ChunkInfo.getFromProtoBuf(chunk), false); + ChunkInfo.getFromProtoBuf(chunk), dispatcherContext); ByteString current = ByteString.copyFrom(data); dataBuf = dataBuf.concat(current); chunkInfo = chunk; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index a2e8e5ce9a5..e4814cb26f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -67,13 +67,14 @@ public class ChunkManagerImpl implements ChunkManager { * @param blockID - ID of the block * @param info - ChunkInfo * @param data - data of the chunk - * @param stage - Stage of the Chunk operation + * @param dispatcherContext - dispatcherContextInfo * @throws StorageContainerException */ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, - ByteBuffer data, DispatcherContext.WriteChunkStage stage) + ByteBuffer data, DispatcherContext dispatcherContext) throws StorageContainerException { - + Preconditions.checkNotNull(dispatcherContext); + DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage(); try { KeyValueContainerData containerData = (KeyValueContainerData) container @@ -85,7 +86,7 @@ public class ChunkManagerImpl implements ChunkManager { boolean isOverwrite = ChunkUtils.validateChunkForOverwrite( chunkFile, info); - File tmpChunkFile = getTmpChunkFile(chunkFile, info); + File tmpChunkFile = getTmpChunkFile(chunkFile, dispatcherContext); LOG.debug( "writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file:{}", @@ -137,6 +138,8 @@ public class ChunkManagerImpl implements ChunkManager { LOG.warn("ChunkFile already exists" + chunkFile); return; } + // While committing a chunk , just rename the tmp chunk file which has + // the same term and log index appended as the current transaction commitChunk(tmpChunkFile, chunkFile); // Increment container stats here, as we commit the data. containerData.incrBytesUsed(info.getLen()); @@ -179,14 +182,14 @@ public class ChunkManagerImpl implements ChunkManager { * @param container - Container for the chunk * @param blockID - ID of the block. * @param info - ChunkInfo. - * @param readFromTmpFile whether to read from tmp chunk file or not. + * @param dispatcherContext dispatcher context info. * @return byte array * @throws StorageContainerException * TODO: Right now we do not support partial reads and writes of chunks. * TODO: Explore if we need to do that for ozone. */ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, - boolean readFromTmpFile) throws StorageContainerException { + DispatcherContext dispatcherContext) throws StorageContainerException { try { KeyValueContainerData containerData = (KeyValueContainerData) container .getContainerData(); @@ -204,8 +207,8 @@ public class ChunkManagerImpl implements ChunkManager { // In case the chunk file does not exist but tmp chunk file exist, // read from tmp chunk file if readFromTmpFile is set to true - if (!chunkFile.exists() && readFromTmpFile) { - chunkFile = getTmpChunkFile(chunkFile, info); + if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) { + chunkFile = getTmpChunkFile(chunkFile, dispatcherContext); } data = ChunkUtils.readData(chunkFile, info, volumeIOStats); containerData.incrReadCount(); @@ -279,17 +282,21 @@ public class ChunkManagerImpl implements ChunkManager { /** * Returns the temporary chunkFile path. - * @param chunkFile - * @param info + * @param chunkFile chunkFileName + * @param dispatcherContext dispatcher context info * @return temporary chunkFile path * @throws StorageContainerException */ - private File getTmpChunkFile(File chunkFile, ChunkInfo info) - throws StorageContainerException { + private File getTmpChunkFile(File chunkFile, + DispatcherContext dispatcherContext) { return new File(chunkFile.getParent(), chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + - OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + dispatcherContext.getTerm() + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + dispatcherContext.getLogIndex()); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 4282e466c72..5a6898f558a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -39,11 +39,11 @@ public interface ChunkManager { * @param container - Container for the chunk * @param blockID - ID of the block. * @param info - ChunkInfo. - * @param stage - Chunk Stage write. + * @param dispatcherContext - dispatcher context info. * @throws StorageContainerException */ void writeChunk(Container container, BlockID blockID, ChunkInfo info, - ByteBuffer data, DispatcherContext.WriteChunkStage stage) + ByteBuffer data, DispatcherContext dispatcherContext) throws StorageContainerException; /** @@ -52,7 +52,7 @@ public interface ChunkManager { * @param container - Container for the chunk * @param blockID - ID of the block. * @param info - ChunkInfo. - * @param readFromTmpFile whether to read from tmp chunk file or not + * @param dispatcherContext - dispatcher context info. * @return byte array * @throws StorageContainerException * @@ -60,7 +60,7 @@ public interface ChunkManager { * TODO: Explore if we need to do that for ozone. */ byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, - boolean readFromTmpFile) throws StorageContainerException; + DispatcherContext dispatcherContext) throws StorageContainerException; /** * Deletes a given chunk. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index fd48bf51c94..cf9ea891ebe 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; @@ -103,6 +103,10 @@ public class TestChunkManagerImpl { } + private DispatcherContext getDispatcherContext() { + return new DispatcherContext.Builder().build(); + } + @Test public void testWriteChunkStageWriteAndCommit() throws Exception { //As in Setup, we try to create container, these paths should exist. @@ -115,16 +119,20 @@ public class TestChunkManagerImpl { // As no chunks are written to the volume writeBytes should be 0 checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA); + ByteBuffer.wrap(data), new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build()); // Now a chunk file is being written with Stage WRITE_DATA, so it should // create a temporary chunk file. assertTrue(chunksPath.listFiles().length == 1); + long term = 0; + long index = 0; File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo); File tempChunkFile = new File(chunkFile.getParent(), - chunkFile.getName() + - OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + - OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX); + chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + term + + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + index); // As chunk write stage is WRITE_DATA, temp chunk file will be created. assertTrue(tempChunkFile.exists()); @@ -132,7 +140,8 @@ public class TestChunkManagerImpl { checkWriteIOStats(data.length, 1); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA); + ByteBuffer.wrap(data), new DispatcherContext.Builder() + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build()); checkWriteIOStats(data.length, 1); @@ -152,7 +161,7 @@ public class TestChunkManagerImpl { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA); + ByteBuffer.wrap(data), getDispatcherContext()); fail("testWriteChunkIncorrectLength failed"); } catch (StorageContainerException ex) { // As we got an exception, writeBytes should be 0. @@ -173,7 +182,7 @@ public class TestChunkManagerImpl { assertTrue(chunksPath.listFiles().length == 0); checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMBINED); + ByteBuffer.wrap(data), getDispatcherContext()); // Now a chunk file is being written with Stage COMBINED_DATA, so it should // create a chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -186,11 +195,11 @@ public class TestChunkManagerImpl { public void testReadChunk() throws Exception { checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMBINED); + ByteBuffer.wrap(data), getDispatcherContext()); checkWriteIOStats(data.length, 1); checkReadIOStats(0, 0); byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, - chunkInfo, false); + chunkInfo, getDispatcherContext()); assertEquals(expectedData.length, data.length); assertTrue(Arrays.equals(expectedData, data)); checkReadIOStats(data.length, 1); @@ -200,7 +209,7 @@ public class TestChunkManagerImpl { public void testDeleteChunk() throws Exception { File chunksPath = new File(keyValueContainerData.getChunksPath()); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMBINED); + ByteBuffer.wrap(data), getDispatcherContext()); assertTrue(chunksPath.listFiles().length == 1); chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); assertTrue(chunksPath.listFiles().length == 0); @@ -210,7 +219,7 @@ public class TestChunkManagerImpl { public void testDeleteChunkUnsupportedRequest() throws Exception { try { chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMBINED); + ByteBuffer.wrap(data), getDispatcherContext()); long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); @@ -227,7 +236,7 @@ public class TestChunkManagerImpl { try { // trying to read a chunk, where chunk file does not exist byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, - chunkInfo, false); + chunkInfo, getDispatcherContext()); fail("testReadChunkFileNotExists failed"); } catch (StorageContainerException ex) { GenericTestUtils.assertExceptionContains("Unable to find the chunk " + @@ -242,7 +251,7 @@ public class TestChunkManagerImpl { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), i), 0, data.length); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), WriteChunkStage.COMBINED); + ByteBuffer.wrap(data), getDispatcherContext()); } checkWriteIOStats(data.length*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0); @@ -250,7 +259,8 @@ public class TestChunkManagerImpl { for (int i=0; i<100; i++) { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), i), 0, data.length); - chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, false); + chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, + getDispatcherContext()); } checkReadIOStats(data.length*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 32b01ae4aa9..d89ffb62379 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; @@ -78,7 +79,6 @@ import java.util.UUID; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; @@ -156,6 +156,10 @@ public class TestContainerPersistence { return ContainerTestHelper.getTestContainerID(); } + private DispatcherContext getDispatcherContext() { + return new DispatcherContext.Builder().build(); + } + private Container addContainer(ContainerSet cSet, long cID) throws IOException { KeyValueContainerData data = new KeyValueContainerData(cID, @@ -334,7 +338,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); return info; } @@ -375,7 +379,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); fileHashMap.put(fileName, info); } @@ -406,7 +410,8 @@ public class TestContainerPersistence { for (int x = 0; x < chunkCount; x++) { String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); ChunkInfo info = fileHashMap.get(fileName); - byte[] data = chunkManager.readChunk(container, blockID, info, false); + byte[] data = chunkManager + .readChunk(container, blockID, info, getDispatcherContext()); ChecksumData checksumData = checksum.computeChecksum(data); Assert.assertEquals(info.getChecksumData(), checksumData); } @@ -433,13 +438,15 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); - byte[] readData = chunkManager.readChunk(container, blockID, info, false); + byte[] readData = chunkManager + .readChunk(container, blockID, info, getDispatcherContext()); assertTrue(Arrays.equals(data, readData)); ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length); - byte[] readData2 = chunkManager.readChunk(container, blockID, info2, false); + byte[] readData2 = chunkManager + .readChunk(container, blockID, info2, getDispatcherContext()); assertEquals(length, readData2.length); assertTrue(Arrays.equals( Arrays.copyOfRange(data, start, start + length), readData2)); @@ -466,13 +473,13 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); long bytesUsed = container.getContainerData().getBytesUsed(); Assert.assertEquals(datalen, bytesUsed); @@ -507,14 +514,15 @@ public class TestContainerPersistence { oldSha.update(data); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); } // Request to read the whole data in a single go. ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0, datalen * chunkCount); byte[] newdata = - chunkManager.readChunk(container, blockID, largeChunk, false); + chunkManager.readChunk(container, blockID, largeChunk, + getDispatcherContext()); MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); newSha.update(newdata); Assert.assertEquals(Hex.encodeHexString(oldSha.digest()), @@ -540,11 +548,11 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); chunkManager.deleteChunk(container, blockID, info); exception.expect(StorageContainerException.class); exception.expectMessage("Unable to find the chunk file."); - chunkManager.readChunk(container, blockID, info, false); + chunkManager.readChunk(container, blockID, info, getDispatcherContext()); } /** @@ -655,7 +663,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - WriteChunkStage.COMBINED); + getDispatcherContext()); totalSize += datalen; chunkList.add(info); }