HADOOP-14535 wasb: implement high-performance random access and seek of block blobs.
Contributed by Thomas Marquardt
This commit is contained in:
parent
12c8fdceaf
commit
d670c3a4da
|
@ -1486,6 +1486,14 @@ public class ContractTestUtils extends Assert {
|
|||
return now() - startTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Elapsed time in milliseconds; no rounding.
|
||||
* @return elapsed time
|
||||
*/
|
||||
public long elapsedTimeMs() {
|
||||
return elapsedTime() / 1000000;
|
||||
}
|
||||
|
||||
public double bandwidth(long bytes) {
|
||||
return bandwidthMBs(bytes, duration());
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.fs.azure;
|
||||
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -121,6 +120,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
|
||||
private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
|
||||
private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
|
||||
@VisibleForTesting
|
||||
static final String KEY_INPUT_STREAM_VERSION = "fs.azure.input.stream.version.for.internal.use.only";
|
||||
|
||||
// Property controlling whether to allow reads on blob which are concurrently
|
||||
// appended out-of-band.
|
||||
|
@ -222,6 +223,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
|
||||
public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
|
||||
|
||||
private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
|
||||
|
||||
// Retry parameter defaults.
|
||||
//
|
||||
|
||||
|
@ -280,6 +283,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
|
||||
private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
|
||||
private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
|
||||
|
||||
// Bandwidth throttling exponential back-off parameters
|
||||
//
|
||||
|
@ -691,6 +695,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
this.uploadBlockSizeBytes = sessionConfiguration.getInt(
|
||||
KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
|
||||
|
||||
this.inputStreamVersion = sessionConfiguration.getInt(
|
||||
KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
|
||||
|
||||
// The job may want to specify a timeout to use when engaging the
|
||||
// storage service. The default is currently 90 seconds. It may
|
||||
// be necessary to increase this value for long latencies in larger
|
||||
|
@ -1417,8 +1424,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
private InputStream openInputStream(CloudBlobWrapper blob)
|
||||
throws StorageException, IOException {
|
||||
if (blob instanceof CloudBlockBlobWrapper) {
|
||||
LOG.debug("Using stream seek algorithm {}", inputStreamVersion);
|
||||
switch(inputStreamVersion) {
|
||||
case 1:
|
||||
return blob.openInputStream(getDownloadOptions(),
|
||||
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
|
||||
case 2:
|
||||
return new BlockBlobInputStream((CloudBlockBlobWrapper) blob,
|
||||
getDownloadOptions(),
|
||||
getInstrumentedContext(isConcurrentOOBAppendAllowed()));
|
||||
default:
|
||||
throw new IOException("Unknown seek algorithm: " + inputStreamVersion);
|
||||
}
|
||||
} else {
|
||||
return new PageBlobInputStream(
|
||||
(CloudPageBlobWrapper) blob, getInstrumentedContext(
|
||||
|
@ -2023,32 +2040,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DataInputStream retrieve(String key) throws AzureException, IOException {
|
||||
try {
|
||||
// Check if a session exists, if not create a session with the
|
||||
// Azure storage server.
|
||||
if (null == storageInteractionLayer) {
|
||||
final String errMsg = String.format(
|
||||
"Storage session expected for URI '%s' but does not exist.",
|
||||
sessionUri);
|
||||
throw new AssertionError(errMsg);
|
||||
}
|
||||
checkContainer(ContainerAccessType.PureRead);
|
||||
|
||||
// Get blob reference and open the input buffer stream.
|
||||
CloudBlobWrapper blob = getBlobReference(key);
|
||||
|
||||
// Return a data input stream.
|
||||
DataInputStream inDataStream = new DataInputStream(openInputStream(blob));
|
||||
return inDataStream;
|
||||
} catch (Exception e) {
|
||||
// Re-throw as an Azure storage exception.
|
||||
throw new AzureException(e);
|
||||
}
|
||||
public InputStream retrieve(String key) throws AzureException, IOException {
|
||||
return retrieve(key, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataInputStream retrieve(String key, long startByteOffset)
|
||||
public InputStream retrieve(String key, long startByteOffset)
|
||||
throws AzureException, IOException {
|
||||
try {
|
||||
// Check if a session exists, if not create a session with the
|
||||
|
@ -2061,21 +2058,16 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
checkContainer(ContainerAccessType.PureRead);
|
||||
|
||||
// Get blob reference and open the input buffer stream.
|
||||
CloudBlobWrapper blob = getBlobReference(key);
|
||||
|
||||
// Open input stream and seek to the start offset.
|
||||
InputStream in = blob.openInputStream(
|
||||
getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed()));
|
||||
|
||||
// Create a data input stream.
|
||||
DataInputStream inDataStream = new DataInputStream(in);
|
||||
|
||||
InputStream inputStream = openInputStream(getBlobReference(key));
|
||||
if (startByteOffset > 0) {
|
||||
// Skip bytes and ignore return value. This is okay
|
||||
// because if you try to skip too far you will be positioned
|
||||
// at the end and reads will not return data.
|
||||
inDataStream.skip(startByteOffset);
|
||||
return inDataStream;
|
||||
inputStream.skip(startByteOffset);
|
||||
}
|
||||
return inputStream;
|
||||
} catch (IOException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
// Re-throw as an Azure storage exception.
|
||||
throw new AzureException(e);
|
||||
|
|
|
@ -0,0 +1,396 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
|
||||
|
||||
/**
|
||||
* Encapsulates the BlobInputStream used by block blobs and adds support for
|
||||
* random access and seek. Random access performance is improved by several
|
||||
* orders of magnitude.
|
||||
*/
|
||||
final class BlockBlobInputStream extends InputStream implements Seekable {
|
||||
private final CloudBlockBlobWrapper blob;
|
||||
private final BlobRequestOptions options;
|
||||
private final OperationContext opContext;
|
||||
private InputStream blobInputStream = null;
|
||||
private int minimumReadSizeInBytes = 0;
|
||||
private long streamPositionAfterLastRead = -1;
|
||||
private long streamPosition = 0;
|
||||
private long streamLength = 0;
|
||||
private boolean closed = false;
|
||||
private byte[] streamBuffer;
|
||||
private int streamBufferPosition;
|
||||
private int streamBufferLength;
|
||||
|
||||
/**
|
||||
* Creates a seek-able stream for reading from block blobs.
|
||||
* @param blob a block blob reference.
|
||||
* @param options the blob request options.
|
||||
* @param opContext the blob operation context.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
BlockBlobInputStream(CloudBlockBlobWrapper blob,
|
||||
BlobRequestOptions options,
|
||||
OperationContext opContext) throws IOException {
|
||||
this.blob = blob;
|
||||
this.options = options;
|
||||
this.opContext = opContext;
|
||||
|
||||
this.minimumReadSizeInBytes = blob.getStreamMinimumReadSizeInBytes();
|
||||
|
||||
try {
|
||||
this.blobInputStream = blob.openInputStream(options, opContext);
|
||||
} catch (StorageException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
this.streamLength = blob.getProperties().getLength();
|
||||
}
|
||||
|
||||
private void checkState() throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the read position of the stream.
|
||||
* @return the zero-based byte offset of the read position.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public synchronized long getPos() throws IOException {
|
||||
checkState();
|
||||
return streamPosition;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the read position of the stream.
|
||||
* @param pos a zero-based byte offset in the stream.
|
||||
* @throws EOFException if read is out of range
|
||||
*/
|
||||
@Override
|
||||
public synchronized void seek(long pos) throws IOException {
|
||||
checkState();
|
||||
if (pos < 0) {
|
||||
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
|
||||
}
|
||||
if (pos > streamLength) {
|
||||
throw new EOFException(
|
||||
FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
|
||||
}
|
||||
if (pos == getPos()) {
|
||||
// no=op, no state change
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamBuffer != null) {
|
||||
long offset = streamPosition - pos;
|
||||
if (offset > 0 && offset < streamBufferLength) {
|
||||
streamBufferPosition = streamBufferLength - (int) offset;
|
||||
} else {
|
||||
streamBufferPosition = streamBufferLength;
|
||||
}
|
||||
}
|
||||
|
||||
streamPosition = pos;
|
||||
// close BlobInputStream after seek is invoked because BlobInputStream
|
||||
// does not support seek
|
||||
closeBlobInputStream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Seeks an secondary copy of the data. This method is not supported.
|
||||
* @param targetPos a zero-based byte offset in the stream.
|
||||
* @return false
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of bytes that can be read (or skipped over) without
|
||||
* performing a network operation.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public synchronized int available() throws IOException {
|
||||
checkState();
|
||||
if (blobInputStream != null) {
|
||||
return blobInputStream.available();
|
||||
} else {
|
||||
return (streamBuffer == null)
|
||||
? 0
|
||||
: streamBufferLength - streamBufferPosition;
|
||||
}
|
||||
}
|
||||
|
||||
private void closeBlobInputStream() throws IOException {
|
||||
if (blobInputStream != null) {
|
||||
try {
|
||||
blobInputStream.close();
|
||||
} finally {
|
||||
blobInputStream = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes this stream and releases any system resources associated with it.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
closed = true;
|
||||
closeBlobInputStream();
|
||||
streamBuffer = null;
|
||||
streamBufferPosition = 0;
|
||||
streamBufferLength = 0;
|
||||
}
|
||||
|
||||
private int doNetworkRead(byte[] buffer, int offset, int len)
|
||||
throws IOException {
|
||||
MemoryOutputStream outputStream;
|
||||
boolean needToCopy = false;
|
||||
|
||||
if (streamPositionAfterLastRead == streamPosition) {
|
||||
// caller is reading sequentially, so initialize the stream buffer
|
||||
if (streamBuffer == null) {
|
||||
streamBuffer = new byte[(int) Math.min(minimumReadSizeInBytes,
|
||||
streamLength)];
|
||||
}
|
||||
streamBufferPosition = 0;
|
||||
streamBufferLength = 0;
|
||||
outputStream = new MemoryOutputStream(streamBuffer, streamBufferPosition,
|
||||
streamBuffer.length);
|
||||
needToCopy = true;
|
||||
} else {
|
||||
outputStream = new MemoryOutputStream(buffer, offset, len);
|
||||
}
|
||||
|
||||
long bytesToRead = Math.min(
|
||||
minimumReadSizeInBytes,
|
||||
Math.min(
|
||||
outputStream.capacity(),
|
||||
streamLength - streamPosition));
|
||||
|
||||
try {
|
||||
blob.downloadRange(streamPosition, bytesToRead, outputStream, options,
|
||||
opContext);
|
||||
} catch (StorageException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
int bytesRead = outputStream.size();
|
||||
if (bytesRead > 0) {
|
||||
streamPosition += bytesRead;
|
||||
streamPositionAfterLastRead = streamPosition;
|
||||
int count = Math.min(bytesRead, len);
|
||||
if (needToCopy) {
|
||||
streamBufferLength = bytesRead;
|
||||
System.arraycopy(streamBuffer, streamBufferPosition, buffer, offset,
|
||||
count);
|
||||
streamBufferPosition += count;
|
||||
}
|
||||
return count;
|
||||
} else {
|
||||
// This may happen if the blob was modified after the length was obtained.
|
||||
throw new EOFException("End of stream reached unexpectedly.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads up to <code>len</code> bytes of data from the input stream into an
|
||||
* array of bytes.
|
||||
* @param b a buffer into which the data is written.
|
||||
* @param offset a start offset into {@code buffer} where the data is written.
|
||||
* @param len the maximum number of bytes to be read.
|
||||
* @return the number of bytes written into {@code buffer}, or -1.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public synchronized int read(byte[] b, int offset, int len)
|
||||
throws IOException {
|
||||
checkState();
|
||||
NativeAzureFileSystemHelper.validateReadArgs(b, offset, len);
|
||||
if (blobInputStream != null) {
|
||||
int numberOfBytesRead = blobInputStream.read(b, offset, len);
|
||||
streamPosition += numberOfBytesRead;
|
||||
return numberOfBytesRead;
|
||||
} else {
|
||||
if (offset < 0 || len < 0 || len > b.length - offset) {
|
||||
throw new IndexOutOfBoundsException("read arguments out of range");
|
||||
}
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int bytesRead = 0;
|
||||
int available = available();
|
||||
if (available > 0) {
|
||||
bytesRead = Math.min(available, len);
|
||||
System.arraycopy(streamBuffer, streamBufferPosition, b, offset,
|
||||
bytesRead);
|
||||
streamBufferPosition += bytesRead;
|
||||
}
|
||||
|
||||
if (len == bytesRead) {
|
||||
return len;
|
||||
}
|
||||
if (streamPosition >= streamLength) {
|
||||
return (bytesRead > 0) ? bytesRead : -1;
|
||||
}
|
||||
|
||||
offset += bytesRead;
|
||||
len -= bytesRead;
|
||||
|
||||
return bytesRead + doNetworkRead(b, offset, len);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the next byte of data from the stream.
|
||||
* @return the next byte of data, or -1
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
byte[] buffer = new byte[1];
|
||||
int numberOfBytesRead = read(buffer, 0, 1);
|
||||
return (numberOfBytesRead < 1) ? -1 : buffer[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Skips over and discards n bytes of data from this input stream.
|
||||
* @param n the number of bytes to be skipped.
|
||||
* @return the actual number of bytes skipped.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
@Override
|
||||
public synchronized long skip(long n) throws IOException {
|
||||
checkState();
|
||||
|
||||
if (blobInputStream != null) {
|
||||
return blobInputStream.skip(n);
|
||||
} else {
|
||||
if (n < 0 || streamPosition + n > streamLength) {
|
||||
throw new IndexOutOfBoundsException("skip range");
|
||||
}
|
||||
|
||||
if (streamBuffer != null) {
|
||||
streamBufferPosition = (n < streamBufferLength - streamBufferPosition)
|
||||
? streamBufferPosition + (int) n
|
||||
: streamBufferLength;
|
||||
}
|
||||
|
||||
streamPosition += n;
|
||||
return n;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An <code>OutputStream</code> backed by a user-supplied buffer.
|
||||
*/
|
||||
static class MemoryOutputStream extends OutputStream {
|
||||
private final byte[] buffer;
|
||||
private final int offset;
|
||||
private final int length;
|
||||
private int writePosition;
|
||||
|
||||
/**
|
||||
* Creates a <code>MemoryOutputStream</code> from a user-supplied buffer.
|
||||
* @param buffer an array of bytes.
|
||||
* @param offset a starting offset in <code>buffer</code> where the data
|
||||
* will be written.
|
||||
* @param length the maximum number of bytes to be written to the stream.
|
||||
*/
|
||||
MemoryOutputStream(byte[] buffer, int offset, int length) {
|
||||
if (buffer == null) {
|
||||
throw new NullPointerException("buffer");
|
||||
}
|
||||
if (offset < 0 || length < 0 || length > buffer.length - offset) {
|
||||
throw new IndexOutOfBoundsException("offset out of range of buffer");
|
||||
}
|
||||
this.buffer = buffer;
|
||||
this.offset = offset;
|
||||
this.length = length;
|
||||
this.writePosition = offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current size of the stream.
|
||||
*/
|
||||
public synchronized int size() {
|
||||
return writePosition - offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current capacity of the stream.
|
||||
*/
|
||||
public synchronized int capacity() {
|
||||
return length - offset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the next byte to the stream.
|
||||
* @param b the byte to be written.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public synchronized void write(int b) throws IOException {
|
||||
if (size() > length - 1) {
|
||||
throw new IOException("No space for more writes");
|
||||
}
|
||||
buffer[writePosition++] = (byte) b;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a range of bytes to the stream.
|
||||
* @param b a byte array.
|
||||
* @param off the start offset in <code>buffer</code> from which the data
|
||||
* is read.
|
||||
* @param length the number of bytes to be written.
|
||||
* @throws IOException IO failure
|
||||
*/
|
||||
public synchronized void write(byte[] b, int off, int length)
|
||||
throws IOException {
|
||||
if (b == null) {
|
||||
throw new NullPointerException("Null buffer argument");
|
||||
}
|
||||
if (off < 0 || length < 0 || length > b.length - off) {
|
||||
throw new IndexOutOfBoundsException("array write offset");
|
||||
}
|
||||
System.arraycopy(b, off, buffer, writePosition, length);
|
||||
writePosition += length;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
|
@ -60,6 +59,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.Seekable;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
|
||||
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
|
||||
import org.apache.hadoop.fs.azure.security.Constants;
|
||||
|
@ -743,7 +743,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
// File length, valid only for streams over block blobs.
|
||||
private long fileLength;
|
||||
|
||||
public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) {
|
||||
NativeAzureFsInputStream(InputStream in, String key, long fileLength) {
|
||||
this.in = in;
|
||||
this.key = key;
|
||||
this.isPageBlob = store.isPageBlobKey(key);
|
||||
|
@ -817,27 +817,6 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void readFully(long position, byte[] buffer, int offset, int length)
|
||||
throws IOException {
|
||||
validatePositionedReadArgs(position, buffer, offset, length);
|
||||
|
||||
int nread = 0;
|
||||
while (nread < length) {
|
||||
// In case BlobInputStream is used, mark() can act as a hint to read ahead only this
|
||||
// length instead of 4 MB boundary.
|
||||
in.mark(length - nread);
|
||||
int nbytes = read(position + nread,
|
||||
buffer,
|
||||
offset + nread,
|
||||
length - nread);
|
||||
if (nbytes < 0) {
|
||||
throw new EOFException(FSExceptionMessages.EOF_IN_READ_FULLY);
|
||||
}
|
||||
nread += nbytes;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Reads up to len bytes of data from the input stream into an array of
|
||||
* bytes. An attempt is made to read as many as len bytes, but a smaller
|
||||
|
@ -909,9 +888,14 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
|
||||
}
|
||||
if (this.pos > pos) {
|
||||
if (in instanceof Seekable) {
|
||||
((Seekable) in).seek(pos);
|
||||
this.pos = pos;
|
||||
} else {
|
||||
IOUtils.closeStream(in);
|
||||
in = store.retrieve(key);
|
||||
this.pos = in.skip(pos);
|
||||
}
|
||||
} else {
|
||||
this.pos += in.skip(pos - this.pos);
|
||||
}
|
||||
|
@ -2538,7 +2522,7 @@ public class NativeAzureFileSystem extends FileSystem {
|
|||
+ " is a directory not a file.");
|
||||
}
|
||||
|
||||
DataInputStream inputStream = null;
|
||||
InputStream inputStream;
|
||||
try {
|
||||
inputStream = store.retrieve(key);
|
||||
} catch(Exception ex) {
|
||||
|
|
|
@ -18,9 +18,11 @@
|
|||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -29,6 +31,8 @@ import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
|||
import com.microsoft.azure.storage.StorageException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
|
||||
/**
|
||||
* Utility class that has helper methods.
|
||||
*
|
||||
|
@ -104,4 +108,28 @@ final class NativeAzureFileSystemHelper {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validation code, based on
|
||||
* {@code FSInputStream.validatePositionedReadArgs()}.
|
||||
* @param buffer destination buffer
|
||||
* @param offset offset within the buffer
|
||||
* @param length length of bytes to read
|
||||
* @throws EOFException if the position is negative
|
||||
* @throws IndexOutOfBoundsException if there isn't space for the amount of
|
||||
* data requested.
|
||||
* @throws IllegalArgumentException other arguments are invalid.
|
||||
*/
|
||||
static void validateReadArgs(byte[] buffer, int offset, int length)
|
||||
throws EOFException {
|
||||
Preconditions.checkArgument(length >= 0, "length is negative");
|
||||
Preconditions.checkArgument(buffer != null, "Null buffer");
|
||||
if (buffer.length - offset < length) {
|
||||
throw new IndexOutOfBoundsException(
|
||||
FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
|
||||
+ ": request length=" + length
|
||||
+ ", with offset =" + offset
|
||||
+ "; buffer capacity =" + (buffer.length - offset));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.Date;
|
||||
|
||||
|
@ -46,9 +46,9 @@ interface NativeFileSystemStore {
|
|||
|
||||
FileMetadata retrieveMetadata(String key) throws IOException;
|
||||
|
||||
DataInputStream retrieve(String key) throws IOException;
|
||||
InputStream retrieve(String key) throws IOException;
|
||||
|
||||
DataInputStream retrieve(String key, long byteRangeStart) throws IOException;
|
||||
InputStream retrieve(String key, long byteRangeStart) throws IOException;
|
||||
|
||||
DataOutputStream storefile(String key, PermissionStatus permissionStatus)
|
||||
throws AzureException;
|
||||
|
|
|
@ -465,6 +465,11 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamMinimumReadSizeInBytes() {
|
||||
return getBlob().getStreamMinimumReadSizeInBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
|
||||
getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
|
||||
|
|
|
@ -583,6 +583,13 @@ abstract class StorageInterface {
|
|||
|
||||
SelfRenewingLease acquireLease() throws StorageException;
|
||||
|
||||
/**
|
||||
* Gets the minimum read block size to use with this Blob.
|
||||
*
|
||||
* @return The minimum block size, in bytes, for reading from a block blob.
|
||||
*/
|
||||
int getStreamMinimumReadSizeInBytes();
|
||||
|
||||
/**
|
||||
* Sets the minimum read block size to use with this Blob.
|
||||
*
|
||||
|
|
|
@ -398,6 +398,11 @@ class StorageInterfaceImpl extends StorageInterface {
|
|||
getBlob().uploadProperties(getLeaseCondition(lease), null, opContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamMinimumReadSizeInBytes() {
|
||||
return getBlob().getStreamMinimumReadSizeInBytes();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
|
||||
getBlob().setStreamMinimumReadSizeInBytes(minimumReadSizeBytes);
|
||||
|
|
|
@ -82,13 +82,22 @@ public final class AzureBlobStorageTestAccount {
|
|||
private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
|
||||
new ConcurrentLinkedQueue<MetricsRecord>();
|
||||
private static boolean metricsConfigSaved = false;
|
||||
private boolean skipContainerDelete = false;
|
||||
|
||||
private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
|
||||
CloudStorageAccount account,
|
||||
CloudBlobContainer container) {
|
||||
this(fs, account, container, false);
|
||||
}
|
||||
|
||||
private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
|
||||
CloudStorageAccount account,
|
||||
CloudBlobContainer container,
|
||||
boolean skipContainerDelete) {
|
||||
this.account = account;
|
||||
this.container = container;
|
||||
this.fs = fs;
|
||||
this.skipContainerDelete = skipContainerDelete;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -524,8 +533,19 @@ public final class AzureBlobStorageTestAccount {
|
|||
return create(containerNameSuffix, createOptions, null);
|
||||
}
|
||||
|
||||
public static AzureBlobStorageTestAccount create(String containerNameSuffix,
|
||||
EnumSet<CreateOptions> createOptions, Configuration initialConfiguration)
|
||||
public static AzureBlobStorageTestAccount create(
|
||||
String containerNameSuffix,
|
||||
EnumSet<CreateOptions> createOptions,
|
||||
Configuration initialConfiguration)
|
||||
throws Exception {
|
||||
return create(containerNameSuffix, createOptions, initialConfiguration, false);
|
||||
}
|
||||
|
||||
public static AzureBlobStorageTestAccount create(
|
||||
String containerNameSuffix,
|
||||
EnumSet<CreateOptions> createOptions,
|
||||
Configuration initialConfiguration,
|
||||
boolean useContainerSuffixAsContainerName)
|
||||
throws Exception {
|
||||
saveMetricsConfigFile();
|
||||
NativeAzureFileSystem fs = null;
|
||||
|
@ -538,12 +558,17 @@ public final class AzureBlobStorageTestAccount {
|
|||
return null;
|
||||
}
|
||||
fs = new NativeAzureFileSystem();
|
||||
String containerName = String.format("wasbtests-%s-%tQ%s",
|
||||
System.getProperty("user.name"), new Date(), containerNameSuffix);
|
||||
String containerName = useContainerSuffixAsContainerName
|
||||
? containerNameSuffix
|
||||
: String.format(
|
||||
"wasbtests-%s-%tQ%s",
|
||||
System.getProperty("user.name"),
|
||||
new Date(),
|
||||
containerNameSuffix);
|
||||
container = account.createCloudBlobClient().getContainerReference(
|
||||
containerName);
|
||||
if (createOptions.contains(CreateOptions.CreateContainer)) {
|
||||
container.create();
|
||||
container.createIfNotExists();
|
||||
}
|
||||
String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
|
||||
if (createOptions.contains(CreateOptions.UseSas)) {
|
||||
|
@ -578,7 +603,8 @@ public final class AzureBlobStorageTestAccount {
|
|||
// Create test account initializing the appropriate member variables.
|
||||
//
|
||||
AzureBlobStorageTestAccount testAcct =
|
||||
new AzureBlobStorageTestAccount(fs, account, container);
|
||||
new AzureBlobStorageTestAccount(fs, account, container,
|
||||
useContainerSuffixAsContainerName);
|
||||
|
||||
return testAcct;
|
||||
}
|
||||
|
@ -824,7 +850,7 @@ public final class AzureBlobStorageTestAccount {
|
|||
fs.close();
|
||||
fs = null;
|
||||
}
|
||||
if (container != null) {
|
||||
if (!skipContainerDelete && container != null) {
|
||||
container.deleteIfExists();
|
||||
container = null;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -474,12 +475,30 @@ public class MockStorageInterface extends StorageInterface {
|
|||
public void downloadRange(long offset, long length, OutputStream os,
|
||||
BlobRequestOptions options, OperationContext opContext)
|
||||
throws StorageException {
|
||||
throw new NotImplementedException();
|
||||
if (offset < 0 || length <= 0) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
if (!backingStore.exists(convertUriToDecodedString(uri))) {
|
||||
throw new StorageException("BlobNotFound",
|
||||
"Resource does not exist.",
|
||||
HttpURLConnection.HTTP_NOT_FOUND,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
byte[] content = backingStore.getContent(convertUriToDecodedString(uri));
|
||||
try {
|
||||
os.write(content, (int) offset, (int) length);
|
||||
} catch (IOException e) {
|
||||
throw new StorageException("Unknown error", "Unexpected error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper
|
||||
implements CloudBlockBlobWrapper {
|
||||
|
||||
int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;
|
||||
|
||||
public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata,
|
||||
int length) {
|
||||
super(uri, metadata, length);
|
||||
|
@ -492,8 +511,14 @@ public class MockStorageInterface extends StorageInterface {
|
|||
metadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamMinimumReadSizeInBytes() {
|
||||
return this.minimumReadSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) {
|
||||
this.minimumReadSize = minimumReadSizeBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -546,6 +571,9 @@ public class MockStorageInterface extends StorageInterface {
|
|||
|
||||
class MockCloudPageBlobWrapper extends MockCloudBlobWrapper
|
||||
implements CloudPageBlobWrapper {
|
||||
|
||||
int minimumReadSize = AzureNativeFileSystemStore.DEFAULT_DOWNLOAD_BLOCK_SIZE;
|
||||
|
||||
public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata,
|
||||
int length) {
|
||||
super(uri, metadata, length);
|
||||
|
@ -570,8 +598,14 @@ public class MockStorageInterface extends StorageInterface {
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getStreamMinimumReadSizeInBytes() {
|
||||
return this.minimumReadSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStreamMinimumReadSizeInBytes(int minimumReadSize) {
|
||||
this.minimumReadSize = minimumReadSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -155,7 +155,7 @@ public class TestAzureConcurrentOutOfBandIo {
|
|||
"WASB_String.txt");
|
||||
writeBlockTask.startWriting();
|
||||
int count = 0;
|
||||
DataInputStream inputStream = null;
|
||||
InputStream inputStream = null;
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,756 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azure;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.junit.FixMethodOrder;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.junit.runners.MethodSorters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assume.*;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
||||
|
||||
/**
|
||||
* Test semantics and performance of the original block blob input stream
|
||||
* (KEY_INPUT_STREAM_VERSION=1) and the new
|
||||
* <code>BlockBlobInputStream</code> (KEY_INPUT_STREAM_VERSION=2).
|
||||
*/
|
||||
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||
|
||||
public class TestBlockBlobInputStream extends AbstractWasbTestBase {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestBlockBlobInputStream.class);
|
||||
private static final int KILOBYTE = 1024;
|
||||
private static final int MEGABYTE = KILOBYTE * KILOBYTE;
|
||||
private static final int TEST_FILE_SIZE = 6 * MEGABYTE;
|
||||
private static final Path TEST_FILE_PATH = new Path(
|
||||
"TestBlockBlobInputStream.txt");
|
||||
|
||||
private AzureBlobStorageTestAccount accountUsingInputStreamV1;
|
||||
private AzureBlobStorageTestAccount accountUsingInputStreamV2;
|
||||
private long testFileLength;
|
||||
|
||||
/**
|
||||
* Long test timeout.
|
||||
*/
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(10 * 60 * 1000);
|
||||
private FileStatus testFileStatus;
|
||||
private Path hugefile;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
|
||||
|
||||
accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
|
||||
"testblockblobinputstream",
|
||||
EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
|
||||
conf,
|
||||
true);
|
||||
|
||||
accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
|
||||
"testblockblobinputstream",
|
||||
EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
|
||||
null,
|
||||
true);
|
||||
|
||||
assumeNotNull(accountUsingInputStreamV1);
|
||||
assumeNotNull(accountUsingInputStreamV2);
|
||||
hugefile = fs.makeQualified(TEST_FILE_PATH);
|
||||
try {
|
||||
testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
|
||||
testFileLength = testFileStatus.getLen();
|
||||
} catch (FileNotFoundException e) {
|
||||
// file doesn't exist
|
||||
testFileLength = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(AzureNativeFileSystemStore.KEY_INPUT_STREAM_VERSION, 1);
|
||||
|
||||
accountUsingInputStreamV1 = AzureBlobStorageTestAccount.create(
|
||||
"testblockblobinputstream",
|
||||
EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
|
||||
conf,
|
||||
true);
|
||||
|
||||
accountUsingInputStreamV2 = AzureBlobStorageTestAccount.create(
|
||||
"testblockblobinputstream",
|
||||
EnumSet.noneOf(AzureBlobStorageTestAccount.CreateOptions.class),
|
||||
null,
|
||||
true);
|
||||
|
||||
assumeNotNull(accountUsingInputStreamV1);
|
||||
assumeNotNull(accountUsingInputStreamV2);
|
||||
return accountUsingInputStreamV1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a test file by repeating the characters in the alphabet.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createTestFileAndSetLength() throws IOException {
|
||||
FileSystem fs = accountUsingInputStreamV1.getFileSystem();
|
||||
|
||||
// To reduce test run time, the test file can be reused.
|
||||
if (fs.exists(TEST_FILE_PATH)) {
|
||||
testFileStatus = fs.getFileStatus(TEST_FILE_PATH);
|
||||
testFileLength = testFileStatus.getLen();
|
||||
LOG.info("Reusing test file: {}", testFileStatus);
|
||||
return;
|
||||
}
|
||||
|
||||
int sizeOfAlphabet = ('z' - 'a' + 1);
|
||||
byte[] buffer = new byte[26 * KILOBYTE];
|
||||
char character = 'a';
|
||||
for (int i = 0; i < buffer.length; i++) {
|
||||
buffer[i] = (byte) character;
|
||||
character = (character == 'z') ? 'a' : (char) ((int) character + 1);
|
||||
}
|
||||
|
||||
LOG.info("Creating test file {} of size: {}", TEST_FILE_PATH,
|
||||
TEST_FILE_SIZE );
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
|
||||
try(FSDataOutputStream outputStream = fs.create(TEST_FILE_PATH)) {
|
||||
int bytesWritten = 0;
|
||||
while (bytesWritten < TEST_FILE_SIZE) {
|
||||
outputStream.write(buffer);
|
||||
bytesWritten += buffer.length;
|
||||
}
|
||||
LOG.info("Closing stream {}", outputStream);
|
||||
ContractTestUtils.NanoTimer closeTimer
|
||||
= new ContractTestUtils.NanoTimer();
|
||||
outputStream.close();
|
||||
closeTimer.end("time to close() output stream");
|
||||
}
|
||||
timer.end("time to write %d KB", TEST_FILE_SIZE / 1024);
|
||||
testFileLength = fs.getFileStatus(TEST_FILE_PATH).getLen();
|
||||
}
|
||||
|
||||
void assumeHugeFileExists() throws IOException {
|
||||
ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
|
||||
FileStatus status = fs.getFileStatus(hugefile);
|
||||
ContractTestUtils.assertIsFile(hugefile, status);
|
||||
assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate megabits per second from the specified values for bytes and
|
||||
* milliseconds.
|
||||
* @param bytes The number of bytes.
|
||||
* @param milliseconds The number of milliseconds.
|
||||
* @return The number of megabits per second.
|
||||
*/
|
||||
private static double toMbps(long bytes, long milliseconds) {
|
||||
return bytes / 1000.0 * 8 / milliseconds;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_0100_CreateHugeFile() throws IOException {
|
||||
createTestFileAndSetLength();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.markSupported.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0301_MarkSupportedV1() throws IOException {
|
||||
validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.markSupported.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0302_MarkSupportedV2() throws IOException {
|
||||
validateMarkSupported(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateMarkSupported(FileSystem fs) throws IOException {
|
||||
assumeHugeFileExists();
|
||||
try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
assertTrue("mark is not supported", inputStream.markSupported());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.mark and reset
|
||||
* for version 1 of the block blob input stream.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0303_MarkAndResetV1() throws Exception {
|
||||
validateMarkAndReset(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.mark and reset
|
||||
* for version 2 of the block blob input stream.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0304_MarkAndResetV2() throws Exception {
|
||||
validateMarkAndReset(accountUsingInputStreamV2.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateMarkAndReset(FileSystem fs) throws Exception {
|
||||
assumeHugeFileExists();
|
||||
try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
inputStream.mark(KILOBYTE - 1);
|
||||
|
||||
byte[] buffer = new byte[KILOBYTE];
|
||||
int bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
|
||||
inputStream.reset();
|
||||
assertEquals("rest -> pos 0", 0, inputStream.getPos());
|
||||
|
||||
inputStream.mark(8 * KILOBYTE - 1);
|
||||
|
||||
buffer = new byte[8 * KILOBYTE];
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
|
||||
intercept(IOException.class,
|
||||
"Resetting to invalid mark",
|
||||
new Callable<FSDataInputStream>() {
|
||||
@Override
|
||||
public FSDataInputStream call() throws Exception {
|
||||
inputStream.reset();
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seekToNewSource, which should
|
||||
* return false for version 1 of the block blob input stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0305_SeekToNewSourceV1() throws IOException {
|
||||
validateSeekToNewSource(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seekToNewSource, which should
|
||||
* return false for version 2 of the block blob input stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0306_SeekToNewSourceV2() throws IOException {
|
||||
validateSeekToNewSource(accountUsingInputStreamV2.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateSeekToNewSource(FileSystem fs) throws IOException {
|
||||
assumeHugeFileExists();
|
||||
try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
assertFalse(inputStream.seekToNewSource(0));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.skip and ensures there is no
|
||||
* network I/O for version 1 of the block blob input stream.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0307_SkipBoundsV1() throws Exception {
|
||||
validateSkipBounds(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.skip and ensures there is no
|
||||
* network I/O for version 2 of the block blob input stream.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0308_SkipBoundsV2() throws Exception {
|
||||
validateSkipBounds(accountUsingInputStreamV2.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateSkipBounds(FileSystem fs) throws Exception {
|
||||
assumeHugeFileExists();
|
||||
try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
NanoTimer timer = new NanoTimer();
|
||||
|
||||
long skipped = inputStream.skip(-1);
|
||||
assertEquals(0, skipped);
|
||||
|
||||
skipped = inputStream.skip(0);
|
||||
assertEquals(0, skipped);
|
||||
|
||||
assertTrue(testFileLength > 0);
|
||||
|
||||
skipped = inputStream.skip(testFileLength);
|
||||
assertEquals(testFileLength, skipped);
|
||||
|
||||
intercept(EOFException.class,
|
||||
new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
return inputStream.skip(1);
|
||||
}
|
||||
}
|
||||
);
|
||||
long elapsedTimeMs = timer.elapsedTimeMs();
|
||||
assertTrue(
|
||||
String.format(
|
||||
"There should not be any network I/O (elapsedTimeMs=%1$d).",
|
||||
elapsedTimeMs),
|
||||
elapsedTimeMs < 20);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seek and ensures there is no
|
||||
* network I/O for forward seek.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0309_SeekBoundsV1() throws Exception {
|
||||
validateSeekBounds(accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seek and ensures there is no
|
||||
* network I/O for forward seek.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0310_SeekBoundsV2() throws Exception {
|
||||
validateSeekBounds(accountUsingInputStreamV2.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateSeekBounds(FileSystem fs) throws Exception {
|
||||
assumeHugeFileExists();
|
||||
try (
|
||||
FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
|
||||
) {
|
||||
NanoTimer timer = new NanoTimer();
|
||||
|
||||
inputStream.seek(0);
|
||||
assertEquals(0, inputStream.getPos());
|
||||
|
||||
intercept(EOFException.class,
|
||||
FSExceptionMessages.NEGATIVE_SEEK,
|
||||
new Callable<FSDataInputStream>() {
|
||||
@Override
|
||||
public FSDataInputStream call() throws Exception {
|
||||
inputStream.seek(-1);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
assertTrue("Test file length only " + testFileLength, testFileLength > 0);
|
||||
inputStream.seek(testFileLength);
|
||||
assertEquals(testFileLength, inputStream.getPos());
|
||||
|
||||
intercept(EOFException.class,
|
||||
FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
|
||||
new Callable<FSDataInputStream>() {
|
||||
@Override
|
||||
public FSDataInputStream call() throws Exception {
|
||||
inputStream.seek(testFileLength + 1);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
long elapsedTimeMs = timer.elapsedTimeMs();
|
||||
assertTrue(
|
||||
String.format(
|
||||
"There should not be any network I/O (elapsedTimeMs=%1$d).",
|
||||
elapsedTimeMs),
|
||||
elapsedTimeMs < 20);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seek, Seekable.getPos,
|
||||
* and InputStream.available.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0311_SeekAndAvailableAndPositionV1() throws Exception {
|
||||
validateSeekAndAvailableAndPosition(
|
||||
accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of Seekable.seek, Seekable.getPos,
|
||||
* and InputStream.available.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void test_0312_SeekAndAvailableAndPositionV2() throws Exception {
|
||||
validateSeekAndAvailableAndPosition(
|
||||
accountUsingInputStreamV2.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateSeekAndAvailableAndPosition(FileSystem fs)
|
||||
throws Exception {
|
||||
assumeHugeFileExists();
|
||||
try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
|
||||
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
|
||||
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
|
||||
byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
|
||||
byte[] buffer = new byte[3];
|
||||
|
||||
int bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected1, buffer);
|
||||
assertEquals(buffer.length, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected2, buffer);
|
||||
assertEquals(2 * buffer.length, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
// reverse seek
|
||||
int seekPos = 0;
|
||||
inputStream.seek(seekPos);
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected1, buffer);
|
||||
assertEquals(buffer.length + seekPos, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
// reverse seek
|
||||
seekPos = 1;
|
||||
inputStream.seek(seekPos);
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected3, buffer);
|
||||
assertEquals(buffer.length + seekPos, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
// forward seek
|
||||
seekPos = 6;
|
||||
inputStream.seek(seekPos);
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected4, buffer);
|
||||
assertEquals(buffer.length + seekPos, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.skip, Seekable.getPos,
|
||||
* and InputStream.available.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0313_SkipAndAvailableAndPositionV1() throws IOException {
|
||||
validateSkipAndAvailableAndPosition(
|
||||
accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the implementation of InputStream.skip, Seekable.getPos,
|
||||
* and InputStream.available.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0314_SkipAndAvailableAndPositionV2() throws IOException {
|
||||
validateSkipAndAvailableAndPosition(
|
||||
accountUsingInputStreamV1.getFileSystem());
|
||||
}
|
||||
|
||||
private void validateSkipAndAvailableAndPosition(FileSystem fs)
|
||||
throws IOException {
|
||||
assumeHugeFileExists();
|
||||
try (
|
||||
FSDataInputStream inputStream = fs.open(TEST_FILE_PATH);
|
||||
) {
|
||||
byte[] expected1 = {(byte) 'a', (byte) 'b', (byte) 'c'};
|
||||
byte[] expected2 = {(byte) 'd', (byte) 'e', (byte) 'f'};
|
||||
byte[] expected3 = {(byte) 'b', (byte) 'c', (byte) 'd'};
|
||||
byte[] expected4 = {(byte) 'g', (byte) 'h', (byte) 'i'};
|
||||
|
||||
assertEquals(testFileLength, inputStream.available());
|
||||
assertEquals(0, inputStream.getPos());
|
||||
|
||||
int n = 3;
|
||||
long skipped = inputStream.skip(n);
|
||||
|
||||
assertEquals(skipped, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
assertEquals(skipped, n);
|
||||
|
||||
byte[] buffer = new byte[3];
|
||||
int bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected2, buffer);
|
||||
assertEquals(buffer.length + skipped, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
// does skip still work after seek?
|
||||
int seekPos = 1;
|
||||
inputStream.seek(seekPos);
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected3, buffer);
|
||||
assertEquals(buffer.length + seekPos, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
|
||||
long currentPosition = inputStream.getPos();
|
||||
n = 2;
|
||||
skipped = inputStream.skip(n);
|
||||
|
||||
assertEquals(currentPosition + skipped, inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
assertEquals(skipped, n);
|
||||
|
||||
bytesRead = inputStream.read(buffer);
|
||||
assertEquals(buffer.length, bytesRead);
|
||||
assertArrayEquals(expected4, buffer);
|
||||
assertEquals(buffer.length + skipped + currentPosition,
|
||||
inputStream.getPos());
|
||||
assertEquals(testFileLength - inputStream.getPos(),
|
||||
inputStream.available());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures parity in the performance of sequential read for
|
||||
* version 1 and version 2 of the block blob input stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0315_SequentialReadPerformance() throws IOException {
|
||||
assumeHugeFileExists();
|
||||
final int maxAttempts = 10;
|
||||
final double maxAcceptableRatio = 1.01;
|
||||
double v1ElapsedMs = 0, v2ElapsedMs = 0;
|
||||
double ratio = Double.MAX_VALUE;
|
||||
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
|
||||
v1ElapsedMs = sequentialRead(1,
|
||||
accountUsingInputStreamV1.getFileSystem(), false);
|
||||
v2ElapsedMs = sequentialRead(2,
|
||||
accountUsingInputStreamV2.getFileSystem(), false);
|
||||
ratio = v2ElapsedMs / v1ElapsedMs;
|
||||
LOG.info(String.format(
|
||||
"v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
|
||||
(long) v1ElapsedMs,
|
||||
(long) v2ElapsedMs,
|
||||
ratio));
|
||||
}
|
||||
assertTrue(String.format(
|
||||
"Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
|
||||
+ " v2ElapsedMs=%2$d, ratio=%3$.2f",
|
||||
(long) v1ElapsedMs,
|
||||
(long) v2ElapsedMs,
|
||||
ratio),
|
||||
ratio < maxAcceptableRatio);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures parity in the performance of sequential read after reverse seek for
|
||||
* version 2 of the block blob input stream.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void test_0316_SequentialReadAfterReverseSeekPerformanceV2()
|
||||
throws IOException {
|
||||
assumeHugeFileExists();
|
||||
final int maxAttempts = 10;
|
||||
final double maxAcceptableRatio = 1.01;
|
||||
double beforeSeekElapsedMs = 0, afterSeekElapsedMs = 0;
|
||||
double ratio = Double.MAX_VALUE;
|
||||
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
|
||||
beforeSeekElapsedMs = sequentialRead(2,
|
||||
accountUsingInputStreamV2.getFileSystem(), false);
|
||||
afterSeekElapsedMs = sequentialRead(2,
|
||||
accountUsingInputStreamV2.getFileSystem(), true);
|
||||
ratio = afterSeekElapsedMs / beforeSeekElapsedMs;
|
||||
LOG.info(String.format(
|
||||
"beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d, ratio=%3$.2f",
|
||||
(long) beforeSeekElapsedMs,
|
||||
(long) afterSeekElapsedMs,
|
||||
ratio));
|
||||
}
|
||||
assertTrue(String.format(
|
||||
"Performance of version 2 after reverse seek is not acceptable:"
|
||||
+ " beforeSeekElapsedMs=%1$d, afterSeekElapsedMs=%2$d,"
|
||||
+ " ratio=%3$.2f",
|
||||
(long) beforeSeekElapsedMs,
|
||||
(long) afterSeekElapsedMs,
|
||||
ratio),
|
||||
ratio < maxAcceptableRatio);
|
||||
}
|
||||
|
||||
private long sequentialRead(int version,
|
||||
FileSystem fs,
|
||||
boolean afterReverseSeek) throws IOException {
|
||||
byte[] buffer = new byte[16 * KILOBYTE];
|
||||
long totalBytesRead = 0;
|
||||
long bytesRead = 0;
|
||||
|
||||
try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
if (afterReverseSeek) {
|
||||
while (bytesRead > 0 && totalBytesRead < 4 * MEGABYTE) {
|
||||
bytesRead = inputStream.read(buffer);
|
||||
totalBytesRead += bytesRead;
|
||||
}
|
||||
totalBytesRead = 0;
|
||||
inputStream.seek(0);
|
||||
}
|
||||
|
||||
NanoTimer timer = new NanoTimer();
|
||||
while ((bytesRead = inputStream.read(buffer)) > 0) {
|
||||
totalBytesRead += bytesRead;
|
||||
}
|
||||
long elapsedTimeMs = timer.elapsedTimeMs();
|
||||
|
||||
LOG.info(String.format(
|
||||
"v%1$d: bytesRead=%2$d, elapsedMs=%3$d, Mbps=%4$.2f,"
|
||||
+ " afterReverseSeek=%5$s",
|
||||
version,
|
||||
totalBytesRead,
|
||||
elapsedTimeMs,
|
||||
toMbps(totalBytesRead, elapsedTimeMs),
|
||||
afterReverseSeek));
|
||||
|
||||
assertEquals(testFileLength, totalBytesRead);
|
||||
inputStream.close();
|
||||
return elapsedTimeMs;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_0317_RandomReadPerformance() throws IOException {
|
||||
assumeHugeFileExists();
|
||||
final int maxAttempts = 10;
|
||||
final double maxAcceptableRatio = 0.10;
|
||||
double v1ElapsedMs = 0, v2ElapsedMs = 0;
|
||||
double ratio = Double.MAX_VALUE;
|
||||
for (int i = 0; i < maxAttempts && ratio >= maxAcceptableRatio; i++) {
|
||||
v1ElapsedMs = randomRead(1,
|
||||
accountUsingInputStreamV1.getFileSystem());
|
||||
v2ElapsedMs = randomRead(2,
|
||||
accountUsingInputStreamV2.getFileSystem());
|
||||
ratio = v2ElapsedMs / v1ElapsedMs;
|
||||
LOG.info(String.format(
|
||||
"v1ElapsedMs=%1$d, v2ElapsedMs=%2$d, ratio=%3$.2f",
|
||||
(long) v1ElapsedMs,
|
||||
(long) v2ElapsedMs,
|
||||
ratio));
|
||||
}
|
||||
assertTrue(String.format(
|
||||
"Performance of version 2 is not acceptable: v1ElapsedMs=%1$d,"
|
||||
+ " v2ElapsedMs=%2$d, ratio=%3$.2f",
|
||||
(long) v1ElapsedMs,
|
||||
(long) v2ElapsedMs,
|
||||
ratio),
|
||||
ratio < maxAcceptableRatio);
|
||||
}
|
||||
|
||||
private long randomRead(int version, FileSystem fs) throws IOException {
|
||||
assumeHugeFileExists();
|
||||
final int minBytesToRead = 2 * MEGABYTE;
|
||||
Random random = new Random();
|
||||
byte[] buffer = new byte[8 * KILOBYTE];
|
||||
long totalBytesRead = 0;
|
||||
long bytesRead = 0;
|
||||
try(FSDataInputStream inputStream = fs.open(TEST_FILE_PATH)) {
|
||||
NanoTimer timer = new NanoTimer();
|
||||
|
||||
do {
|
||||
bytesRead = inputStream.read(buffer);
|
||||
totalBytesRead += bytesRead;
|
||||
inputStream.seek(random.nextInt(
|
||||
(int) (testFileLength - buffer.length)));
|
||||
} while (bytesRead > 0 && totalBytesRead < minBytesToRead);
|
||||
|
||||
long elapsedTimeMs = timer.elapsedTimeMs();
|
||||
|
||||
inputStream.close();
|
||||
|
||||
LOG.info(String.format(
|
||||
"v%1$d: totalBytesRead=%2$d, elapsedTimeMs=%3$d, Mbps=%4$.2f",
|
||||
version,
|
||||
totalBytesRead,
|
||||
elapsedTimeMs,
|
||||
toMbps(totalBytesRead, elapsedTimeMs)));
|
||||
|
||||
assertTrue(minBytesToRead <= totalBytesRead);
|
||||
|
||||
return elapsedTimeMs;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_999_DeleteHugeFiles() throws IOException {
|
||||
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
|
||||
fs.delete(TEST_FILE_PATH, false);
|
||||
timer.end("time to delete %s", TEST_FILE_PATH);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue