diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index d9a9910fcfd..0772360149f 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -420,6 +420,11 @@ public class ChunkGroupOutputStream extends OutputStream { return; } + // update currentStreamIndex in case of closed container exception. The + // current stream entry cannot be used for further writes because + // container is closed. + currentStreamIndex += 1; + // In case where not a single chunk of data has been written to the Datanode // yet. This block does not yet exist on the datanode but cached on the // outputStream buffer. No need to call GetCommittedBlockLength here @@ -436,7 +441,6 @@ public class ChunkGroupOutputStream extends OutputStream { // allocate new block and write this data in the datanode. The cached // data in the buffer does not exceed chunkSize. Preconditions.checkState(buffer.position() < chunkSize); - currentStreamIndex += 1; // readjust the byteOffset value to the length actually been written. byteOffset -= buffer.position(); handleWrite(buffer.array(), 0, buffer.position()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index da8d334e5be..408ff8be9e3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -33,6 +33,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneBucket; +import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.OzoneClientFactory; import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; @@ -286,6 +288,64 @@ public class TestCloseContainerHandlingByClient { validateData(keyName, dataString.concat(dataString2).getBytes()); } + @Test + public void testMultiBlockWrites3() throws Exception { + + String keyName = "standalone5"; + int keyLen = 4 * blockSize; + OzoneOutputStream key = + createKey(keyName, ReplicationType.RATIS, keyLen); + ChunkGroupOutputStream groupOutputStream = + (ChunkGroupOutputStream) key.getOutputStream(); + // With the initial size provided, it should have preallocated 4 blocks + Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + // write data 3 blocks and one more chunk + byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes(); + byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize); + Assert.assertEquals(data.length, 3 * blockSize + chunkSize); + key.write(data); + + Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + //get the name of a valid container + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) + .setBucketName(bucketName) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName) + .build(); + + waitForContainerClose(keyName, key, + HddsProtos.ReplicationType.RATIS); + // write 3 more chunks worth of data. It will fail and new block will be + // allocated. This write completes 4 blocks worth of data written to key + data = Arrays + .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen); + key.write(data); + + key.close(); + // read the key from OM again and match the length and data. + OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs); + List keyLocationInfos = + keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly(); + OzoneVolume volume = objectStore.getVolume(volumeName); + OzoneBucket bucket = volume.getBucket(bucketName); + OzoneInputStream inputStream = bucket.readKey(keyName); + byte[] readData = new byte[keyLen]; + inputStream.read(readData); + Assert.assertArrayEquals(writtenData, readData); + + // Though we have written only block initially, the close will hit + // closeContainerException and remaining data in the chunkOutputStream + // buffer will be copied into a different allocated block and will be + // committed. + Assert.assertEquals(5, keyLocationInfos.size()); + Assert.assertEquals(4 * blockSize, keyInfo.getDataSize()); + long length = 0; + for (OmKeyLocationInfo locationInfo : keyLocationInfos) { + length += locationInfo.getLength(); + } + Assert.assertEquals(4 * blockSize, length); + } + private void waitForContainerClose(String keyName, OzoneOutputStream outputStream, HddsProtos.ReplicationType type) throws Exception {