HADOOP-16182. Update abfs storage back-end with "close" flag when application is done writing to a file.

Contributed by Vishwajeet Dusane.
This commit is contained in:
Vishwajeet Dusane 2019-03-18 13:18:08 +00:00 committed by Yuan Gao
parent 024a694684
commit 662a75d65b
3 changed files with 14 additions and 11 deletions

View File

@ -35,6 +35,7 @@ public final class HttpQueryParams {
public static final String QUERY_PARAM_POSITION = "position"; public static final String QUERY_PARAM_POSITION = "position";
public static final String QUERY_PARAM_TIMEOUT = "timeout"; public static final String QUERY_PARAM_TIMEOUT = "timeout";
public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData"; public static final String QUERY_PARAM_RETAIN_UNCOMMITTED_DATA = "retainUncommittedData";
public static final String QUERY_PARAM_CLOSE = "close";
public static final String QUERY_PARAM_UPN = "upn"; public static final String QUERY_PARAM_UPN = "upn";
private HttpQueryParams() {} private HttpQueryParams() {}

View File

@ -290,7 +290,7 @@ public class AbfsClient {
return op; return op;
} }
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData, boolean isClose)
throws AzureBlobFileSystemException { throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use // JDK7 does not support PATCH, so to workaround the issue we will use
@ -302,6 +302,7 @@ public class AbfsClient {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation( final AbfsRestOperation op = new AbfsRestOperation(

View File

@ -200,7 +200,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override @Override
public void hsync() throws IOException { public void hsync() throws IOException {
if (supportFlush) { if (supportFlush) {
flushInternal(); flushInternal(false);
} }
} }
@ -211,7 +211,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
@Override @Override
public void hflush() throws IOException { public void hflush() throws IOException {
if (supportFlush) { if (supportFlush) {
flushInternal(); flushInternal(false);
} }
} }
@ -230,7 +230,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
} }
try { try {
flushInternal(); flushInternal(true);
threadExecutor.shutdown(); threadExecutor.shutdown();
} finally { } finally {
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
@ -244,10 +244,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
} }
} }
private synchronized void flushInternal() throws IOException { private synchronized void flushInternal(boolean isClose) throws IOException {
maybeThrowLastError(); maybeThrowLastError();
writeCurrentBufferToService(); writeCurrentBufferToService();
flushWrittenBytesToService(); flushWrittenBytesToService(isClose);
} }
private synchronized void flushInternalAsync() throws IOException { private synchronized void flushInternalAsync() throws IOException {
@ -288,7 +288,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
shrinkWriteOperationQueue(); shrinkWriteOperationQueue();
} }
private synchronized void flushWrittenBytesToService() throws IOException { private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
for (WriteOperation writeOperation : writeOperations) { for (WriteOperation writeOperation : writeOperations) {
try { try {
writeOperation.task.get(); writeOperation.task.get();
@ -306,21 +306,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
throw lastError; throw lastError;
} }
} }
flushWrittenBytesToServiceInternal(position, false); flushWrittenBytesToServiceInternal(position, false, isClose);
} }
private synchronized void flushWrittenBytesToServiceAsync() throws IOException { private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
shrinkWriteOperationQueue(); shrinkWriteOperationQueue();
if (this.lastTotalAppendOffset > this.lastFlushOffset) { if (this.lastTotalAppendOffset > this.lastFlushOffset) {
this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
false/*Async flush on close not permitted*/);
} }
} }
private synchronized void flushWrittenBytesToServiceInternal(final long offset, private synchronized void flushWrittenBytesToServiceInternal(final long offset,
final boolean retainUncommitedData) throws IOException { final boolean retainUncommitedData, final boolean isClose) throws IOException {
try { try {
client.flush(path, offset, retainUncommitedData); client.flush(path, offset, retainUncommitedData, isClose);
} catch (AzureBlobFileSystemException ex) { } catch (AzureBlobFileSystemException ex) {
if (ex instanceof AbfsRestOperationException) { if (ex instanceof AbfsRestOperationException) {
if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {