HDDS-941. Rename ChunkGroupInputStream to keyInputStream and ChunkInputStream to BlockInputStream. Contributed by Shashikant Banerjee.

This commit is contained in:
Bharat Viswanadham 2019-01-09 20:02:36 -08:00
parent c634589ab2
commit 2091d1a4af
9 changed files with 50 additions and 50 deletions

View File

@ -46,7 +46,7 @@ import java.util.List;
* instances. This class encapsulates all state management for iterating * instances. This class encapsulates all state management for iterating
* through the sequence of chunks and the sequence of buffers within each chunk. * through the sequence of chunks and the sequence of buffers within each chunk.
*/ */
public class ChunkInputStream extends InputStream implements Seekable { public class BlockInputStream extends InputStream implements Seekable {
private static final int EOF = -1; private static final int EOF = -1;
@ -61,7 +61,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
private int bufferIndex; private int bufferIndex;
/** /**
* Creates a new ChunkInputStream. * Creates a new BlockInputStream.
* *
* @param blockID block ID of the chunk * @param blockID block ID of the chunk
* @param xceiverClientManager client manager that controls client * @param xceiverClientManager client manager that controls client
@ -69,7 +69,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
* @param chunks list of chunks to read * @param chunks list of chunks to read
* @param traceID container protocol call traceID * @param traceID container protocol call traceID
*/ */
public ChunkInputStream( public BlockInputStream(
BlockID blockID, XceiverClientManager xceiverClientManager, BlockID blockID, XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) { XceiverClientSpi xceiverClient, List<ChunkInfo> chunks, String traceID) {
this.blockID = blockID; this.blockID = blockID;
@ -79,7 +79,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
this.chunks = chunks; this.chunks = chunks;
this.chunkIndex = -1; this.chunkIndex = -1;
// chunkOffset[i] stores offset at which chunk i stores data in // chunkOffset[i] stores offset at which chunk i stores data in
// ChunkInputStream // BlockInputStream
this.chunkOffset = new long[this.chunks.size()]; this.chunkOffset = new long[this.chunks.size()];
initializeChunkOffset(); initializeChunkOffset();
this.buffers = null; this.buffers = null;
@ -154,7 +154,7 @@ public class ChunkInputStream extends InputStream implements Seekable {
*/ */
private synchronized void checkOpen() throws IOException { private synchronized void checkOpen() throws IOException {
if (xceiverClient == null) { if (xceiverClient == null) {
throw new IOException("ChunkInputStream has been closed."); throw new IOException("BlockInputStream has been closed.");
} }
} }

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,17 +44,17 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
/** /**
* Maintaining a list of ChunkInputStream. Read based on offset. * Maintaining a list of BlockInputStream. Read based on offset.
*/ */
public class ChunkGroupInputStream extends InputStream implements Seekable { public class KeyInputStream extends InputStream implements Seekable {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupInputStream.class); LoggerFactory.getLogger(KeyInputStream.class);
private static final int EOF = -1; private static final int EOF = -1;
private final ArrayList<ChunkInputStreamEntry> streamEntries; private final ArrayList<ChunkInputStreamEntry> streamEntries;
// streamOffset[i] stores the offset at which chunkInputStream i stores // streamOffset[i] stores the offset at which blockInputStream i stores
// data in the key // data in the key
private long[] streamOffset = null; private long[] streamOffset = null;
private int currentStreamIndex; private int currentStreamIndex;
@ -62,7 +62,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
private boolean closed = false; private boolean closed = false;
private String key; private String key;
public ChunkGroupInputStream() { public KeyInputStream() {
streamEntries = new ArrayList<>(); streamEntries = new ArrayList<>();
currentStreamIndex = 0; currentStreamIndex = 0;
} }
@ -84,7 +84,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
* @param streamLength the max number of bytes that should be written to this * @param streamLength the max number of bytes that should be written to this
* stream. * stream.
*/ */
public synchronized void addStream(ChunkInputStream stream, public synchronized void addStream(BlockInputStream stream,
long streamLength) { long streamLength) {
streamEntries.add(new ChunkInputStreamEntry(stream, streamLength)); streamEntries.add(new ChunkInputStreamEntry(stream, streamLength));
} }
@ -129,7 +129,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
// this case. // this case.
throw new IOException(String.format( throw new IOException(String.format(
"Inconsistent read for blockID=%s length=%d numBytesRead=%d", "Inconsistent read for blockID=%s length=%d numBytesRead=%d",
current.chunkInputStream.getBlockID(), current.length, current.blockInputStream.getBlockID(), current.length,
numBytesRead)); numBytesRead));
} }
totalReadLen += numBytesRead; totalReadLen += numBytesRead;
@ -174,7 +174,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
// accordingly so that currentStreamIndex = insertionPoint - 1 // accordingly so that currentStreamIndex = insertionPoint - 1
currentStreamIndex = -currentStreamIndex - 2; currentStreamIndex = -currentStreamIndex - 2;
} }
// seek to the proper offset in the ChunkInputStream // seek to the proper offset in the BlockInputStream
streamEntries.get(currentStreamIndex) streamEntries.get(currentStreamIndex)
.seek(pos - streamOffset[currentStreamIndex]); .seek(pos - streamOffset[currentStreamIndex]);
} }
@ -207,17 +207,17 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
} }
/** /**
* Encapsulates ChunkInputStream. * Encapsulates BlockInputStream.
*/ */
public static class ChunkInputStreamEntry extends InputStream public static class ChunkInputStreamEntry extends InputStream
implements Seekable { implements Seekable {
private final ChunkInputStream chunkInputStream; private final BlockInputStream blockInputStream;
private final long length; private final long length;
public ChunkInputStreamEntry(ChunkInputStream chunkInputStream, public ChunkInputStreamEntry(BlockInputStream blockInputStream,
long length) { long length) {
this.chunkInputStream = chunkInputStream; this.blockInputStream = blockInputStream;
this.length = length; this.length = length;
} }
@ -228,29 +228,29 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
@Override @Override
public synchronized int read(byte[] b, int off, int len) public synchronized int read(byte[] b, int off, int len)
throws IOException { throws IOException {
int readLen = chunkInputStream.read(b, off, len); int readLen = blockInputStream.read(b, off, len);
return readLen; return readLen;
} }
@Override @Override
public synchronized int read() throws IOException { public synchronized int read() throws IOException {
int data = chunkInputStream.read(); int data = blockInputStream.read();
return data; return data;
} }
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
chunkInputStream.close(); blockInputStream.close();
} }
@Override @Override
public void seek(long pos) throws IOException { public void seek(long pos) throws IOException {
chunkInputStream.seek(pos); blockInputStream.seek(pos);
} }
@Override @Override
public long getPos() throws IOException { public long getPos() throws IOException {
return chunkInputStream.getPos(); return blockInputStream.getPos();
} }
@Override @Override
@ -267,7 +267,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
String requestId) throws IOException { String requestId) throws IOException {
long length = 0; long length = 0;
long containerKey; long containerKey;
ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream(); KeyInputStream groupInputStream = new KeyInputStream();
groupInputStream.key = keyInfo.getKeyName(); groupInputStream.key = keyInfo.getKeyName();
List<OmKeyLocationInfo> keyLocationInfos = List<OmKeyLocationInfo> keyLocationInfos =
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly(); keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
@ -304,7 +304,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
length += chunk.getLen(); length += chunk.getLen();
} }
success = true; success = true;
ChunkInputStream inputStream = new ChunkInputStream( BlockInputStream inputStream = new BlockInputStream(
omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient, omKeyLocationInfo.getBlockID(), xceiverClientManager, xceiverClient,
chunks, requestId); chunks, requestId);
groupInputStream.addStream(inputStream, groupInputStream.addStream(inputStream,

View File

@ -50,7 +50,7 @@ import java.util.ListIterator;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
/** /**
* Maintaining a list of ChunkInputStream. Write based on offset. * Maintaining a list of BlockInputStream. Write based on offset.
* *
* Note that this may write to multiple containers in one write call. In case * Note that this may write to multiple containers in one write call. In case
* that first container succeeded but later ones failed, the succeeded writes * that first container succeeded but later ones failed, the succeeded writes

View File

@ -17,21 +17,21 @@
package org.apache.hadoop.ozone.client.io; package org.apache.hadoop.ozone.client.io;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
/** /**
* OzoneInputStream is used to read data from Ozone. * OzoneInputStream is used to read data from Ozone.
* It uses SCM's {@link ChunkInputStream} for reading the data. * It uses {@link KeyInputStream} for reading the data.
*/ */
public class OzoneInputStream extends InputStream { public class OzoneInputStream extends InputStream {
private final InputStream inputStream; private final InputStream inputStream;
/** /**
* Constructs OzoneInputStream with ChunkInputStream. * Constructs OzoneInputStream with KeyInputStream.
* *
* @param inputStream * @param inputStream
*/ */

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hdds.client.OzoneQuota;
import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream; import org.apache.hadoop.ozone.client.io.OzoneInputStream;
@ -541,7 +541,7 @@ public class RpcClient implements ClientProtocol {
.build(); .build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
LengthInputStream lengthInputStream = LengthInputStream lengthInputStream =
ChunkGroupInputStream.getFromOmKeyInfo( KeyInputStream.getFromOmKeyInfo(
keyInfo, xceiverClientManager, storageContainerLocationClient, keyInfo, xceiverClientManager, storageContainerLocationClient,
requestId); requestId);
return new OzoneInputStream(lengthInputStream.getWrappedStream()); return new OzoneInputStream(lengthInputStream.getWrappedStream());

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.apache.hadoop.ozone.client.io.KeyOutputStream; import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.LengthInputStream; import org.apache.hadoop.ozone.client.io.LengthInputStream;
import org.apache.hadoop.ozone.common.Checksum; import org.apache.hadoop.ozone.common.Checksum;
@ -38,7 +39,6 @@ import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTrans
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning; import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper; import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
@ -475,7 +475,7 @@ public final class DistributedStorageHandler implements StorageHandler {
.setDataSize(args.getSize()) .setDataSize(args.getSize())
.build(); .build();
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs); OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
return ChunkGroupInputStream.getFromOmKeyInfo( return KeyInputStream.getFromOmKeyInfo(
keyInfo, xceiverClientManager, storageContainerLocationClient, keyInfo, xceiverClientManager, storageContainerLocationClient,
args.getRequestID()); args.getRequestID());
} }

View File

@ -17,8 +17,8 @@
package org.apache.hadoop.ozone.om; package org.apache.hadoop.ozone.om;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
import org.apache.hadoop.hdds.scm.storage.ChunkInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -31,7 +31,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
/** /**
* This class tests ChunkGroupInputStream and KeyOutputStream. * This class tests KeyInputStream and KeyOutputStream.
*/ */
public class TestChunkStreams { public class TestChunkStreams {
@ -40,15 +40,15 @@ public class TestChunkStreams {
@Test @Test
public void testReadGroupInputStream() throws Exception { public void testReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { try (KeyInputStream groupInputStream = new KeyInputStream()) {
String dataString = RandomStringUtils.randomAscii(500); String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes(UTF_8); byte[] buf = dataString.getBytes(UTF_8);
int offset = 0; int offset = 0;
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
int tempOffset = offset; int tempOffset = offset;
ChunkInputStream in = BlockInputStream in =
new ChunkInputStream(null, null, null, new ArrayList<>(), null) { new BlockInputStream(null, null, null, new ArrayList<>(), null) {
private long pos = 0; private long pos = 0;
private ByteArrayInputStream in = private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100); new ByteArrayInputStream(buf, tempOffset, 100);
@ -96,15 +96,15 @@ public class TestChunkStreams {
@Test @Test
public void testErrorReadGroupInputStream() throws Exception { public void testErrorReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) { try (KeyInputStream groupInputStream = new KeyInputStream()) {
String dataString = RandomStringUtils.randomAscii(500); String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes(UTF_8); byte[] buf = dataString.getBytes(UTF_8);
int offset = 0; int offset = 0;
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
int tempOffset = offset; int tempOffset = offset;
ChunkInputStream in = BlockInputStream in =
new ChunkInputStream(null, null, null, new ArrayList<>(), null) { new BlockInputStream(null, null, null, new ArrayList<>(), null) {
private long pos = 0; private long pos = 0;
private ByteArrayInputStream in = private ByteArrayInputStream in =
new ByteArrayInputStream(buf, tempOffset, 100); new ByteArrayInputStream(buf, tempOffset, 100);

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.fs.ozone;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -36,10 +36,10 @@ import java.io.InputStream;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public final class OzoneFSInputStream extends FSInputStream { public final class OzoneFSInputStream extends FSInputStream {
private final ChunkGroupInputStream inputStream; private final KeyInputStream inputStream;
public OzoneFSInputStream(InputStream inputStream) { public OzoneFSInputStream(InputStream inputStream) {
this.inputStream = (ChunkGroupInputStream)inputStream; this.inputStream = (KeyInputStream)inputStream;
} }
@Override @Override

View File

@ -19,24 +19,24 @@
package org.apache.hadoop.ozone.s3.io; package org.apache.hadoop.ozone.s3.io;
import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.ozone.client.io.ChunkGroupInputStream; import org.apache.hadoop.ozone.client.io.KeyInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
/** /**
* S3Wrapper Input Stream which encapsulates ChunkGroupInputStream from ozone. * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
*/ */
public class S3WrapperInputStream extends FSInputStream { public class S3WrapperInputStream extends FSInputStream {
private final ChunkGroupInputStream inputStream; private final KeyInputStream inputStream;
/** /**
* Constructs S3WrapperInputStream with ChunkInputStream. * Constructs S3WrapperInputStream with KeyInputStream.
* *
* @param inputStream * @param inputStream
*/ */
public S3WrapperInputStream(InputStream inputStream) { public S3WrapperInputStream(InputStream inputStream) {
this.inputStream = (ChunkGroupInputStream) inputStream; this.inputStream = (KeyInputStream) inputStream;
} }
@Override @Override