From 8031c66295b530dcaae9e00d4f656330bc3b3952 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 21 Apr 2020 21:57:29 +0530 Subject: [PATCH] HADOOP-16965. Refactor abfs stream configuration. (#1956) Contributed by Mukund Thakur. --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 30 +++++--- .../fs/azurebfs/services/AbfsInputStream.java | 20 +++--- .../services/AbfsInputStreamContext.java | 70 +++++++++++++++++++ .../azurebfs/services/AbfsOutputStream.java | 19 +++-- .../services/AbfsOutputStreamContext.java | 68 ++++++++++++++++++ .../azurebfs/services/AbfsStreamContext.java | 26 +++++++ 6 files changed, 204 insertions(+), 29 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index a330da40cbf..6b194a41de2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -81,7 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -415,12 +417,18 @@ public class AzureBlobFileSystemStore implements Closeable { statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); } } + private AbfsOutputStreamContext populateAbfsOutputStreamContext() { + return new AbfsOutputStreamContext() + .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) + .enableFlush(abfsConfiguration.isFlushEnabled()) + .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .build(); + } + public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) { @@ -466,11 +474,19 @@ public class AzureBlobFileSystemStore implements Closeable { // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends(), eTag); + populateAbfsInputStreamContext(), + eTag); } } + private AbfsInputStreamContext populateAbfsInputStreamContext() { + return new AbfsInputStreamContext() + .withReadBufferSize(abfsConfiguration.getReadBufferSize()) + .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) + .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .build(); + } + public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws AzureBlobFileSystemException { try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) { @@ -502,9 +518,7 @@ public class AzureBlobFileSystemStore implements Closeable { statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0c06014a408..05c093a880d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private boolean closed = false; public AbfsInputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long contentLength, - final int bufferSize, - final int readAheadQueueDepth, - final boolean tolerateOobAppends, - final String eTag) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag) { this.client = client; this.statistics = statistics; this.path = path; this.contentLength = contentLength; - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.tolerateOobAppends = tolerateOobAppends; + this.bufferSize = abfsInputStreamContext.getReadBufferSize(); + this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); + this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java new file mode 100644 index 00000000000..cba71910169 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra input stream configs. + */ +public class AbfsInputStreamContext extends AbfsStreamContext { + + private int readBufferSize; + + private int readAheadQueueDepth; + + private boolean tolerateOobAppends; + + public AbfsInputStreamContext() { + } + + public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) { + this.readBufferSize = readBufferSize; + return this; + } + + public AbfsInputStreamContext withReadAheadQueueDepth( + final int readAheadQueueDepth) { + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) + ? readAheadQueueDepth + : Runtime.getRuntime().availableProcessors(); + return this; + } + + public AbfsInputStreamContext withTolerateOobAppends( + final boolean tolerateOobAppends) { + this.tolerateOobAppends = tolerateOobAppends; + return this; + } + + public AbfsInputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + public int getReadAheadQueueDepth() { + return readAheadQueueDepth; + } + + public boolean isTolerateOobAppends() { + return tolerateOobAppends; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index e943169a03e..f29cc4afe82 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final Statistics statistics; public AbfsOutputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long position, - final int bufferSize, - final boolean supportFlush, - final boolean disableOutputStreamFlush) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long position, + AbfsOutputStreamContext abfsOutputStreamContext) { this.client = client; this.statistics = statistics; this.path = path; this.position = position; this.closed = false; - this.supportFlush = supportFlush; - this.disableOutputStreamFlush = disableOutputStreamFlush; + this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamContext + .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = bufferSize; + this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java new file mode 100644 index 00000000000..0be97c5d9f1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra output stream configs. + */ +public class AbfsOutputStreamContext extends AbfsStreamContext { + + private int writeBufferSize; + + private boolean enableFlush; + + private boolean disableOutputStreamFlush; + + public AbfsOutputStreamContext() { + } + + public AbfsOutputStreamContext withWriteBufferSize( + final int writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + public AbfsOutputStreamContext enableFlush(final boolean enableFlush) { + this.enableFlush = enableFlush; + return this; + } + + public AbfsOutputStreamContext disableOutputStreamFlush( + final boolean disableOutputStreamFlush) { + this.disableOutputStreamFlush = disableOutputStreamFlush; + return this; + } + + public AbfsOutputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } + + public boolean isEnableFlush() { + return enableFlush; + } + + public boolean isDisableOutputStreamFlush() { + return disableOutputStreamFlush; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java new file mode 100644 index 00000000000..ee77f595fed --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Base stream configuration class which is going + * to store common configs among input and output streams. + */ +public abstract class AbfsStreamContext { +}