HBASE-19513 Fix the wrapped AsyncFSOutput implementation

This commit is contained in:
zhangduo 2017-12-14 21:04:10 +08:00
parent 6ab8ce9829
commit 661491b56b
9 changed files with 163 additions and 147 deletions

View File

@ -61,6 +61,11 @@ public interface AsyncFSOutput extends Closeable {
*/
int buffered();
/**
* Whether the stream is broken.
*/
boolean isBroken();
/**
* Return current pipeline. Empty array if no pipeline.
*/

View File

@ -18,26 +18,15 @@
package org.apache.hadoop.hbase.io.asyncfs;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
@ -62,114 +51,23 @@ public final class AsyncFSOutputHelper {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
}
final FSDataOutputStream fsOut;
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
if (createParent) {
fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
} else {
fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
}
// After we create the stream but before we attempt to use it at all
// ensure that we can provide the level of data safety we're configured
// to provide.
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
CommonFSUtils.hasCapability(fsOut, "hsync"))) {
!(CommonFSUtils.hasCapability(out, "hflush") &&
CommonFSUtils.hasCapability(out, "hsync"))) {
out.close();
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
}
final ExecutorService flushExecutor =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
return new AsyncFSOutput() {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
@Override
public void write(byte[] b, int off, int len) {
out.write(b, off, len);
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void writeInt(int i) {
out.writeInt(i);
}
@Override
public void write(ByteBuffer bb) {
out.write(bb, bb.position(), bb.remaining());
}
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
fsOut.close();
}
@Override
public DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}
private void flush0(CompletableFuture<Long> future, boolean sync) {
try {
synchronized (out) {
fsOut.write(out.getBuffer(), 0, out.size());
out.reset();
}
} catch (IOException e) {
eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
return;
}
try {
if (sync) {
fsOut.hsync();
} else {
fsOut.hflush();
}
long pos = fsOut.getPos();
eventLoopGroup.next().execute(() -> future.complete(pos));
} catch (IOException e) {
eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
}
}
@Override
public CompletableFuture<Long> flush(boolean sync) {
CompletableFuture<Long> future = new CompletableFuture<>();
flushExecutor.execute(() -> flush0(future, sync));
return future;
}
@Override
public void close() throws IOException {
try {
flushExecutor.submit(() -> {
synchronized (out) {
fsOut.write(out.getBuffer(), 0, out.size());
out.reset();
}
return null;
}).get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new IOException(e.getCause());
} finally {
flushExecutor.shutdown();
}
fsOut.close();
}
@Override
public int buffered() {
return out.size();
}
};
return new WrapperAsyncFSOutput(f, out);
}
}

View File

@ -380,8 +380,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
@Override
public DatanodeInfo[] getPipeline() {
State state = this.state;
return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
return locations;
}
private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
@ -569,4 +568,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
}
@Override
public boolean isBroken() {
return state == State.BROKEN;
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}.
*/
@InterfaceAudience.Private
public class WrapperAsyncFSOutput implements AsyncFSOutput {
private final FSDataOutputStream out;
private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final ExecutorService executor;
public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
this.out = out;
this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build());
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) {
buffer.write(b, off, len);
}
@Override
public void writeInt(int i) {
buffer.writeInt(i);
}
@Override
public void write(ByteBuffer bb) {
buffer.write(bb, bb.position(), bb.remaining());
}
@Override
public int buffered() {
return buffer.size();
}
@Override
public DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}
private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) {
try {
if (buffer.size() > 0) {
out.write(buffer.getBuffer(), 0, buffer.size());
if (sync) {
out.hsync();
} else {
out.hflush();
}
}
future.complete(out.getPos());
} catch (IOException e) {
future.completeExceptionally(e);
return;
}
}
@Override
public CompletableFuture<Long> flush(boolean sync) {
CompletableFuture<Long> future = new CompletableFuture<>();
ByteArrayOutputStream buffer = this.buffer;
this.buffer = new ByteArrayOutputStream();
executor.execute(() -> flush0(future, buffer, sync));
return future;
}
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
executor.shutdown();
out.close();
}
@Override
public void close() throws IOException {
Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close");
executor.shutdown();
out.close();
}
@Override
public boolean isBroken() {
return false;
}
}

View File

@ -751,6 +751,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
// typically there is no 'low replication' state, only a 'broken' state.
AsyncFSOutput output = this.fsOut;
return output != null && output.getPipeline().length == 0;
return output != null && output.isBroken();
}
}

View File

@ -39,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -112,8 +113,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
}
}
static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
final FanOutOneBlockAsyncDFSOutput out)
static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
throws IOException, InterruptedException, ExecutionException {
List<CompletableFuture<Long>> futures = new ArrayList<>();
byte[] b = new byte[10];
@ -130,10 +130,10 @@ public class TestFanOutOneBlockAsyncDFSOutput {
assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
}
out.close();
assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
rand.setSeed(12345);
try (FSDataInputStream in = dfs.open(f)) {
try (FSDataInputStream in = fs.open(f)) {
for (int i = 0; i < 10; i++) {
in.readFully(actual);
rand.nextBytes(b);
@ -149,7 +149,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
writeAndVerify(eventLoop, FS, f, out);
writeAndVerify(FS, f, out);
}
@Test
@ -193,7 +193,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive.
writeAndVerify(eventLoop, FS, f, out);
writeAndVerify(FS, f, out);
}
/**

View File

@ -17,19 +17,9 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
@ -40,6 +30,11 @@ import org.junit.AfterClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@Category({ MiscTests.class, SmallTests.class })
public class TestLocalAsyncOutput {
@ -62,16 +57,6 @@ public class TestLocalAsyncOutput {
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
out.write(b);
assertEquals(b.length, out.flush(true).get().longValue());
out.close();
assertEquals(b.length, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = fs.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
}
}

View File

@ -241,7 +241,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, file, out);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
}
@Test

View File

@ -17,16 +17,13 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
*
*/
@Category({ MiscTests.class, SmallTests.class })
public class TestSendBufSizePredictor {