HDDS-771. ChunkGroupOutputStream stream entries need to be properly updated on closed container exception. Contributed by Lokesh Jain.
commit 39d088bc2c
parent 73e9e43483
ChunkGroupOutputStream.java

@@ -420,6 +420,11 @@ public class ChunkGroupOutputStream extends OutputStream {
       return;
     }

+    // update currentStreamIndex in case of closed container exception. The
+    // current stream entry cannot be used for further writes because
+    // container is closed.
+    currentStreamIndex += 1;
+
     // In case where not a single chunk of data has been written to the Datanode
     // yet. This block does not yet exist on the datanode but cached on the
     // outputStream buffer. No need to call GetCommittedBlockLength here
@@ -436,7 +441,6 @@ public class ChunkGroupOutputStream extends OutputStream {
       // allocate new block and write this data in the datanode. The cached
       // data in the buffer does not exceed chunkSize.
       Preconditions.checkState(buffer.position() < chunkSize);
-      currentStreamIndex += 1;
       // readjust the byteOffset value to the length actually been written.
       byteOffset -= buffer.position();
       handleWrite(buffer.array(), 0, buffer.position());
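Taken together, the two hunks above hoist the currentStreamIndex increment out of the buffered-data rewrite path. Previously the index only advanced when leftover buffered data had to be rewritten, so a stream entry backed by a closed container could remain the current write target whenever the buffer happened to be empty. Below is a minimal sketch of the handler after the patch; everything outside the lines shown in the hunks (the method name, the accessors, and the guard condition) is an assumption for illustration, not the verbatim Hadoop source.

  // Sketch only: handler shape assumed from the hunk context above.
  private void handleCloseContainerException(ChunkOutputStreamEntry entry)
      throws IOException {
    ByteBuffer buffer = entry.getBuffer();   // assumed accessor
    if (buffer == null) {
      // Entry is already closed and holds no cached data; nothing to re-drive.
      return;
    }

    // update currentStreamIndex in case of closed container exception. The
    // current stream entry cannot be used for further writes because
    // container is closed.
    currentStreamIndex += 1;

    if (buffer.position() > 0) {             // assumed guard around the rewrite
      // allocate new block and write this data in the datanode. The cached
      // data in the buffer does not exceed chunkSize.
      Preconditions.checkState(buffer.position() < chunkSize);
      // readjust the byteOffset value to the length actually been written.
      byteOffset -= buffer.position();
      handleWrite(buffer.array(), 0, buffer.position());
    }
  }

With the increment placed before the conditional rewrite, subsequent writes always move on to the next preallocated (or newly allocated) stream entry, which is what the new test below exercises.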
TestCloseContainerHandlingByClient.java

@@ -33,6 +33,8 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@@ -286,6 +288,64 @@ public class TestCloseContainerHandlingByClient {
     validateData(keyName, dataString.concat(dataString2).getBytes());
   }

+  @Test
+  public void testMultiBlockWrites3() throws Exception {
+
+    String keyName = "standalone5";
+    int keyLen = 4 * blockSize;
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.RATIS, keyLen);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    // With the initial size provided, it should have preallocated 4 blocks
+    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    // write data 3 blocks and one more chunk
+    byte[] writtenData = fixedLengthString(keyString, keyLen).getBytes();
+    byte[] data = Arrays.copyOfRange(writtenData, 0, 3 * blockSize + chunkSize);
+    Assert.assertEquals(data.length, 3 * blockSize + chunkSize);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key,
+        HddsProtos.ReplicationType.RATIS);
+    // write 3 more chunks worth of data. It will fail and new block will be
+    // allocated. This write completes 4 blocks worth of data written to key
+    data = Arrays
+        .copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
+    key.write(data);
+
+    key.close();
+    // read the key from OM again and match the length and data.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    OzoneVolume volume = objectStore.getVolume(volumeName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+    OzoneInputStream inputStream = bucket.readKey(keyName);
+    byte[] readData = new byte[keyLen];
+    inputStream.read(readData);
+    Assert.assertArrayEquals(writtenData, readData);
+
+    // Though we have written only block initially, the close will hit
+    // closeContainerException and remaining data in the chunkOutputStream
+    // buffer will be copied into a different allocated block and will be
+    // committed.
+    Assert.assertEquals(5, keyLocationInfos.size());
+    Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
+    long length = 0;
+    for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
+      length += locationInfo.getLength();
+    }
+    Assert.assertEquals(4 * blockSize, length);
+  }
+
   private void waitForContainerClose(String keyName,
       OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
       throws Exception {
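The final length check in the new test can be read as a reusable invariant: the sum of the block lengths that OM reports for the latest key version must equal the key's data size. A small helper along those lines (illustrative only, not part of this patch; it uses only the OmKeyInfo and OmKeyLocationInfo accessors already exercised by the test) might look like:

  // Illustrative helper mirroring the test's verification loop.
  private static void assertBlockLengthsMatchKeySize(OmKeyInfo keyInfo) {
    long length = 0;
    for (OmKeyLocationInfo locationInfo :
        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly()) {
      length += locationInfo.getLength();
    }
    Assert.assertEquals(keyInfo.getDataSize(), length);
  }

The new case can typically be run on its own with the surefire filter, e.g. -Dtest=TestCloseContainerHandlingByClient#testMultiBlockWrites3, from the module that hosts the Ozone integration tests.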