HADOOP-16040. ABFS: Bug fix for tolerateOobAppends configuration.

Contributed by Da Zhou.
This commit is contained in:
Da Zhou 2019-01-10 11:58:39 +00:00 committed by Yuan Gao
parent 175a69e863
commit 4a1761067d
3 changed files with 39 additions and 4 deletions

View File

@@ -374,7 +374,8 @@ public class AzureBlobFileSystemStore {
// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag);
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag);
}
public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws

View File

@@ -61,6 +61,7 @@ public class AbfsInputStream extends FSInputStream {
final long contentLength,
final int bufferSize,
final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) {
this.client = client;
this.statistics = statistics;
@@ -68,8 +69,8 @@ public class AbfsInputStream extends FSInputStream {
this.contentLength = contentLength;
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = tolerateOobAppends;
this.eTag = eTag;
this.tolerateOobAppends = false;
this.readAheadEnabled = true;
}

View File

@@ -25,12 +25,14 @@ import java.util.Random;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@@ -66,7 +68,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
}
@Test (expected = IOException.class)
public void testOOBWrites() throws Exception {
public void testOOBWritesAndReadFail() throws Exception {
Configuration conf = this.getRawConfiguration();
conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, false);
final AzureBlobFileSystem fs = getFileSystem();
int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
@@ -83,7 +87,6 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
try (FSDataInputStream readStream = fs.open(testFilePath)) {
assertEquals(readBufferSize,
readStream.read(bytesToRead, 0, readBufferSize));
try (FSDataOutputStream writeStream = fs.create(testFilePath)) {
writeStream.write(b);
writeStream.flush();
@@ -94,6 +97,36 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
}
}
@Test
public void testOOBWritesAndReadSucceed() throws Exception {
  // With tolerance of out-of-band (concurrent) appends enabled, a read
  // issued after another writer has replaced the file must not fail.
  Configuration config = this.getRawConfiguration();
  config.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
  final AzureBlobFileSystem fs = getFileSystem(config);
  final int bufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();

  byte[] readBuffer = new byte[bufferSize];
  final byte[] payload = new byte[2 * bufferSize];
  new Random().nextBytes(payload);
  final Path filePath = new Path(methodName.getMethodName());

  // Lay down the initial file contents (two read-buffers' worth).
  try (FSDataOutputStream out = fs.create(filePath)) {
    out.write(payload);
    out.flush();
  }

  try (FSDataInputStream in = fs.open(filePath)) {
    // First read fills exactly one buffer.
    assertEquals(bufferSize, in.read(readBuffer, 0, bufferSize));
    // Out-of-band write: recreate the file while the input stream is open.
    try (FSDataOutputStream out = fs.create(filePath)) {
      out.write(payload);
      out.flush();
    }
    // Tolerance is on, so the follow-up read should still succeed.
    assertEquals(bufferSize, in.read(readBuffer, 0, bufferSize));
  }
}
@Test
public void testWriteWithBufferOffset() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();