diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java index 2f2f5469bbc..87e074fa073 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpQueryParams.java @@ -35,6 +35,7 @@ public final class HttpQueryParams { public static final String QUERY_PARAM_POSITION = "position"; public static final String QUERY_PARAM_TIMEOUT = "timeout"; public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; + public static final String QUERY_PARAM_CLOSE = "close"; public static final String QUERY_PARAM_UPN = "upn"; private HttpQueryParams() {} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 863ff548765..0b9ad7a6f79 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -290,7 +290,7 @@ public class AbfsClient { return op; } - public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) + public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use @@ -302,6 +302,7 @@ public class AbfsClient { abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 85db7740c55..56fe0b11f56 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -200,7 +200,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa @Override public void hsync() throws IOException { if (supportFlush) { - flushInternal(); + flushInternal(false); } } @@ -211,7 +211,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa @Override public void hflush() throws IOException { if (supportFlush) { - flushInternal(); + flushInternal(false); } } @@ -230,7 +230,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa } try { - flushInternal(); + flushInternal(true); threadExecutor.shutdown(); } finally { lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); @@ -244,10 +244,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa } } - private synchronized void flushInternal() throws IOException { + private synchronized void flushInternal(boolean isClose) throws IOException { maybeThrowLastError(); writeCurrentBufferToService(); - flushWrittenBytesToService(); + flushWrittenBytesToService(isClose); } private synchronized void flushInternalAsync() throws IOException { @@ -288,7 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa shrinkWriteOperationQueue(); } - private synchronized void flushWrittenBytesToService() throws IOException { + private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException { for (WriteOperation writeOperation : writeOperations) { try { writeOperation.task.get(); @@ -306,21 +306,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa throw lastError; } } - flushWrittenBytesToServiceInternal(position, false); + flushWrittenBytesToServiceInternal(position, false, isClose); } private synchronized void flushWrittenBytesToServiceAsync() throws IOException { shrinkWriteOperationQueue(); if (this.lastTotalAppendOffset > this.lastFlushOffset) { - this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true, + false/*Async flush on close not permitted*/); } } private synchronized void flushWrittenBytesToServiceInternal(final long offset, - final boolean retainUncommitedData) throws IOException { + final boolean retainUncommitedData, final boolean isClose) throws IOException { try { - client.flush(path, offset, retainUncommitedData); + client.flush(path, offset, retainUncommitedData, isClose); } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {