diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java index dcfff2fbe37..14ddb02fc4a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SyncableDataOutputStream.java @@ -22,9 +22,12 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; -import org.apache.hadoop.classification.InterfaceAudience; /** * Support the Syncable interface on top of a DataOutputStream. @@ -35,6 +38,8 @@ import org.apache.hadoop.classification.InterfaceAudience; public class SyncableDataOutputStream extends DataOutputStream implements Syncable, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger(SyncableDataOutputStream.class); + public SyncableDataOutputStream(OutputStream out) { super(out); } @@ -70,4 +75,34 @@ public class SyncableDataOutputStream extends DataOutputStream ((Syncable) out).hsync(); } } + + @Override + public void close() throws IOException { + IOException ioeFromFlush = null; + try { + flush(); + } catch (IOException e) { + ioeFromFlush = e; + throw e; + } finally { + try { + this.out.close(); + } catch (IOException e) { + // If there was an Exception during flush(), the Azure SDK will throw back the + // same when we call close on the same stream. When try and finally both throw + // Exception, Java will use Throwable#addSuppressed for one of the Exception so + // that the caller will get one exception back. When within this, if both + // Exceptions are equal, it will throw back IllegalStateException. This makes us + // to throw back a non IOE. The below special handling is to avoid this. + if (ioeFromFlush == e) { + // Do nothing.. + // The close() call gave back the same IOE which flush() gave. Just swallow it + LOG.debug("flush() and close() throwing back same Exception. Just swallowing the latter", e); + } else { + // Let Java handle 2 different Exceptions been thrown from try and finally. + throw e; + } + } + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestSyncableDataOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestSyncableDataOutputStream.java new file mode 100644 index 00000000000..c8c6d93f49d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestSyncableDataOutputStream.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azure; + +import java.io.IOException; +import java.io.OutputStream; + +import org.junit.Test; + +import org.apache.hadoop.test.LambdaTestUtils; + +public class TestSyncableDataOutputStream { + + @Test + public void testCloseWhenFlushThrowingIOException() throws Exception { + MockOutputStream out = new MockOutputStream(); + SyncableDataOutputStream sdos = new SyncableDataOutputStream(out); + out.flushThrowIOE = true; + LambdaTestUtils.intercept(IOException.class, "An IOE from flush", () -> sdos.close()); + MockOutputStream out2 = new MockOutputStream(); + out2.flushThrowIOE = true; + LambdaTestUtils.intercept(IOException.class, "An IOE from flush", () -> { + try (SyncableDataOutputStream sdos2 = new SyncableDataOutputStream(out2)) { + } + }); + } + + private static class MockOutputStream extends OutputStream { + + private boolean flushThrowIOE = false; + private IOException lastException = null; + + @Override + public void write(int arg0) throws IOException { + + } + + @Override + public void flush() throws IOException { + if (this.flushThrowIOE) { + this.lastException = new IOException("An IOE from flush"); + throw this.lastException; + } + } + + @Override + public void close() throws IOException { + if (this.lastException != null) { + throw this.lastException; + } + } + } +}