HADOOP-17347. ABFS: Read optimizations
- Contributed by Bilahari T H
(cherry picked from commit 1448add08f
)
This commit is contained in:
parent
f3a0ca66c2
commit
cb6729224e
|
@ -100,6 +100,16 @@ public class AbfsConfiguration{
|
|||
DefaultValue = DEFAULT_WRITE_BUFFER_SIZE)
|
||||
private int writeBufferSize;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = AZURE_READ_SMALL_FILES_COMPLETELY,
|
||||
DefaultValue = DEFAULT_READ_SMALL_FILES_COMPLETELY)
|
||||
private boolean readSmallFilesCompletely;
|
||||
|
||||
@BooleanConfigurationValidatorAnnotation(
|
||||
ConfigurationKey = AZURE_READ_OPTIMIZE_FOOTER_READ,
|
||||
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
|
||||
MinValue = MIN_BUFFER_SIZE,
|
||||
MaxValue = MAX_BUFFER_SIZE,
|
||||
|
@ -527,6 +537,14 @@ public class AbfsConfiguration{
|
|||
return this.writeBufferSize;
|
||||
}
|
||||
|
||||
public boolean readSmallFilesCompletely() {
|
||||
return this.readSmallFilesCompletely;
|
||||
}
|
||||
|
||||
public boolean optimizeFooterRead() {
|
||||
return this.optimizeFooterRead;
|
||||
}
|
||||
|
||||
public int getReadBufferSize() {
|
||||
return this.readBufferSize;
|
||||
}
|
||||
|
@ -925,4 +943,14 @@ public class AbfsConfiguration{
|
|||
return authority;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setReadSmallFilesCompletely(boolean readSmallFilesCompletely) {
|
||||
this.readSmallFilesCompletely = readSmallFilesCompletely;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setOptimizeFooterRead(boolean optimizeFooterRead) {
|
||||
this.optimizeFooterRead = optimizeFooterRead;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -643,6 +643,8 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
|
||||
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
|
||||
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
|
||||
.withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
|
||||
.withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
|
||||
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
|
||||
.withShouldReadBufferSizeAlways(
|
||||
abfsConfiguration.shouldReadBufferSizeAlways())
|
||||
|
|
|
@ -56,6 +56,8 @@ public final class ConfigurationKeys {
|
|||
public static final String AZURE_WRITE_MAX_REQUESTS_TO_QUEUE = "fs.azure.write.max.requests.to.queue";
|
||||
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
|
||||
public static final String AZURE_READ_BUFFER_SIZE = "fs.azure.read.request.size";
|
||||
public static final String AZURE_READ_SMALL_FILES_COMPLETELY = "fs.azure.read.smallfilescompletely";
|
||||
public static final String AZURE_READ_OPTIMIZE_FOOTER_READ = "fs.azure.read.optimizefooterread";
|
||||
public static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
|
||||
public static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost";
|
||||
public static final String AZURE_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
|
||||
|
|
|
@ -50,13 +50,15 @@ public final class FileSystemConfigurations {
|
|||
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_BACKOFF_INTERVAL = SIXTY_SECONDS;
|
||||
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_DELTA_BACKOFF = 2;
|
||||
|
||||
private static final int ONE_KB = 1024;
|
||||
private static final int ONE_MB = ONE_KB * ONE_KB;
|
||||
public static final int ONE_KB = 1024;
|
||||
public static final int ONE_MB = ONE_KB * ONE_KB;
|
||||
|
||||
// Default upload and download buffer size
|
||||
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
|
||||
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
||||
public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB
|
||||
public static final boolean DEFAULT_READ_SMALL_FILES_COMPLETELY = false;
|
||||
public static final boolean DEFAULT_OPTIMIZE_FOOTER_READ = false;
|
||||
public static final boolean DEFAULT_ALWAYS_READ_BUFFER_SIZE = false;
|
||||
public static final int DEFAULT_READ_AHEAD_BLOCK_SIZE = 4 * ONE_MB;
|
||||
public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB
|
||||
|
|
|
@ -38,6 +38,10 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
|
|||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
||||
import static org.apache.hadoop.util.StringUtils.toLowerCase;
|
||||
|
||||
/**
|
||||
|
@ -46,6 +50,9 @@ import static org.apache.hadoop.util.StringUtils.toLowerCase;
|
|||
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
||||
StreamCapabilities {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class);
|
||||
// Footer size is set to qualify for both ORC and parquet files
|
||||
public static final int FOOTER_SIZE = 16 * ONE_KB;
|
||||
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
|
||||
|
||||
private int readAheadBlockSize;
|
||||
private final AbfsClient client;
|
||||
|
@ -59,6 +66,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
private final boolean readAheadEnabled; // whether enable readAhead;
|
||||
private final boolean alwaysReadBufferSize;
|
||||
|
||||
private boolean firstRead = true;
|
||||
// SAS tokens can be re-used until they expire
|
||||
private CachedSASToken cachedSasToken;
|
||||
private byte[] buffer = null; // will be initialized on first use
|
||||
|
@ -70,11 +78,21 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
// of valid bytes in buffer)
|
||||
private boolean closed = false;
|
||||
|
||||
// Optimisations modify the pointer fields.
|
||||
// For better resilience the following fields are used to save the
|
||||
// existing state before optimization flows.
|
||||
private int limitBkp;
|
||||
private int bCursorBkp;
|
||||
private long fCursorBkp;
|
||||
private long fCursorAfterLastReadBkp;
|
||||
|
||||
/** Stream statistics. */
|
||||
private final AbfsInputStreamStatistics streamStatistics;
|
||||
private long bytesFromReadAhead; // bytes read from readAhead; for testing
|
||||
private long bytesFromRemoteRead; // bytes read remotely; for testing
|
||||
|
||||
private final AbfsInputStreamContext context;
|
||||
|
||||
public AbfsInputStream(
|
||||
final AbfsClient client,
|
||||
final Statistics statistics,
|
||||
|
@ -96,6 +114,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
this.cachedSasToken = new CachedSASToken(
|
||||
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
|
||||
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
|
||||
this.context = abfsInputStreamContext;
|
||||
readAheadBlockSize = abfsInputStreamContext.getReadAheadBlockSize();
|
||||
|
||||
// Propagate the config values to ReadBufferManager so that the first instance
|
||||
|
@ -137,7 +156,13 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
}
|
||||
incrementReadOps();
|
||||
do {
|
||||
if (shouldReadFully()) {
|
||||
lastReadBytes = readFileCompletely(b, currentOff, currentLen);
|
||||
} else if (shouldReadLastBlock()) {
|
||||
lastReadBytes = readLastBlock(b, currentOff, currentLen);
|
||||
} else {
|
||||
lastReadBytes = readOneBlock(b, currentOff, currentLen);
|
||||
}
|
||||
if (lastReadBytes > 0) {
|
||||
currentOff += lastReadBytes;
|
||||
currentLen -= lastReadBytes;
|
||||
|
@ -150,27 +175,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
|
||||
}
|
||||
|
||||
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
private boolean shouldReadFully() {
|
||||
return this.firstRead && this.context.readSmallFilesCompletely()
|
||||
&& this.contentLength <= this.bufferSize;
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(b);
|
||||
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
|
||||
off, len);
|
||||
private boolean shouldReadLastBlock() {
|
||||
long footerStart = max(0, this.contentLength - FOOTER_SIZE);
|
||||
return this.firstRead && this.context.optimizeFooterRead()
|
||||
&& this.fCursor >= footerStart;
|
||||
}
|
||||
|
||||
private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (this.available() == 0) {
|
||||
if (!validate(b, off, len)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (off < 0 || len < 0 || len > b.length - off) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
|
||||
//If buffer is empty, then fill the buffer.
|
||||
if (bCursor == limit) {
|
||||
//If EOF, then return -1
|
||||
|
@ -197,6 +219,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
|
||||
}
|
||||
}
|
||||
if (firstRead) {
|
||||
firstRead = false;
|
||||
}
|
||||
|
||||
if (bytesRead == -1) {
|
||||
return -1;
|
||||
|
@ -206,11 +231,123 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
fCursor += bytesRead;
|
||||
fCursorAfterLastRead = fCursor;
|
||||
}
|
||||
return copyToUserBuffer(b, off, len);
|
||||
}
|
||||
|
||||
private int readFileCompletely(final byte[] b, final int off, final int len)
|
||||
throws IOException {
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (!validate(b, off, len)) {
|
||||
return -1;
|
||||
}
|
||||
savePointerState();
|
||||
// data need to be copied to user buffer from index bCursor, bCursor has
|
||||
// to be the current fCusor
|
||||
bCursor = (int) fCursor;
|
||||
return optimisedRead(b, off, len, 0, contentLength);
|
||||
}
|
||||
|
||||
private int readLastBlock(final byte[] b, final int off, final int len)
|
||||
throws IOException {
|
||||
if (len == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (!validate(b, off, len)) {
|
||||
return -1;
|
||||
}
|
||||
savePointerState();
|
||||
// data need to be copied to user buffer from index bCursor,
|
||||
// AbfsInutStream buffer is going to contain data from last block start. In
|
||||
// that case bCursor will be set to fCursor - lastBlockStart
|
||||
long lastBlockStart = max(0, contentLength - bufferSize);
|
||||
bCursor = (int) (fCursor - lastBlockStart);
|
||||
// 0 if contentlength is < buffersize
|
||||
long actualLenToRead = min(bufferSize, contentLength);
|
||||
return optimisedRead(b, off, len, lastBlockStart, actualLenToRead);
|
||||
}
|
||||
|
||||
private int optimisedRead(final byte[] b, final int off, final int len,
|
||||
final long readFrom, final long actualLen) throws IOException {
|
||||
fCursor = readFrom;
|
||||
int totalBytesRead = 0;
|
||||
int lastBytesRead = 0;
|
||||
try {
|
||||
buffer = new byte[bufferSize];
|
||||
for (int i = 0;
|
||||
i < MAX_OPTIMIZED_READ_ATTEMPTS && fCursor < contentLength; i++) {
|
||||
lastBytesRead = readInternal(fCursor, buffer, limit,
|
||||
(int) actualLen - limit, true);
|
||||
if (lastBytesRead > 0) {
|
||||
totalBytesRead += lastBytesRead;
|
||||
limit += lastBytesRead;
|
||||
fCursor += lastBytesRead;
|
||||
fCursorAfterLastRead = fCursor;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Optimized read failed. Defaulting to readOneBlock {}", e);
|
||||
restorePointerState();
|
||||
return readOneBlock(b, off, len);
|
||||
} finally {
|
||||
firstRead = false;
|
||||
}
|
||||
if (totalBytesRead < 1) {
|
||||
restorePointerState();
|
||||
return -1;
|
||||
}
|
||||
// If the read was partial and the user requested part of data has
|
||||
// not read then fallback to readoneblock. When limit is smaller than
|
||||
// bCursor that means the user requested data has not been read.
|
||||
if (fCursor < contentLength && bCursor > limit) {
|
||||
restorePointerState();
|
||||
return readOneBlock(b, off, len);
|
||||
}
|
||||
return copyToUserBuffer(b, off, len);
|
||||
}
|
||||
|
||||
private void savePointerState() {
|
||||
// Saving the current state for fall back ifn case optimization fails
|
||||
this.limitBkp = this.limit;
|
||||
this.fCursorBkp = this.fCursor;
|
||||
this.fCursorAfterLastReadBkp = this.fCursorAfterLastRead;
|
||||
this.bCursorBkp = this.bCursor;
|
||||
}
|
||||
|
||||
private void restorePointerState() {
|
||||
// Saving the current state for fall back ifn case optimization fails
|
||||
this.limit = this.limitBkp;
|
||||
this.fCursor = this.fCursorBkp;
|
||||
this.fCursorAfterLastRead = this.fCursorAfterLastReadBkp;
|
||||
this.bCursor = this.bCursorBkp;
|
||||
}
|
||||
|
||||
private boolean validate(final byte[] b, final int off, final int len)
|
||||
throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
|
||||
}
|
||||
|
||||
Preconditions.checkNotNull(b);
|
||||
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
|
||||
off, len);
|
||||
|
||||
if (this.available() == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (off < 0 || len < 0 || len > b.length - off) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private int copyToUserBuffer(byte[] b, int off, int len){
|
||||
//If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
|
||||
//(bytes returned may be less than requested)
|
||||
int bytesRemaining = limit - bCursor;
|
||||
int bytesToRead = Math.min(len, bytesRemaining);
|
||||
int bytesToRead = min(len, bytesRemaining);
|
||||
System.arraycopy(buffer, bCursor, b, off, bytesToRead);
|
||||
bCursor += bytesToRead;
|
||||
if (statistics != null) {
|
||||
|
@ -224,7 +361,6 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
return bytesToRead;
|
||||
}
|
||||
|
||||
|
||||
private int readInternal(final long position, final byte[] b, final int offset, final int length,
|
||||
final boolean bypassReadAhead) throws IOException {
|
||||
if (readAheadEnabled && !bypassReadAhead) {
|
||||
|
@ -239,7 +375,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
long nextOffset = position;
|
||||
// First read to queue needs to be of readBufferSize and later
|
||||
// of readAhead Block size
|
||||
long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
|
||||
long nextSize = min((long) bufferSize, contentLength - nextOffset);
|
||||
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
|
||||
while (numReadAheads > 0 && nextOffset < contentLength) {
|
||||
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
|
||||
|
@ -248,7 +384,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
nextOffset = nextOffset + nextSize;
|
||||
numReadAheads--;
|
||||
// From next round onwards should be of readahead block size.
|
||||
nextSize = Math.min((long) readAheadBlockSize, contentLength - nextOffset);
|
||||
nextSize = min((long) readAheadBlockSize, contentLength - nextOffset);
|
||||
}
|
||||
|
||||
// try reading from buffers first
|
||||
|
@ -572,4 +708,24 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
|
|||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
int getBCursor() {
|
||||
return this.bCursor;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getFCursor() {
|
||||
return this.fCursor;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getFCursorAfterLastRead() {
|
||||
return this.fCursorAfterLastRead;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
long getLimit() {
|
||||
return this.limit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
|
||||
private AbfsInputStreamStatistics streamStatistics;
|
||||
|
||||
private boolean readSmallFilesCompletely;
|
||||
|
||||
private boolean optimizeFooterRead;
|
||||
|
||||
public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
|
||||
super(sasTokenRenewPeriodForStreamsInSeconds);
|
||||
}
|
||||
|
@ -69,6 +73,18 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withReadSmallFilesCompletely(
|
||||
final boolean readSmallFilesCompletely) {
|
||||
this.readSmallFilesCompletely = readSmallFilesCompletely;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withOptimizeFooterRead(
|
||||
final boolean optimizeFooterRead) {
|
||||
this.optimizeFooterRead = optimizeFooterRead;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
|
||||
final boolean alwaysReadBufferSize) {
|
||||
this.alwaysReadBufferSize = alwaysReadBufferSize;
|
||||
|
@ -110,6 +126,14 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
|
|||
return streamStatistics;
|
||||
}
|
||||
|
||||
public boolean readSmallFilesCompletely() {
|
||||
return this.readSmallFilesCompletely;
|
||||
}
|
||||
|
||||
public boolean optimizeFooterRead() {
|
||||
return this.optimizeFooterRead;
|
||||
}
|
||||
|
||||
public boolean shouldReadBufferSizeAlways() {
|
||||
return alwaysReadBufferSize;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,256 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
|
||||
|
||||
protected static final int HUNDRED = 100;
|
||||
|
||||
public ITestAbfsInputStream() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithNoOptimization() throws Exception {
|
||||
for (int i = 2; i <= 7; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
protected void testWithNoOptimization(final FileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final byte[] fileContent)
|
||||
throws IOException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
long totalBytesRead = 0;
|
||||
int length = HUNDRED * HUNDRED;
|
||||
do {
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
totalBytesRead += bytesRead;
|
||||
if ((totalBytesRead + seekPos) >= fileContent.length) {
|
||||
length = (fileContent.length - seekPos) % length;
|
||||
}
|
||||
assertEquals(length, bytesRead);
|
||||
assertContentReadCorrectly(fileContent,
|
||||
(int) (seekPos + totalBytesRead - length), length, buffer);
|
||||
|
||||
assertTrue(abfsInputStream.getFCursor() >= seekPos + totalBytesRead);
|
||||
assertTrue(abfsInputStream.getFCursorAfterLastRead() >= seekPos + totalBytesRead);
|
||||
assertTrue(abfsInputStream.getBCursor() >= totalBytesRead % abfsInputStream.getBufferSize());
|
||||
assertTrue(abfsInputStream.getLimit() >= totalBytesRead % abfsInputStream.getBufferSize());
|
||||
} while (totalBytesRead + seekPos < fileContent.length);
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionInOptimization() throws Exception {
|
||||
for (int i = 2; i <= 7; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
|
||||
fileSize / 4, fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private void testExceptionInOptimization(final FileSystem fs,
|
||||
final Path testFilePath,
|
||||
final int seekPos, final int length, final byte[] fileContent)
|
||||
throws IOException {
|
||||
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
doThrow(new IOException())
|
||||
.doCallRealMethod()
|
||||
.when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt());
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
verifyBeforeSeek(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
long actualLength = length;
|
||||
if (seekPos + length > fileContent.length) {
|
||||
long delta = seekPos + length - fileContent.length;
|
||||
actualLength = length - delta;
|
||||
}
|
||||
assertEquals(bytesRead, actualLength);
|
||||
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
|
||||
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
||||
assertEquals(fileContent.length, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(actualLength, abfsInputStream.getBCursor());
|
||||
assertTrue(abfsInputStream.getLimit() >= actualLength);
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
|
||||
throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setReadSmallFilesCompletely(readSmallFilesCompletely);
|
||||
return fs;
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
|
||||
boolean readSmallFileCompletely, int fileSize) throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setOptimizeFooterRead(optimizeFooterRead);
|
||||
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
|
||||
.getReadBufferSize()) {
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setReadSmallFilesCompletely(readSmallFileCompletely);
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
protected byte[] getRandomBytesArray(int length) {
|
||||
final byte[] b = new byte[length];
|
||||
new Random().nextBytes(b);
|
||||
return b;
|
||||
}
|
||||
|
||||
protected Path createFileWithContent(FileSystem fs, String fileName,
|
||||
byte[] fileContent) throws IOException {
|
||||
Path testFilePath = path(fileName);
|
||||
try (FSDataOutputStream oStream = fs.create(testFilePath)) {
|
||||
oStream.write(fileContent);
|
||||
oStream.flush();
|
||||
}
|
||||
return testFilePath;
|
||||
}
|
||||
|
||||
protected AzureBlobFileSystemStore getAbfsStore(FileSystem fs)
|
||||
throws NoSuchFieldException, IllegalAccessException {
|
||||
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
|
||||
Field abfsStoreField = AzureBlobFileSystem.class
|
||||
.getDeclaredField("abfsStore");
|
||||
abfsStoreField.setAccessible(true);
|
||||
return (AzureBlobFileSystemStore) abfsStoreField.get(abfs);
|
||||
}
|
||||
|
||||
protected Map<String, Long> getInstrumentationMap(FileSystem fs)
|
||||
throws NoSuchFieldException, IllegalAccessException {
|
||||
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fs;
|
||||
Field abfsCountersField = AzureBlobFileSystem.class
|
||||
.getDeclaredField("abfsCounters");
|
||||
abfsCountersField.setAccessible(true);
|
||||
AbfsCounters abfsCounters = (AbfsCounters) abfsCountersField.get(abfs);
|
||||
return abfsCounters.toMap();
|
||||
}
|
||||
|
||||
protected void assertContentReadCorrectly(byte[] actualFileContent, int from,
|
||||
int len, byte[] contentRead) {
|
||||
for (int i = 0; i < len; i++) {
|
||||
assertEquals(contentRead[i], actualFileContent[i + from]);
|
||||
}
|
||||
}
|
||||
|
||||
protected void assertBuffersAreNotEqual(byte[] actualContent,
|
||||
byte[] contentRead, AbfsConfiguration conf) {
|
||||
assertBufferEquality(actualContent, contentRead, conf, false);
|
||||
}
|
||||
|
||||
protected void assertBuffersAreEqual(byte[] actualContent, byte[] contentRead,
|
||||
AbfsConfiguration conf) {
|
||||
assertBufferEquality(actualContent, contentRead, conf, true);
|
||||
}
|
||||
|
||||
private void assertBufferEquality(byte[] actualContent, byte[] contentRead,
|
||||
AbfsConfiguration conf, boolean assertEqual) {
|
||||
int bufferSize = conf.getReadBufferSize();
|
||||
int actualContentSize = actualContent.length;
|
||||
int n = (actualContentSize < bufferSize) ? actualContentSize : bufferSize;
|
||||
int matches = 0;
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (actualContent[i] == contentRead[i]) {
|
||||
matches++;
|
||||
}
|
||||
}
|
||||
if (assertEqual) {
|
||||
assertEquals(n, matches);
|
||||
} else {
|
||||
assertNotEquals(n, matches);
|
||||
}
|
||||
}
|
||||
|
||||
protected void seek(FSDataInputStream iStream, long seekPos)
|
||||
throws IOException {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream.getWrappedStream();
|
||||
verifyBeforeSeek(abfsInputStream);
|
||||
iStream.seek(seekPos);
|
||||
verifyAfterSeek(abfsInputStream, seekPos);
|
||||
}
|
||||
|
||||
private void verifyBeforeSeek(AbfsInputStream abfsInputStream){
|
||||
assertEquals(0, abfsInputStream.getFCursor());
|
||||
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(0, abfsInputStream.getLimit());
|
||||
assertEquals(0, abfsInputStream.getBCursor());
|
||||
}
|
||||
|
||||
private void verifyAfterSeek(AbfsInputStream abfsInputStream, long seekPos){
|
||||
assertEquals(seekPos, abfsInputStream.getFCursor());
|
||||
assertEquals(-1, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(0, abfsInputStream.getLimit());
|
||||
assertEquals(0, abfsInputStream.getBCursor());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,358 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||
|
||||
public class ITestAbfsInputStreamReadFooter extends ITestAbfsInputStream {
|
||||
|
||||
private static final int TEN = 10;
|
||||
private static final int TWENTY = 20;
|
||||
|
||||
public ITestAbfsInputStreamReadFooter() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
|
||||
testNumBackendCalls(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
|
||||
throws Exception {
|
||||
testNumBackendCalls(false);
|
||||
}
|
||||
|
||||
private void testNumBackendCalls(boolean optimizeFooterRead)
|
||||
throws Exception {
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
|
||||
fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
int length = AbfsInputStream.FOOTER_SIZE;
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
byte[] buffer = new byte[length];
|
||||
|
||||
Map<String, Long> metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeBeforeTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
iStream.seek(fileSize - 8);
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(fileSize - (TEN * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(fileSize - (TWENTY * ONE_KB));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeAfterTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
if (optimizeFooterRead) {
|
||||
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
} else {
|
||||
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginAndReadWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(true, SeekTo.BEGIN);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginAndReadWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(false, SeekTo.BEGIN);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeforeFooterAndReadWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(true, SeekTo.BEFORE_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeforeFooterAndReadWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(false, SeekTo.BEFORE_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToFooterAndReadWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(true, SeekTo.AT_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToFooterAndReadWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(false, SeekTo.AT_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToAfterFooterAndReadWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(true, SeekTo.AFTER_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToToAfterFooterAndReadWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(false, SeekTo.AFTER_FOOTER_START);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReadWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(true, SeekTo.END);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReadWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(false, SeekTo.END);
|
||||
}
|
||||
|
||||
private void testSeekAndReadWithConf(boolean optimizeFooterRead,
|
||||
SeekTo seekTo) throws Exception {
|
||||
for (int i = 2; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(optimizeFooterRead,
|
||||
fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
seekReadAndTest(fs, testFilePath, seekPos(seekTo, fileSize), HUNDRED,
|
||||
fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private int seekPos(SeekTo seekTo, int fileSize) {
|
||||
if (seekTo == SeekTo.BEGIN) {
|
||||
return 0;
|
||||
}
|
||||
if (seekTo == SeekTo.BEFORE_FOOTER_START) {
|
||||
return fileSize - AbfsInputStream.FOOTER_SIZE - 1;
|
||||
}
|
||||
if (seekTo == SeekTo.AT_FOOTER_START) {
|
||||
return fileSize - AbfsInputStream.FOOTER_SIZE;
|
||||
}
|
||||
if (seekTo == SeekTo.END) {
|
||||
return fileSize - 1;
|
||||
}
|
||||
//seekTo == SeekTo.AFTER_FOOTER_START
|
||||
return fileSize - AbfsInputStream.FOOTER_SIZE + 1;
|
||||
}
|
||||
|
||||
private void seekReadAndTest(final FileSystem fs, final Path testFilePath,
|
||||
final int seekPos, final int length, final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
|
||||
long actualContentLength = fileContent.length;
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
long bufferSize = abfsInputStream.getBufferSize();
|
||||
seek(iStream, seekPos);
|
||||
byte[] buffer = new byte[length];
|
||||
long bytesRead = iStream.read(buffer, 0, length);
|
||||
|
||||
long footerStart = max(0,
|
||||
actualContentLength - AbfsInputStream.FOOTER_SIZE);
|
||||
boolean optimizationOn =
|
||||
conf.optimizeFooterRead() && seekPos >= footerStart;
|
||||
|
||||
long actualLength = length;
|
||||
if (seekPos + length > actualContentLength) {
|
||||
long delta = seekPos + length - actualContentLength;
|
||||
actualLength = length - delta;
|
||||
}
|
||||
long expectedLimit;
|
||||
long expectedBCurson;
|
||||
long expectedFCursor;
|
||||
if (optimizationOn) {
|
||||
if (actualContentLength <= bufferSize) {
|
||||
expectedLimit = actualContentLength;
|
||||
expectedBCurson = seekPos + actualLength;
|
||||
} else {
|
||||
expectedLimit = bufferSize;
|
||||
long lastBlockStart = max(0, actualContentLength - bufferSize);
|
||||
expectedBCurson = seekPos - lastBlockStart + actualLength;
|
||||
}
|
||||
expectedFCursor = actualContentLength;
|
||||
} else {
|
||||
if (seekPos + bufferSize < actualContentLength) {
|
||||
expectedLimit = bufferSize;
|
||||
expectedFCursor = bufferSize;
|
||||
} else {
|
||||
expectedLimit = actualContentLength - seekPos;
|
||||
expectedFCursor = min(seekPos + bufferSize, actualContentLength);
|
||||
}
|
||||
expectedBCurson = actualLength;
|
||||
}
|
||||
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursor());
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(expectedLimit, abfsInputStream.getLimit());
|
||||
assertEquals(expectedBCurson, abfsInputStream.getBCursor());
|
||||
assertEquals(actualLength, bytesRead);
|
||||
// Verify user-content read
|
||||
assertContentReadCorrectly(fileContent, seekPos, (int) actualLength, buffer);
|
||||
// Verify data read to AbfsInputStream buffer
|
||||
int from = seekPos;
|
||||
if (optimizationOn) {
|
||||
from = (int) max(0, actualContentLength - bufferSize);
|
||||
}
|
||||
assertContentReadCorrectly(fileContent, from, (int) abfsInputStream.getLimit(),
|
||||
abfsInputStream.getBuffer());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithNoData()
|
||||
throws Exception {
|
||||
for (int i = 2; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithNoData(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private void testPartialReadWithNoData(final FileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final int length,
|
||||
final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
doReturn(10).doReturn(10).doCallRealMethod().when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt());
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
assertEquals(length, bytesRead);
|
||||
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
|
||||
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
||||
assertEquals(length, abfsInputStream.getBCursor());
|
||||
assertTrue(abfsInputStream.getLimit() >= length);
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithSomeDat()
|
||||
throws Exception {
|
||||
for (int i = 3; i <= 6; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true, fileSize);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
testPartialReadWithSomeDat(fs, testFilePath,
|
||||
fileSize - AbfsInputStream.FOOTER_SIZE, AbfsInputStream.FOOTER_SIZE,
|
||||
fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private void testPartialReadWithSomeDat(final FileSystem fs,
|
||||
final Path testFilePath, final int seekPos, final int length,
|
||||
final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
// first readRemote, will return first 10 bytes
|
||||
// second readRemote returns data till the last 2 bytes
|
||||
int someDataLength = 2;
|
||||
int secondReturnSize =
|
||||
min(fileContent.length, abfsInputStream.getBufferSize()) - 10
|
||||
- someDataLength;
|
||||
doReturn(10).doReturn(secondReturnSize).doCallRealMethod()
|
||||
.when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt());
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
assertEquals(length, bytesRead);
|
||||
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
||||
// someDataLength(2), because in the do-while loop in read, the 2nd loop
|
||||
// will go to readoneblock and that resets the bCursor to 0 as
|
||||
// bCursor == limit finally when the 2 bytes are read bCursor and limit
|
||||
// will be at someDataLength(2)
|
||||
assertEquals(someDataLength, abfsInputStream.getBCursor());
|
||||
assertEquals(someDataLength, abfsInputStream.getLimit());
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getFileSystem(boolean optimizeFooterRead,
|
||||
int fileSize) throws IOException {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setOptimizeFooterRead(optimizeFooterRead);
|
||||
if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
|
||||
.getReadBufferSize()) {
|
||||
getAbfsStore(fs).getAbfsConfiguration()
|
||||
.setReadSmallFilesCompletely(false);
|
||||
}
|
||||
return fs;
|
||||
}
|
||||
|
||||
private enum SeekTo {
|
||||
BEGIN, AT_FOOTER_START, BEFORE_FOOTER_START, AFTER_FOOTER_START, END
|
||||
}
|
||||
}
|
|
@ -0,0 +1,326 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs.services;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
|
||||
|
||||
public class ITestAbfsInputStreamSmallFileReads extends ITestAbfsInputStream {
|
||||
|
||||
public ITestAbfsInputStreamSmallFileReads() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOnlyOneServerCallIsMadeWhenTheConfIsTrue() throws Exception {
|
||||
testNumBackendCalls(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleServerCallsAreMadeWhenTheConfIsFalse()
|
||||
throws Exception {
|
||||
testNumBackendCalls(false);
|
||||
}
|
||||
|
||||
private void testNumBackendCalls(boolean readSmallFilesCompletely)
|
||||
throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
int fileSize = i * ONE_MB;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
int length = ONE_KB;
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
byte[] buffer = new byte[length];
|
||||
|
||||
Map<String, Long> metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeBeforeTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
iStream.seek(seekPos(SeekTo.END, fileSize, length));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(seekPos(SeekTo.MIDDLE, fileSize, length));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
iStream.seek(seekPos(SeekTo.BEGIN, fileSize, length));
|
||||
iStream.read(buffer, 0, length);
|
||||
|
||||
metricMap = getInstrumentationMap(fs);
|
||||
long requestsMadeAfterTest = metricMap
|
||||
.get(CONNECTIONS_MADE.getStatName());
|
||||
|
||||
if (readSmallFilesCompletely) {
|
||||
assertEquals(1, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
} else {
|
||||
assertEquals(3, requestsMadeAfterTest - requestsMadeBeforeTest);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginingAndReadSmallFileWithConfTrue()
|
||||
throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginingAndReadSmallFileWithConfFalse()
|
||||
throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.BEGIN, 2, 4, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginingAndReadBigFileWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToBeginingAndReadBigFileWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.BEGIN, 5, 6, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReadSmallFileWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.END, 2, 4, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReadSmallFileWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.END, 2, 4, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReadBigFileWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.END, 5, 6, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToEndAndReaBigFiledWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.END, 5, 6, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToMiddleAndReadSmallFileWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToMiddleAndReadSmallFileWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.MIDDLE, 2, 4, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToMiddleAndReaBigFileWithConfTrue() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekToMiddleAndReadBigFileWithConfFalse() throws Exception {
|
||||
testSeekAndReadWithConf(SeekTo.MIDDLE, 5, 6, false);
|
||||
}
|
||||
|
||||
private void testSeekAndReadWithConf(SeekTo seekTo, int startFileSizeInMB,
|
||||
int endFileSizeInMB, boolean readSmallFilesCompletely) throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem(readSmallFilesCompletely);
|
||||
for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
int fileSize = i * ONE_MB;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
int length = ONE_KB;
|
||||
int seekPos = seekPos(seekTo, fileSize, length);
|
||||
seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private int seekPos(SeekTo seekTo, int fileSize, int length) {
|
||||
if (seekTo == SeekTo.BEGIN) {
|
||||
return 0;
|
||||
}
|
||||
if (seekTo == SeekTo.END) {
|
||||
return fileSize - length;
|
||||
}
|
||||
return fileSize / 2;
|
||||
}
|
||||
|
||||
private void seekReadAndTest(FileSystem fs, Path testFilePath, int seekPos,
|
||||
int length, byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
AbfsConfiguration conf = getAbfsStore(fs).getAbfsConfiguration();
|
||||
try (FSDataInputStream iStream = fs.open(testFilePath)) {
|
||||
seek(iStream, seekPos);
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
assertEquals(bytesRead, length);
|
||||
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
|
||||
final int readBufferSize = conf.getReadBufferSize();
|
||||
final int fileContentLength = fileContent.length;
|
||||
final boolean smallFile = fileContentLength <= readBufferSize;
|
||||
int expectedLimit, expectedFCursor;
|
||||
int expectedBCursor;
|
||||
if (conf.readSmallFilesCompletely() && smallFile) {
|
||||
assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
|
||||
expectedFCursor = fileContentLength;
|
||||
expectedLimit = fileContentLength;
|
||||
expectedBCursor = seekPos + length;
|
||||
} else {
|
||||
if ((seekPos == 0)) {
|
||||
assertBuffersAreEqual(fileContent, abfsInputStream.getBuffer(), conf);
|
||||
} else {
|
||||
assertBuffersAreNotEqual(fileContent, abfsInputStream.getBuffer(),
|
||||
conf);
|
||||
}
|
||||
expectedBCursor = length;
|
||||
expectedFCursor = (fileContentLength < (seekPos + readBufferSize))
|
||||
? fileContentLength
|
||||
: (seekPos + readBufferSize);
|
||||
expectedLimit = (fileContentLength < (seekPos + readBufferSize))
|
||||
? (fileContentLength - seekPos)
|
||||
: readBufferSize;
|
||||
}
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursor());
|
||||
assertEquals(expectedFCursor, abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(expectedBCursor, abfsInputStream.getBCursor());
|
||||
assertEquals(expectedLimit, abfsInputStream.getLimit());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithNoData() throws Exception {
|
||||
for (int i = 2; i <= 4; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
|
||||
fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private void partialReadWithNoData(final FileSystem fs,
|
||||
final Path testFilePath,
|
||||
final int seekPos, final int length, final byte[] fileContent)
|
||||
throws IOException {
|
||||
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
doReturn(10)
|
||||
.doReturn(10)
|
||||
.doCallRealMethod()
|
||||
.when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt());
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
assertEquals(bytesRead, length);
|
||||
assertContentReadCorrectly(fileContent, seekPos, length, buffer);
|
||||
assertEquals(fileContent.length, abfsInputStream.getFCursor());
|
||||
assertEquals(fileContent.length,
|
||||
abfsInputStream.getFCursorAfterLastRead());
|
||||
assertEquals(length, abfsInputStream.getBCursor());
|
||||
assertTrue(abfsInputStream.getLimit() >= length);
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPartialReadWithSomeData() throws Exception {
|
||||
for (int i = 2; i <= 4; i++) {
|
||||
int fileSize = i * ONE_MB;
|
||||
final AzureBlobFileSystem fs = getFileSystem(true);
|
||||
String fileName = methodName.getMethodName() + i;
|
||||
byte[] fileContent = getRandomBytesArray(fileSize);
|
||||
Path testFilePath = createFileWithContent(fs, fileName, fileContent);
|
||||
partialReadWithSomeData(fs, testFilePath, fileSize / 2,
|
||||
fileSize / 4, fileContent);
|
||||
}
|
||||
}
|
||||
|
||||
private void partialReadWithSomeData(final FileSystem fs,
|
||||
final Path testFilePath,
|
||||
final int seekPos, final int length, final byte[] fileContent)
|
||||
throws IOException, NoSuchFieldException, IllegalAccessException {
|
||||
FSDataInputStream iStream = fs.open(testFilePath);
|
||||
try {
|
||||
AbfsInputStream abfsInputStream = (AbfsInputStream) iStream
|
||||
.getWrappedStream();
|
||||
abfsInputStream = spy(abfsInputStream);
|
||||
// first readRemote, will return first 10 bytes
|
||||
// second readRemote, seekPos - someDataLength(10) will reach the
|
||||
// seekPos as 10 bytes are already read in the first call. Plus
|
||||
// someDataLength(10)
|
||||
int someDataLength = 10;
|
||||
int secondReturnSize = seekPos - 10 + someDataLength;
|
||||
doReturn(10)
|
||||
.doReturn(secondReturnSize)
|
||||
.doCallRealMethod()
|
||||
.when(abfsInputStream)
|
||||
.readRemote(anyLong(), any(), anyInt(), anyInt());
|
||||
|
||||
iStream = new FSDataInputStream(abfsInputStream);
|
||||
seek(iStream, seekPos);
|
||||
|
||||
byte[] buffer = new byte[length];
|
||||
int bytesRead = iStream.read(buffer, 0, length);
|
||||
assertEquals(length, bytesRead);
|
||||
assertTrue(abfsInputStream.getFCursor() > seekPos + length);
|
||||
assertTrue(abfsInputStream.getFCursorAfterLastRead() > seekPos + length);
|
||||
// Optimized read was no complete but it got some user requested data
|
||||
// from server. So obviously the buffer will contain data more than
|
||||
// seekPos + len
|
||||
assertEquals(length - someDataLength, abfsInputStream.getBCursor());
|
||||
assertTrue(abfsInputStream.getLimit() > length - someDataLength);
|
||||
} finally {
|
||||
iStream.close();
|
||||
}
|
||||
}
|
||||
|
||||
private enum SeekTo {BEGIN, MIDDLE, END}
|
||||
|
||||
}
|
Loading…
Reference in New Issue