HADOOP-14478. Optimize NativeAzureFsInputStream for positional reads. Contributed by Rajesh Balamohan
This commit is contained in:
parent
b55a34639d
commit
2777b1d456
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.fs.azure;
|
package org.apache.hadoop.fs.azure;
|
||||||
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
|
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -2043,11 +2042,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
|
|
||||||
// Get blob reference and open the input buffer stream.
|
// Get blob reference and open the input buffer stream.
|
||||||
CloudBlobWrapper blob = getBlobReference(key);
|
CloudBlobWrapper blob = getBlobReference(key);
|
||||||
BufferedInputStream inBufStream = new BufferedInputStream(
|
|
||||||
openInputStream(blob));
|
|
||||||
|
|
||||||
// Return a data input stream.
|
// Return a data input stream.
|
||||||
DataInputStream inDataStream = new DataInputStream(inBufStream);
|
DataInputStream inDataStream = new DataInputStream(openInputStream(blob));
|
||||||
return inDataStream;
|
return inDataStream;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// Re-throw as an Azure storage exception.
|
// Re-throw as an Azure storage exception.
|
||||||
|
|
|
@ -816,6 +816,27 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void readFully(long position, byte[] buffer, int offset, int length)
|
||||||
|
throws IOException {
|
||||||
|
validatePositionedReadArgs(position, buffer, offset, length);
|
||||||
|
|
||||||
|
int nread = 0;
|
||||||
|
while (nread < length) {
|
||||||
|
// In case BlobInputStream is used, mark() can act as a hint to read ahead only this
|
||||||
|
// length instead of 4 MB boundary.
|
||||||
|
in.mark(length - nread);
|
||||||
|
int nbytes = read(position + nread,
|
||||||
|
buffer,
|
||||||
|
offset + nread,
|
||||||
|
length - nread);
|
||||||
|
if (nbytes < 0) {
|
||||||
|
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||||
|
}
|
||||||
|
nread += nbytes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Reads up to len bytes of data from the input stream into an array of
|
* Reads up to len bytes of data from the input stream into an array of
|
||||||
* bytes. An attempt is made to read as many as len bytes, but a smaller
|
* bytes. An attempt is made to read as many as len bytes, but a smaller
|
||||||
|
@ -886,9 +907,13 @@ public class NativeAzureFileSystem extends FileSystem {
|
||||||
if (pos < 0) {
|
if (pos < 0) {
|
||||||
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
||||||
}
|
}
|
||||||
|
if (this.pos > pos) {
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
in = store.retrieve(key);
|
in = store.retrieve(key);
|
||||||
this.pos = in.skip(pos);
|
this.pos = in.skip(pos);
|
||||||
|
} else {
|
||||||
|
this.pos += in.skip(pos - this.pos);
|
||||||
|
}
|
||||||
LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
LOG.debug("Seek to position {}. Bytes skipped {}", pos,
|
||||||
this.pos);
|
this.pos);
|
||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
|
|
Loading…
Reference in New Issue