diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index bfe66de6fee..3c520b80e3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -61,6 +61,11 @@ public interface AsyncFSOutput extends Closeable {
    */
   int buffered();
 
+  /**
+   * Whether the stream is broken.
+   */
+  boolean isBroken();
+
   /**
    * Return current pipeline. Empty array if no pipeline.
    */
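Note: isBroken() makes stream failure an explicit part of the AsyncFSOutput contract, rather than something callers must infer from an empty getPipeline() result. A minimal caller-side sketch of the intended use (the class and method names here are illustrative, not part of the patch):

    import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

    final class OutputHealthCheck {
      // Once a stream reports itself broken, no further write can succeed,
      // so the owner should abandon it and open a replacement.
      static boolean shouldReplace(AsyncFSOutput out) {
        return out == null || out.isBroken();
      }
    }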
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 583759b66d8..43ddfb06cbd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -18,26 +18,15 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 
@@ -62,114 +51,23 @@ public final class AsyncFSOutputHelper {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
-    final FSDataOutputStream fsOut;
+    final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     if (createParent) {
-      fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {
-      fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
     }
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
     // to provide.
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
-        !(CommonFSUtils.hasCapability(fsOut, "hflush") &&
-            CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+        !(CommonFSUtils.hasCapability(out, "hflush") &&
+            CommonFSUtils.hasCapability(out, "hsync"))) {
+      out.close();
       throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
     }
-    final ExecutorService flushExecutor =
-        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
-    return new AsyncFSOutput() {
-
-      private final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-      @Override
-      public void write(byte[] b, int off, int len) {
-        out.write(b, off, len);
-      }
-
-      @Override
-      public void write(byte[] b) {
-        write(b, 0, b.length);
-      }
-
-      @Override
-      public void writeInt(int i) {
-        out.writeInt(i);
-      }
-
-      @Override
-      public void write(ByteBuffer bb) {
-        out.write(bb, bb.position(), bb.remaining());
-      }
-
-      @Override
-      public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-        fsOut.close();
-      }
-
-      @Override
-      public DatanodeInfo[] getPipeline() {
-        return new DatanodeInfo[0];
-      }
-
-      private void flush0(CompletableFuture<Long> future, boolean sync) {
-        try {
-          synchronized (out) {
-            fsOut.write(out.getBuffer(), 0, out.size());
-            out.reset();
-          }
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-          return;
-        }
-        try {
-          if (sync) {
-            fsOut.hsync();
-          } else {
-            fsOut.hflush();
-          }
-          long pos = fsOut.getPos();
-          eventLoopGroup.next().execute(() -> future.complete(pos));
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-        }
-      }
-
-      @Override
-      public CompletableFuture<Long> flush(boolean sync) {
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        flushExecutor.execute(() -> flush0(future, sync));
-        return future;
-      }
-
-      @Override
-      public void close() throws IOException {
-        try {
-          flushExecutor.submit(() -> {
-            synchronized (out) {
-              fsOut.write(out.getBuffer(), 0, out.size());
-              out.reset();
-            }
-            return null;
-          }).get();
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException();
-        } catch (ExecutionException e) {
-          Throwables.propagateIfPossible(e.getCause(), IOException.class);
-          throw new IOException(e.getCause());
-        } finally {
-          flushExecutor.shutdown();
-        }
-        fsOut.close();
-      }
-
-      @Override
-      public int buffered() {
-        return out.size();
-      }
-    };
+    return new WrapperAsyncFSOutput(f, out);
   }
 }
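Note: besides delegating the non-DFS case to the new WrapperAsyncFSOutput, createOutput now closes the freshly created stream before throwing StreamLacksCapabilityException; previously the exception left the caller with no handle to close, leaking the stream. The shape of that fix, as a self-contained sketch (names are illustrative):

    import java.io.IOException;
    import java.io.OutputStream;

    final class ValidateOrRelease {
      // If validation of a just-created resource fails, release it before
      // propagating the error; the caller never gets a reference to it.
      static OutputStream createChecked(OutputStream out, boolean capable) throws IOException {
        if (!capable) {
          out.close();
          throw new IOException("stream lacks required capability");
        }
        return out;
      }
    }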
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index b874aa73cd1..6d376f35f7f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -380,8 +380,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   @Override
   public DatanodeInfo[] getPipeline() {
-    State state = this.state;
-    return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
+    return locations;
   }
 
   private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
@@ -569,4 +568,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
+
+  @Override
+  public boolean isBroken() {
+    return state == State.BROKEN;
+  }
 }
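Note: getPipeline() no longer masks the datanode locations based on stream state; brokenness is now a separate, direct query against the state field. A reduced sketch of the state-flag pattern in play (assuming the State enum resembles the one in FanOutOneBlockAsyncDFSOutput; the volatile qualifier is an assumption about how the real field achieves cross-thread visibility):

    final class StateTracker {
      enum State { STREAMING, CLOSING, BROKEN, CLOSED }

      // A state transition performed on the event loop must be visible
      // to callers on other threads.
      private volatile State state = State.STREAMING;

      boolean isBroken() {
        return state == State.BROKEN;
      }

      void markBroken() {
        state = State.BROKEN;
      }
    }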
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
new file mode 100644
index 00000000000..219e865e672
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Private
+public class WrapperAsyncFSOutput implements AsyncFSOutput {
+
+  private final FSDataOutputStream out;
+
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final ExecutorService executor;
+
+  public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
+    this.out = out;
+    this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build());
+  }
+
+  @Override
+  public void write(byte[] b) {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) {
+    buffer.write(b, off, len);
+  }
+
+  @Override
+  public void writeInt(int i) {
+    buffer.writeInt(i);
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    buffer.write(bb, bb.position(), bb.remaining());
+  }
+
+  @Override
+  public int buffered() {
+    return buffer.size();
+  }
+
+  @Override
+  public DatanodeInfo[] getPipeline() {
+    return new DatanodeInfo[0];
+  }
+
+  private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) {
+    try {
+      if (buffer.size() > 0) {
+        out.write(buffer.getBuffer(), 0, buffer.size());
+        if (sync) {
+          out.hsync();
+        } else {
+          out.hflush();
+        }
+      }
+      future.complete(out.getPos());
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return;
+    }
+  }
+
+  @Override
+  public CompletableFuture<Long> flush(boolean sync) {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    ByteArrayOutputStream buffer = this.buffer;
+    this.buffer = new ByteArrayOutputStream();
+    executor.execute(() -> flush0(future, buffer, sync));
+    return future;
+  }
+
+  @Override
+  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public void close() throws IOException {
+    Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close");
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public boolean isBroken() {
+    return false;
+  }
+}
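Note: WrapperAsyncFSOutput turns each flush into a buffer swap: the caller detaches the current ByteArrayOutputStream, installs a fresh one, and queues the detached buffer to the single flusher thread, so subsequent writes never race with the bytes being flushed. This is only safe when one thread performs both write and flush, which matches how the WAL is expected to drive the output. A self-contained sketch of the pattern (simplified to count bytes instead of writing to HDFS):

    import java.io.ByteArrayOutputStream;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class SwapAndFlush {
      private final ExecutorService executor = Executors.newSingleThreadExecutor();
      private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

      void write(byte[] b) {
        buffer.write(b, 0, b.length);
      }

      CompletableFuture<Integer> flush() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        // Detach the current buffer on the caller's thread; new writes go
        // to a fresh buffer while the flusher drains the detached one.
        ByteArrayOutputStream detached = this.buffer;
        this.buffer = new ByteArrayOutputStream();
        executor.execute(() -> future.complete(detached.size()));
        return future;
      }
    }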
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 4ccfdf34a92..08b27651da7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -751,6 +751,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
     // typically there is no 'low replication' state, only a 'broken' state.
     AsyncFSOutput output = this.fsOut;
-    return output != null && output.getPipeline().length == 0;
+    return output != null && output.isBroken();
   }
 }
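Note: the old low-replication test, getPipeline().length == 0, also fired for streams whose pipeline was empty for benign reasons: the fan-out output used to report an empty pipeline outside the STREAMING and CLOSING states, and WrapperAsyncFSOutput always reports one. The new check is true only for a genuinely failed stream:

    import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

    final class LowReplicationCheck {
      // Mirrors the new AsyncFSWAL logic: only a BROKEN stream counts.
      static boolean doCheckLogLowReplication(AsyncFSOutput output) {
        return output != null && output.isBroken();
      }
    }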
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 48c1cbf683b..6c190374657 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -102,8 +103,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     for (;;) {
       try {
         FanOutOneBlockAsyncDFSOutput out =
-            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-              true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
+          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -112,8 +113,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     }
   }
 
-  static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
-      final FanOutOneBlockAsyncDFSOutput out)
+  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
     byte[] b = new byte[10];
@@ -130,10 +130,10 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
     }
     out.close();
-    assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
+    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
     rand.setSeed(12345);
-    try (FSDataInputStream in = dfs.open(f)) {
+    try (FSDataInputStream in = fs.open(f)) {
       for (int i = 0; i < 10; i++) {
         in.readFully(actual);
         rand.nextBytes(b);
@@ -149,7 +149,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   @Test
@@ -193,7 +193,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   /**
@@ -220,7 +220,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 0453f1c88ac..026c5c1a630 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -17,19 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
@@ -40,6 +30,11 @@ import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
 @Category({ MiscTests.class, SmallTests.class })
 public class TestLocalAsyncOutput {
 
@@ -62,16 +57,6 @@ public class TestLocalAsyncOutput {
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
       fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
-    byte[] b = new byte[10];
-    ThreadLocalRandom.current().nextBytes(b);
-    out.write(b);
-    assertEquals(b.length, out.flush(true).get().longValue());
-    out.close();
-    assertEquals(b.length, fs.getFileStatus(f).getLen());
-    byte[] actual = new byte[b.length];
-    try (FSDataInputStream in = fs.open(f)) {
-      in.readFully(actual);
-    }
-    assertArrayEquals(b, actual);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }
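Note: writeAndVerify is now keyed on the FileSystem and AsyncFSOutput interfaces instead of DFS-specific types, so the local-FS test above and the SASL test below can share it. A hypothetical test for any further implementation would follow the same shape (the path and setup here are illustrative, mirroring TestLocalAsyncOutput):

    @Test
    public void testAnotherImplementation() throws Exception {
      Path f = new Path("/some-test-file");
      FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
      AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
        fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
      TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
    }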
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 0e2b6d4df73..217ba608e97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -241,7 +241,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, file, out);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
   @Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
index a5f536f9e1a..6b5b837d593 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
@@ -17,16 +17,13 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-/**
- *
- */
 @Category({ MiscTests.class, SmallTests.class })
 public class TestSendBufSizePredictor {
 