From 745a6c1e69b3699f6496a146afc48824dd735461 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Thu, 26 Mar 2020 15:09:13 +0000
Subject: [PATCH] Revert "HADOOP-16818. ABFS: Combine append+flush calls for
 blockblob & appendblob"

This reverts commit 3612317038196ee0cb6d7204056d54b7a7ed8bf7.

Change-Id: Ie0d36f25de0b55a937894f4d9963c495bae0576a
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java |  16 -
 .../fs/azurebfs/AzureBlobFileSystemStore.java |  73 +---
 .../azurebfs/constants/AbfsHttpConstants.java |   1 -
 .../azurebfs/constants/ConfigurationKeys.java |   5 -
 .../constants/FileSystemConfigurations.java   |   2 -
 .../azurebfs/constants/HttpQueryParams.java   |   2 -
 .../fs/azurebfs/services/AbfsClient.java      |  12 +-
 .../azurebfs/services/AbfsOutputStream.java   |  79 +---
 .../hadoop-azure/src/site/markdown/abfs.md    |   4 -
 .../azurebfs/ITestAzureBlobFileSystemE2E.java |   3 -
 .../services/TestAbfsOutputStream.java        | 407 ------------------
 11 files changed, 34 insertions(+), 570 deletions(-)
 delete mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 548587539fd..61fe3d8d6d2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -143,10 +143,6 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
-  @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APPEND_BLOB_KEY,
-      DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
-  private String azureAppendBlobDirs;
-
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
       DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
   private boolean createRemoteFileSystemDuringInitialization;
@@ -167,10 +163,6 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH)
   private boolean disableOutputStreamFlush;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_APPEND_WITH_FLUSH,
-      DefaultValue = DEFAULT_ENABLE_APPEND_WITH_FLUSH)
-  private boolean enableAppendWithFlush;
-
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
       DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
   private boolean enableAutoThrottling;
@@ -457,10 +449,6 @@ public class AbfsConfiguration{
     return this.azureAtomicDirs;
   }
 
