From 6b6f8cc2bedefc98028d875398ce022edaf77933 Mon Sep 17 00:00:00 2001 From: Thomas Marquardt Date: Thu, 23 Aug 2018 20:43:52 +0000 Subject: [PATCH] HADOOP 15688. ABFS: InputStream wrapped in FSDataInputStream twice. Contributed by Sean Mackrory. --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 58df914ec8b..fc60127c51e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.MalformedURLException; import java.net.URI; @@ -50,8 +49,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -251,11 +248,12 @@ public class AzureBlobFileSystemStore { isNamespaceEnabled ? getOctalNotation(permission) : null, isNamespaceEnabled ? getOctalNotation(umask) : null); - final OutputStream outputStream; - outputStream = new FSDataOutputStream( - new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); - return outputStream; + return new AbfsOutputStream( + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + 0, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled()); } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) @@ -273,7 +271,7 @@ public class AzureBlobFileSystemStore { isNamespaceEnabled ? getOctalNotation(umask) : null); } - public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) + public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { LOG.debug("openFileForRead filesystem: {} path: {}", client.getFileSystem(), @@ -294,10 +292,9 @@ public class AzureBlobFileSystemStore { } // Add statistics for InputStream - return new FSDataInputStream( - new AbfsInputStream(client, statistics, - AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag)); + return new AbfsInputStream(client, statistics, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, + abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag); } public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws @@ -322,11 +319,12 @@ public class AzureBlobFileSystemStore { final long offset = overwrite ? 0 : contentLength; - final OutputStream outputStream; - outputStream = new FSDataOutputStream( - new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); - return outputStream; + return new AbfsOutputStream( + client, + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + offset, + abfsConfiguration.getWriteBufferSize(), + abfsConfiguration.isFlushEnabled()); } public void rename(final Path source, final Path destination) throws