diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java similarity index 98% rename from hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java rename to hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index 6e2ca597465..32c6b6ae986 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -65,9 +65,9 @@ import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls * This class encapsulates all state management for buffering and writing * through to the container. */ -public class ChunkOutputStream extends OutputStream { +public class BlockOutputStream extends OutputStream { public static final Logger LOG = - LoggerFactory.getLogger(ChunkOutputStream.class); + LoggerFactory.getLogger(BlockOutputStream.class); private BlockID blockID; private final String key; @@ -108,7 +108,7 @@ public class ChunkOutputStream extends OutputStream { private int currentBufferIndex; /** - * Creates a new ChunkOutputStream. + * Creates a new BlockOutputStream. * * @param blockID block ID * @param key chunk key @@ -122,7 +122,7 @@ public class ChunkOutputStream extends OutputStream { * @param watchTimeout watch timeout * @param checksum checksum */ - public ChunkOutputStream(BlockID blockID, String key, + public BlockOutputStream(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String traceID, int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize, long watchTimeout, @@ -565,7 +565,7 @@ public class ChunkOutputStream extends OutputStream { */ private void checkOpen() throws IOException { if (xceiverClient == null) { - throw new IOException("ChunkOutputStream has been closed."); + throw new IOException("BlockOutputStream has been closed."); } else if (ioException != null) { adjustBuffersOnException(); throw ioException; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java similarity index 92% rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java rename to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java index 49835ccd805..5e7cb9baeb1 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.storage.BlockOutputStream; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.*; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; @@ -35,7 +36,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.hdds.scm.protocolPB .StorageContainerLocationProtocolClientSideTranslatorPB; -import org.apache.hadoop.hdds.scm.storage.ChunkOutputStream; import org.apache.ratis.protocol.RaftRetryFailureException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,13 +58,13 @@ import java.util.concurrent.TimeoutException; * * TODO : currently not support multi-thread access. */ -public class ChunkGroupOutputStream extends OutputStream { +public class KeyOutputStream extends OutputStream { public static final Logger LOG = - LoggerFactory.getLogger(ChunkGroupOutputStream.class); + LoggerFactory.getLogger(KeyOutputStream.class); // array list's get(index) is O(1) - private final ArrayList streamEntries; + private final ArrayList streamEntries; private int currentStreamIndex; private final OzoneManagerProtocolClientSideTranslatorPB omClient; private final @@ -86,7 +86,7 @@ public class ChunkGroupOutputStream extends OutputStream { * A constructor for testing purpose only. */ @VisibleForTesting - public ChunkGroupOutputStream() { + public KeyOutputStream() { streamEntries = new ArrayList<>(); omClient = null; scmClient = null; @@ -116,11 +116,11 @@ public class ChunkGroupOutputStream extends OutputStream { @VisibleForTesting public void addStream(OutputStream outputStream, long length) { streamEntries.add( - new ChunkOutputStreamEntry(outputStream, length, checksum)); + new BlockOutputStreamEntry(outputStream, length, checksum)); } @VisibleForTesting - public List getStreamEntries() { + public List getStreamEntries() { return streamEntries; } @VisibleForTesting @@ -130,7 +130,7 @@ public class ChunkGroupOutputStream extends OutputStream { public List getLocationInfoList() throws IOException { List locationInfoList = new ArrayList<>(); - for (ChunkOutputStreamEntry streamEntry : streamEntries) { + for (BlockOutputStreamEntry streamEntry : streamEntries) { OmKeyLocationInfo info = new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID) .setLength(streamEntry.currentPosition).setOffset(0) @@ -143,7 +143,7 @@ public class ChunkGroupOutputStream extends OutputStream { return locationInfoList; } - public ChunkGroupOutputStream(OpenKeySession handler, + public KeyOutputStream(OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, OzoneManagerProtocolClientSideTranslatorPB omClient, int chunkSize, @@ -212,7 +212,7 @@ public class ChunkGroupOutputStream extends OutputStream { .getContainerWithPipeline(subKeyInfo.getContainerID()); XceiverClientSpi xceiverClient = xceiverClientManager.acquireClient(containerWithPipeline.getPipeline()); - streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(), + streamEntries.add(new BlockOutputStreamEntry(subKeyInfo.getBlockID(), keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, chunkSize, subKeyInfo.getLength(), streamBufferFlushSize, streamBufferMaxSize, watchTimeout, bufferList, checksum)); @@ -280,7 +280,7 @@ public class ChunkGroupOutputStream extends OutputStream { // in theory, this condition should never violate due the check above // still do a sanity check. Preconditions.checkArgument(currentStreamIndex < streamEntries.size()); - ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex); + BlockOutputStreamEntry current = streamEntries.get(currentStreamIndex); // length(len) will be in int range if the call is happening through // write API of chunkOutputStream. Length can be in long range if it comes @@ -323,7 +323,7 @@ public class ChunkGroupOutputStream extends OutputStream { // currentStreamIndex < streamEntries.size() signifies that, there are still // pre allocated blocks available. if (currentStreamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = + ListIterator streamEntryIterator = streamEntries.listIterator(currentStreamIndex); while (streamEntryIterator.hasNext()) { if (streamEntryIterator.next().blockID.getContainerID() @@ -342,7 +342,7 @@ public class ChunkGroupOutputStream extends OutputStream { */ private void removeEmptyBlocks() { if (currentStreamIndex < streamEntries.size()) { - ListIterator streamEntryIterator = + ListIterator streamEntryIterator = streamEntries.listIterator(currentStreamIndex); while (streamEntryIterator.hasNext()) { if (streamEntryIterator.next().currentPosition == 0) { @@ -361,7 +361,7 @@ public class ChunkGroupOutputStream extends OutputStream { * @param streamIndex Index of the entry * @throws IOException Throws IOException if Write fails */ - private void handleException(ChunkOutputStreamEntry streamEntry, + private void handleException(BlockOutputStreamEntry streamEntry, int streamIndex) throws IOException { long totalSuccessfulFlushedData = streamEntry.getTotalSuccessfulFlushedData(); @@ -428,7 +428,7 @@ public class ChunkGroupOutputStream extends OutputStream { * Contact OM to get a new block. Set the new block with the index (e.g. * first block has index = 0, second has index = 1 etc.) * - * The returned block is made to new ChunkOutputStreamEntry to write. + * The returned block is made to new BlockOutputStreamEntry to write. * * @param index the index of the block. * @throws IOException @@ -457,7 +457,7 @@ public class ChunkGroupOutputStream extends OutputStream { int size = streamEntries.size(); int streamIndex = currentStreamIndex >= size ? size - 1 : currentStreamIndex; - ChunkOutputStreamEntry entry = streamEntries.get(streamIndex); + BlockOutputStreamEntry entry = streamEntries.get(streamIndex); if (entry != null) { try { if (close) { @@ -507,7 +507,7 @@ public class ChunkGroupOutputStream extends OutputStream { omClient.commitKey(keyArgs, openID); } } else { - LOG.warn("Closing ChunkGroupOutputStream, but key args is null"); + LOG.warn("Closing KeyOutputStream, but key args is null"); } } catch (IOException ioe) { throw ioe; @@ -524,7 +524,7 @@ public class ChunkGroupOutputStream extends OutputStream { } /** - * Builder class of ChunkGroupOutputStream. + * Builder class of KeyOutputStream. */ public static class Builder { private OpenKeySession openHandler; @@ -627,15 +627,15 @@ public class ChunkGroupOutputStream extends OutputStream { return this; } - public ChunkGroupOutputStream build() throws IOException { - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, + public KeyOutputStream build() throws IOException { + return new KeyOutputStream(openHandler, xceiverManager, scmClient, omClient, chunkSize, requestID, factor, type, streamBufferFlushSize, streamBufferMaxSize, blockSize, watchTimeout, checksum, multipartUploadID, multipartNumber, isMultipartKey); } } - private static class ChunkOutputStreamEntry extends OutputStream { + private static class BlockOutputStreamEntry extends OutputStream { private OutputStream outputStream; private BlockID blockID; private final String key; @@ -654,7 +654,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final long watchTimeout; private List bufferList; - ChunkOutputStreamEntry(BlockID blockID, String key, + BlockOutputStreamEntry(BlockID blockID, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, long length, long streamBufferFlushSize, long streamBufferMaxSize, @@ -681,7 +681,7 @@ public class ChunkGroupOutputStream extends OutputStream { * @param outputStream a existing writable output stream * @param length the length of data to write to the stream */ - ChunkOutputStreamEntry(OutputStream outputStream, long length, + BlockOutputStreamEntry(OutputStream outputStream, long length, Checksum checksum) { this.outputStream = outputStream; this.blockID = null; @@ -711,7 +711,7 @@ public class ChunkGroupOutputStream extends OutputStream { private void checkStream() { if (this.outputStream == null) { this.outputStream = - new ChunkOutputStream(blockID, key, xceiverClientManager, + new BlockOutputStream(blockID, key, xceiverClientManager, xceiverClient, requestId, chunkSize, streamBufferFlushSize, streamBufferMaxSize, watchTimeout, bufferList, checksum); } @@ -744,15 +744,15 @@ public class ChunkGroupOutputStream extends OutputStream { this.outputStream.close(); // after closing the chunkOutPutStream, blockId would have been // reconstructed with updated bcsId - if (this.outputStream instanceof ChunkOutputStream) { - this.blockID = ((ChunkOutputStream) outputStream).getBlockID(); + if (this.outputStream instanceof BlockOutputStream) { + this.blockID = ((BlockOutputStream) outputStream).getBlockID(); } } } long getTotalSuccessfulFlushedData() throws IOException { - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; blockID = out.getBlockID(); return out.getTotalSuccessfulFlushedData(); } else if (outputStream == null) { @@ -765,8 +765,8 @@ public class ChunkGroupOutputStream extends OutputStream { } long getWrittenDataLength() throws IOException { - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; return out.getWrittenDataLength(); } else if (outputStream == null) { // For a pre allocated block for which no write has been initiated, @@ -779,16 +779,16 @@ public class ChunkGroupOutputStream extends OutputStream { void cleanup() { checkStream(); - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; out.cleanup(); } } void writeOnRetry(long len) throws IOException { checkStream(); - if (this.outputStream instanceof ChunkOutputStream) { - ChunkOutputStream out = (ChunkOutputStream) this.outputStream; + if (this.outputStream instanceof BlockOutputStream) { + BlockOutputStream out = (BlockOutputStream) this.outputStream; out.writeOnRetry(len); this.currentPosition += len; } else { diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java index 8a896ad16ff..e4a7d6a100c 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java @@ -24,14 +24,14 @@ import java.io.OutputStream; /** * OzoneOutputStream is used to write data into Ozone. - * It uses SCM's {@link ChunkGroupOutputStream} for writing the data. + * It uses SCM's {@link KeyOutputStream} for writing the data. */ public class OzoneOutputStream extends OutputStream { private final OutputStream outputStream; /** - * Constructs OzoneOutputStream with ChunkGroupOutputStream. + * Constructs OzoneOutputStream with KeyOutputStream. * * @param outputStream */ @@ -61,8 +61,8 @@ public class OzoneOutputStream extends OutputStream { } public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() { - if (outputStream instanceof ChunkGroupOutputStream) { - return ((ChunkGroupOutputStream) outputStream).getCommitUploadPartInfo(); + if (outputStream instanceof KeyOutputStream) { + return ((KeyOutputStream) outputStream).getCommitUploadPartInfo(); } // Otherwise return null. return null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 6cf6c4f080b..6a458873d70 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; @@ -501,8 +501,8 @@ public class RpcClient implements ClientProtocol { .build(); OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - ChunkGroupOutputStream groupOutputStream = - new ChunkGroupOutputStream.Builder() + KeyOutputStream groupOutputStream = + new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) .setScmClient(storageContainerLocationClient) @@ -726,8 +726,8 @@ public class RpcClient implements ClientProtocol { .build(); OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - ChunkGroupOutputStream groupOutputStream = - new ChunkGroupOutputStream.Builder() + KeyOutputStream groupOutputStream = + new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) .setScmClient(storageContainerLocationClient) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index 6b7276ec1a0..8740eba2d36 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -36,7 +36,7 @@ import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneVolume; import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; @@ -133,7 +133,7 @@ public class TestCloseContainerHandlingByClient { .getBytes(UTF_8); key.write(data); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) @@ -165,7 +165,7 @@ public class TestCloseContainerHandlingByClient { .getBytes(UTF_8); key.write(data); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) .setBucketName(bucketName) @@ -188,10 +188,10 @@ public class TestCloseContainerHandlingByClient { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, (4 * blockSize)); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = + (KeyOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks - Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); // write data more than 1 chunk byte[] data = ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize)) @@ -199,7 +199,7 @@ public class TestCloseContainerHandlingByClient { Assert.assertEquals(data.length, 3 * blockSize); key.write(data); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) @@ -234,12 +234,12 @@ public class TestCloseContainerHandlingByClient { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 4 * blockSize); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = + (KeyOutputStream) key.getOutputStream(); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); // With the initial size provided, it should have pre allocated 4 blocks - Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); String dataString = ContainerTestHelper.getFixedLengthString(keyString, (2 * blockSize)); byte[] data = dataString.getBytes(UTF_8); @@ -278,10 +278,10 @@ public class TestCloseContainerHandlingByClient { String keyName = getKeyName(); int keyLen = 4 * blockSize; OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, keyLen); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = + (KeyOutputStream) key.getOutputStream(); // With the initial size provided, it should have preallocated 4 blocks - Assert.assertEquals(4, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(4, keyOutputStream.getStreamEntries().size()); // write data 3 blocks and one more chunk byte[] writtenData = ContainerTestHelper.getFixedLengthString(keyString, keyLen) @@ -290,7 +290,7 @@ public class TestCloseContainerHandlingByClient { Assert.assertEquals(data.length, 3 * blockSize + chunkSize); key.write(data); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); //get the name of a valid container OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName) .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) @@ -329,10 +329,10 @@ public class TestCloseContainerHandlingByClient { private void waitForContainerClose(String keyName, OzoneOutputStream outputStream) throws Exception { - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) outputStream.getOutputStream(); + KeyOutputStream keyOutputStream = + (KeyOutputStream) outputStream.getOutputStream(); List locationInfoList = - groupOutputStream.getLocationInfoList(); + keyOutputStream.getLocationInfoList(); List containerIdList = new ArrayList<>(); for (OmKeyLocationInfo info : locationInfoList) { containerIdList.add(info.getContainerID()); @@ -397,18 +397,18 @@ public class TestCloseContainerHandlingByClient { String keyName = getKeyName(); OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 2 * blockSize); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + KeyOutputStream keyOutputStream = + (KeyOutputStream) key.getOutputStream(); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); // With the initial size provided, it should have pre allocated 4 blocks - Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); String dataString = ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); byte[] data = dataString.getBytes(UTF_8); key.write(data); List locationInfos = - new ArrayList<>(groupOutputStream.getLocationInfoList()); + new ArrayList<>(keyOutputStream.getLocationInfoList()); long containerID = locationInfos.get(0).getContainerID(); ContainerInfo container = cluster.getStorageContainerManager().getContainerManager() @@ -423,16 +423,16 @@ public class TestCloseContainerHandlingByClient { ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize)); data = dataString.getBytes(UTF_8); key.write(data); - Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); + Assert.assertEquals(2, keyOutputStream.getStreamEntries().size()); // the 1st block got written. Now all the containers are closed, so the 2nd // pre allocated block will be removed from the list and new block should // have been allocated Assert.assertTrue( - groupOutputStream.getLocationInfoList().get(0).getBlockID() + keyOutputStream.getLocationInfoList().get(0).getBlockID() .equals(locationInfos.get(0).getBlockID())); Assert.assertFalse( - groupOutputStream.getLocationInfoList().get(1).getBlockID() + keyOutputStream.getLocationInfoList().get(1).getBlockID() .equals(locationInfos.get(1).getBlockID())); key.close(); } @@ -463,7 +463,7 @@ public class TestCloseContainerHandlingByClient { .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName) .build(); - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); waitForContainerClose(keyName, key); // Again Write the Data. This will throw an exception which will be handled // and new blocks will be allocated diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java index c1827c9c176..942b847a7b0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java @@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -126,8 +126,8 @@ public class TestContainerStateMachineFailures { setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis") .build(); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + KeyOutputStream groupOutputStream = + (KeyOutputStream) key.getOutputStream(); List locationInfoList = groupOutputStream.getLocationInfoList(); Assert.assertEquals(1, locationInfoList.size()); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java index 1ba482035c7..f94257cacbb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java @@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; @@ -121,9 +121,9 @@ public class TestFailureHandlingByClient { key.write(data); // get the name of a valid container - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream groupOutputStream = + (KeyOutputStream) key.getOutputStream(); List locationInfoList = groupOutputStream.getLocationInfoList(); Assert.assertTrue(locationInfoList.size() == 1); @@ -160,9 +160,9 @@ public class TestFailureHandlingByClient { key.write(data.getBytes()); // get the name of a valid container - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream groupOutputStream = + (KeyOutputStream) key.getOutputStream(); List locationInfoList = groupOutputStream.getLocationInfoList(); Assert.assertTrue(locationInfoList.size() == 2); @@ -201,9 +201,9 @@ public class TestFailureHandlingByClient { key.write(data.getBytes()); // get the name of a valid container - Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) key.getOutputStream(); + Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream); + KeyOutputStream groupOutputStream = + (KeyOutputStream) key.getOutputStream(); List locationInfoList = groupOutputStream.getLocationInfoList(); Assert.assertTrue(locationInfoList.size() == 6); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java index 474b9204a46..90c3c1f8b4e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.ozone.client.VolumeArgs; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.common.OzoneChecksumException; @@ -666,8 +666,8 @@ public class TestOzoneRpcClient { OzoneOutputStream out = bucket .createKey(keyName, value.getBytes().length, ReplicationType.RATIS, ReplicationFactor.THREE); - ChunkGroupOutputStream groupOutputStream = - (ChunkGroupOutputStream) out.getOutputStream(); + KeyOutputStream groupOutputStream = + (KeyOutputStream) out.getOutputStream(); XceiverClientManager manager = groupOutputStream.getXceiverClientManager(); out.write(value.getBytes()); out.close(); diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index a587ca30bf8..a078c010660 100644 --- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.om.helpers.OmBucketArgs; @@ -38,7 +39,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts.Versioning; import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocolPB.OMPBHelper; @@ -437,8 +437,8 @@ public final class DistributedStorageHandler implements StorageHandler { .build(); // contact OM to allocate a block for key. OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs); - ChunkGroupOutputStream groupOutputStream = - new ChunkGroupOutputStream.Builder() + KeyOutputStream groupOutputStream = + new KeyOutputStream.Builder() .setHandler(openKey) .setXceiverClientManager(xceiverClientManager) .setScmClient(storageContainerLocationClient) diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java index 6b9183758d7..4ef46a362b9 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestChunkStreams.java @@ -31,7 +31,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; /** - * This class tests ChunkGroupInputStream and ChunkGroupOutStream. + * This class tests ChunkGroupInputStream and KeyOutputStream. */ public class TestChunkStreams { diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java index faa36287157..3670cffa94a 100644 --- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java +++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFSOutputStream.java @@ -20,7 +20,7 @@ package org.apache.hadoop.fs.ozone; import java.io.IOException; import java.io.OutputStream; -import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; /** @@ -31,10 +31,10 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream; */ public class OzoneFSOutputStream extends OutputStream { - private final ChunkGroupOutputStream outputStream; + private final KeyOutputStream outputStream; public OzoneFSOutputStream(OutputStream outputStream) { - this.outputStream = (ChunkGroupOutputStream)outputStream; + this.outputStream = (KeyOutputStream)outputStream; } @Override