-  public String getAppendBlobDirs() {
-    return this.azureAppendBlobDirs;
-  }
-
   public boolean getCreateRemoteFileSystemDuringInitialization() {
     // we do not support creating the filesystem when AuthType is SAS
     return this.createRemoteFileSystemDuringInitialization
@@ -483,10 +471,6 @@ public class AbfsConfiguration{
     return this.disableOutputStreamFlush;
   }
 
-  public boolean isAppendWithFlushEnabled() {
-    return this.enableAppendWithFlush;
-  }
-
   public boolean isAutoThrottlingEnabled() {
     return this.enableAutoThrottling;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 9268cf8af8c..bff0e455cf0 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -137,11 +137,6 @@ public class AzureBlobFileSystemStore implements Closeable { private final IdentityTransformer identityTransformer; private final AbfsPerfTracker abfsPerfTracker; - /** - * The set of directories where we should store files as append blobs. - */ - private Set appendBlobDirSet; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) throws IOException { this.uri = uri; @@ -182,22 +177,6 @@ public class AzureBlobFileSystemStore implements Closeable { initializeClient(uri, fileSystemName, accountName, useHttps); this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); LOG.trace("IdentityTransformer init complete"); - // Extract the directories that should contain append blobs - String appendBlobDirs = abfsConfiguration.getAppendBlobDirs(); - if (appendBlobDirs.trim().isEmpty()) { - this.appendBlobDirSet = new HashSet(); - } else { - this.appendBlobDirSet = new HashSet<>(Arrays.asList( - abfsConfiguration.getAppendBlobDirs().split(AbfsHttpConstants.COMMA))); - } - } - - /** - * Checks if the given key in Azure Storage should be stored as a page - * blob instead of block blob. - */ - public boolean isAppendBlobKey(String key) { - return isKeyForDirectorySet(key, appendBlobDirSet); } /** @@ -424,25 +403,18 @@ public class AzureBlobFileSystemStore implements Closeable { umask.toString(), isNamespaceEnabled); - boolean appendBlob = false; - if (isAppendBlobKey(path.toString())) { - appendBlob = true; - } - - client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, - appendBlob); + final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite, + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? getOctalNotation(umask) : null); + perfInfo.registerResult(op.getResult()).registerSuccess(true); return new AbfsOutputStream( - client, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled(), - abfsConfiguration.isAppendWithFlushEnabled(), - appendBlob); + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + 0, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); } } @@ -458,8 +430,8 @@ public class AzureBlobFileSystemStore implements Closeable { isNamespaceEnabled); final AbfsRestOperation op = client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true, - isNamespaceEnabled ? getOctalNotation(permission) : null, - isNamespaceEnabled ? getOctalNotation(umask) : null, false); + isNamespaceEnabled ? getOctalNotation(permission) : null, + isNamespaceEnabled ? 
getOctalNotation(umask) : null); perfInfo.registerResult(op.getResult()).registerSuccess(true); } } @@ -522,20 +494,13 @@ public class AzureBlobFileSystemStore implements Closeable { perfInfo.registerSuccess(true); - boolean appendBlob = false; - if (isAppendBlobKey(path.toString())) { - appendBlob = true; - } - return new AbfsOutputStream( - client, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled(), - abfsConfiguration.isAppendWithFlushEnabled(), - appendBlob); + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + offset, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled(), + abfsConfiguration.isOutputStreamFlushDisabled()); } } @@ -1421,4 +1386,4 @@ public class AzureBlobFileSystemStore implements Closeable { AbfsClient getClient() { return this.client; } -} +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index e9b60c71153..c6ade9cb99d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -40,7 +40,6 @@ public final class AbfsHttpConstants { public static final String CHECK_ACCESS = "checkAccess"; public static final String GET_STATUS = "getStatus"; public static final String DEFAULT_TIMEOUT = "90"; - public static final String APPEND_BLOB_TYPE = "appendblob"; public static final String TOKEN_VERSION = "2"; public static final String JAVA_VERSION = "java.version"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 41cf20feada..a63e9535349 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -51,7 +51,6 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; - public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.key"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ @@ -62,10 +61,6 @@ public final class ConfigurationKeys { * documentation does not have such expectations of data being persisted. * Default value of this config is true. **/ public static final String FS_AZURE_DISABLE_OUTPUTSTREAM_FLUSH = "fs.azure.disable.outputstream.flush"; - /** Provides a config control to enable OutputStream AppendWithFlush API - * operations in AbfsOutputStream. - * Default value of this config is true. 
**/ - public static final String FS_AZURE_ENABLE_APPEND_WITH_FLUSH = "fs.azure.enable.appendwithflush"; public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode"; /** Provides a config to enable/disable the checkAccess API. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index b88b24ce5b2..c6b308ed5f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -55,12 +55,10 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; - public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = ""; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; - public static final boolean DEFAULT_ENABLE_APPEND_WITH_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index d81931d3409..9f735f729cb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -38,8 +38,6 @@ public final class HttpQueryParams { public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_UPN = "upn"; - public static final String QUERY_PARAM_FLUSH = "flush"; - public static final String QUERY_PARAM_BLOBTYPE = "blobtype"; private HttpQueryParams() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 98198598947..6e1de68b5de 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -119,6 +119,7 @@ public class AbfsClient implements Closeable { this.sasTokenProvider = sasTokenProvider; } + @Override public void close() throws IOException { if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); @@ -260,8 +261,7 @@ public class AbfsClient implements Closeable { } public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, - final String permission, final String umask, - final boolean appendBlob) throws AzureBlobFileSystemException { + final String permission, final String umask) throws AzureBlobFileSystemException { final 
List requestHeaders = createDefaultHeaders(); if (!overwrite) { requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); @@ -277,9 +277,6 @@ public class AbfsClient implements Closeable { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); - if (appendBlob) { - abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE); - } String operation = isFile ? SASTokenProvider.CREATEFILE_OPERATION @@ -328,8 +325,7 @@ public class AbfsClient implements Closeable { } public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset, - final int length, boolean flush, boolean isClose) - throws AzureBlobFileSystemException { + final int length) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. @@ -339,8 +335,6 @@ public class AbfsClient implements Closeable { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_FLUSH, String.valueOf(flush)); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 57e42e07b29..7e9746d118c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -55,8 +55,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private boolean closed; private boolean supportFlush; private boolean disableOutputStreamFlush; - private boolean supportAppendWithFlush; - private boolean appendBlob; private volatile IOException lastError; private long lastFlushOffset; @@ -86,18 +84,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa final long position, final int bufferSize, final boolean supportFlush, - final boolean disableOutputStreamFlush, - final boolean supportAppendWithFlush, - final boolean appendBlob) { + final boolean disableOutputStreamFlush) { this.client = client; this.path = path; this.position = position; this.closed = false; - this.disableOutputStreamFlush = disableOutputStreamFlush; this.supportFlush = supportFlush; this.disableOutputStreamFlush = disableOutputStreamFlush; - this.supportAppendWithFlush = supportAppendWithFlush; - this.appendBlob = appendBlob; this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; @@ -106,6 +99,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.writeOperations = new ConcurrentLinkedDeque<>(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, @@ -176,7 +170,7 @@ 
public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa if (writableBytes <= numberOfBytesToWrite) { System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); bufferIndex += writableBytes; - writeCurrentBufferToService(false, false); + writeCurrentBufferToService(); currentOffset += writableBytes; numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; } else { @@ -274,16 +268,17 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private synchronized void flushInternal(boolean isClose) throws IOException { maybeThrowLastError(); - writeAndFlushWrittenBytesToService(isClose); + writeCurrentBufferToService(); + flushWrittenBytesToService(isClose); } private synchronized void flushInternalAsync() throws IOException { maybeThrowLastError(); - writeCurrentBufferToService(true, false); + writeCurrentBufferToService(); flushWrittenBytesToServiceAsync(); } - private synchronized void writeCurrentBufferToService(final boolean flush, final boolean isClose) throws IOException { + private synchronized void writeCurrentBufferToService() throws IOException { if (bufferIndex == 0) { return; } @@ -295,16 +290,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa final long offset = position; position += bytesLength; - if (this.appendBlob) { - client.append(path, offset, bytes, 0, - bytesLength, flush, isClose); - lastTotalAppendOffset += bytesLength; - if (flush) { - lastFlushOffset = lastTotalAppendOffset; - } - return; - } - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { waitForTaskToComplete(); } @@ -315,15 +300,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "writeCurrentBufferToService", "append")) { - if (flush) { - /* Append with Flush enabled should happen - * when all the data which was supposed to be - * appended has been sent and finished. 
- */ - while(lastTotalAppendOffset < lastFlushOffset); - } AbfsRestOperation op = client.append(path, offset, bytes, 0, - bytesLength, flush, isClose); + bytesLength); perfInfo.registerResult(op.getResult()); byteBufferPool.putBuffer(ByteBuffer.wrap(bytes)); perfInfo.registerSuccess(true); @@ -332,7 +310,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa } }); - writeOperations.add(new WriteOperation(job, offset, bytesLength, flush)); + writeOperations.add(new WriteOperation(job, offset, bytesLength)); // Try to shrink the queue shrinkWriteOperationQueue(); @@ -348,6 +326,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa throw new FileNotFoundException(ex.getMessage()); } } + if (ex.getCause() instanceof AzureBlobFileSystemException) { ex = (AzureBlobFileSystemException) ex.getCause(); } @@ -355,36 +334,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa throw lastError; } } - shrinkWriteOperationQueue(); - } - - private synchronized void completeExistingTasks() throws IOException { - for (WriteOperation writeOperation : writeOperations) { - try { - writeOperation.task.get(); - } catch (Exception ex) { - if (ex.getCause() instanceof AbfsRestOperationException) { - if (((AbfsRestOperationException) ex.getCause()).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(ex.getMessage()); - } - } - if (ex.getCause() instanceof AzureBlobFileSystemException) { - ex = (AzureBlobFileSystemException) ex.getCause(); - } - lastError = new IOException(ex); - throw lastError; - } - } - shrinkWriteOperationQueue(); - } - - private synchronized void writeAndFlushWrittenBytesToService(boolean isClose) throws IOException { - completeExistingTasks(); - writeCurrentBufferToService(supportAppendWithFlush, isClose); - completeExistingTasks(); - if (this.lastTotalAppendOffset > this.lastFlushOffset) { - flushWrittenBytesToServiceInternal(position, false, isClose); - } + flushWrittenBytesToServiceInternal(position, false, isClose); } private synchronized void flushWrittenBytesToServiceAsync() throws IOException { @@ -423,9 +373,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { writeOperations.peek().task.get(); lastTotalAppendOffset += writeOperations.peek().length; - if (writeOperations.peek().isFlush) { - lastFlushOffset = lastTotalAppendOffset; - } writeOperations.remove(); } } catch (Exception e) { @@ -458,9 +405,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final Future task; private final long startOffset; private final long length; - private final boolean isFlush; - WriteOperation(final Future task, final long startOffset, final long length, final boolean flush) { + WriteOperation(final Future task, final long startOffset, final long length) { Preconditions.checkNotNull(task, "task"); Preconditions.checkArgument(startOffset >= 0, "startOffset"); Preconditions.checkArgument(length >= 0, "length"); @@ -468,7 +414,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa this.task = task; this.startOffset = startOffset; this.length = length; - this.isFlush = flush; } } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 338a1fa7c7c..01c1fbd03b3 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ 
b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -643,10 +643,6 @@ Consult the javadocs for `org.apache.hadoop.fs.azurebfs.constants.ConfigurationK `org.apache.hadoop.fs.azurebfs.AbfsConfiguration` for the full list of configuration options and their default values. -### Append Blob Directories Options -#### Config `fs.azure.appendblob.key` provides -an option for using append blob for the files prefixed by the config value. - ### Flush Options #### 1. Azure Blob File System Flush Options diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 00c4dd23fc2..ebc9c07e53e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -206,13 +206,10 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { FSDataOutputStream stream = fs.create(testFilePath); assertTrue(fs.exists(testFilePath)); - stream.write(TEST_BYTE); fs.delete(testFilePath, true); assertFalse(fs.exists(testFilePath)); - AbfsConfiguration configuration = this.getConfiguration(); - // trigger flush call intercept(FileNotFoundException.class, () -> stream.close()); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java deleted file mode 100644 index 04d8cd22259..00000000000 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ /dev/null @@ -1,407 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.Random; - -import org.junit.Assert; -import org.junit.Test; - -import org.mockito.ArgumentCaptor; - -import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; -import org.apache.hadoop.conf.Configuration; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.anyLong; - -import static org.assertj.core.api.Assertions.assertThat; - -public final class TestAbfsOutputStream { - - private static final int bufferSize = 4096; - private static final int writeSize = 1000; - private static final String path = "~/testpath"; - private final String globalKey = "fs.azure.configuration"; - private final String accountName1 = "account1"; - private final String accountKey1 = globalKey + "." + accountName1; - private final String accountValue1 = "one"; - - /** - * The test verifies OutputStream shortwrite case(2000bytes write followed by flush, hflush, hsync) is making correct HTTP calls to the server - */ - @Test - public void verifyShortWriteRequest() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[writeSize]; - new Random().nextBytes(b); - out.write(b); - out.hsync(); - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - final byte[] b1 = new byte[2*writeSize]; - new Random().nextBytes(b1); - out.write(b1); - out.flush(); - out.hflush(); - - out.hsync(); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("Path of the requests").isEqualTo(acString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(writeSize))).describedAs("Write Position").isEqualTo(acLong.getAllValues()); - //flush=true, close=false, flush=true, close=false - assertThat(Arrays.asList(true, true)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues()); - 
assertThat(Arrays.asList(0,0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(writeSize, 2*writeSize)).describedAs("Buffer length").isEqualTo(acBufferLength.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of writeSize(1000 bytes) followed by a close is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequest() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[writeSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 5; i++) { - out.write(b); - } - out.close(); - - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("Path").isEqualTo(acString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize))).describedAs("Position").isEqualTo(acLong.getAllValues()); - //flush=false,close=false, flush=true,close=true - assertThat(Arrays.asList(false, true)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, true)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, 5*writeSize-bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of bufferSize(4KB) followed by a close is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, 
path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[bufferSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 2; i++) { - out.write(b); - } - out.close(); - - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position").isEqualTo(new HashSet( - acLong.getAllValues())); - //flush=false, close=false, flush=false, close=false - assertThat(Arrays.asList(false, false)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - - ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); - - verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture()); - assertThat(Arrays.asList(path)).describedAs("path").isEqualTo(acFlushString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(2*bufferSize))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); - assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); - assertThat(Arrays.asList(true)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of bufferSize(4KB) is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequestOfBufferSize() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[bufferSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 2; i++) { - out.write(b); - } - Thread.sleep(1000); - - ArgumentCaptor acString = 
ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position in file").isEqualTo( - new HashSet(acLong.getAllValues())); - //flush=false, close=false, flush=false, close=false - assertThat(Arrays.asList(false, false)).describedAs("flush flag").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, false)).describedAs("close flag").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("buffer offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("buffer length").isEqualTo(acBufferLength.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of bufferSize(4KB) on a AppendBlob based stream is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[bufferSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 2; i++) { - out.write(b); - } - Thread.sleep(1000); - - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize))).describedAs("File Position").isEqualTo(acLong.getAllValues()); - //flush=false, close=false, flush=false, close=false - assertThat(Arrays.asList(false, false)).describedAs("Flush Flag").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, 
false)).describedAs("Close Flag").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of bufferSize(4KB) followed by a hflush call is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[bufferSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 2; i++) { - out.write(b); - } - out.hflush(); - - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("File Path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("File Position").isEqualTo( - new HashSet(acLong.getAllValues())); - //flush=false, close=false, flush=false, close=false - assertThat(Arrays.asList(false, false)).describedAs("Flush Flag").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, false)).describedAs("Close Flag").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - - ArgumentCaptor acFlushString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acFlushLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); - - verify(client, times(1)).flush(acFlushString.capture(), acFlushLong.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture()); - assertThat(Arrays.asList(path)).describedAs("path").isEqualTo(acFlushString.getAllValues()); - assertThat(Arrays.asList(Long.valueOf(2*bufferSize))).describedAs("position").isEqualTo(acFlushLong.getAllValues()); - 
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); - assertThat(Arrays.asList(false)).describedAs("Close flag").isEqualTo(acFlushClose.getAllValues()); - - } - - /** - * The test verifies OutputStream Write of bufferSize(4KB) followed by a flush call is making correct HTTP calls to the server - */ - @Test - public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { - - AbfsClient client = mock(AbfsClient.class); - AbfsRestOperation op = mock(AbfsRestOperation.class); - AbfsConfiguration abfsConf; - final Configuration conf = new Configuration(); - conf.set(accountKey1, accountValue1); - abfsConf = new AbfsConfiguration(conf, accountName1); - AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); - when(client.getAbfsPerfTracker()).thenReturn(tracker); - when(client.append(anyString(), anyLong(), any(byte[].class), anyInt(), anyInt(), anyBoolean(), anyBoolean())).thenReturn(op); - - AbfsOutputStream out = new AbfsOutputStream(client, path, 0, bufferSize, true, false, true, false); - final byte[] b = new byte[bufferSize]; - new Random().nextBytes(b); - - for (int i = 0; i < 2; i++) { - out.write(b); - } - out.flush(); - Thread.sleep(1000); - - ArgumentCaptor acString = ArgumentCaptor.forClass(String.class); - ArgumentCaptor acLong = ArgumentCaptor.forClass(Long.class); - ArgumentCaptor acBufferOffset = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acBufferLength = ArgumentCaptor.forClass(Integer.class); - ArgumentCaptor acFlush = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acClose = ArgumentCaptor.forClass(Boolean.class); - ArgumentCaptor acByteArray = ArgumentCaptor.forClass(byte[].class); - - verify(client, times(2)).append(acString.capture(), acLong.capture(), acByteArray.capture(), acBufferOffset.capture(), acBufferLength.capture(), - acFlush.capture(), acClose.capture()); - assertThat(Arrays.asList(path, path)).describedAs("path").isEqualTo(acString.getAllValues()); - assertThat(new HashSet(Arrays.asList(Long.valueOf(0), Long.valueOf(bufferSize)))).describedAs("Position").isEqualTo( - new HashSet(acLong.getAllValues())); - //flush=false, close=false, flush=false, close=false - assertThat(Arrays.asList(false, false)).describedAs("Flush = true/false").isEqualTo(acFlush.getAllValues()); - assertThat(Arrays.asList(false, false)).describedAs("Close = true/false").isEqualTo(acClose.getAllValues()); - assertThat(Arrays.asList(0, 0)).describedAs("Buffer Offset").isEqualTo(acBufferOffset.getAllValues()); - assertThat(Arrays.asList(bufferSize, bufferSize)).describedAs("Buffer Length").isEqualTo(acBufferLength.getAllValues()); - - } -}