diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java new file mode 100644 index 00000000000..807d82ad75d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.channels.CompletionHandler; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * Interface for asynchronous filesystem output stream. + */ +@InterfaceAudience.Private +public interface AsyncFSOutput extends Closeable { + + /** + * Just call write(b, 0, b.length). + * @see #write(byte[], int, int) + */ + void write(byte[] b); + + /** + * Copy the data into the buffer. Note that you need to call + * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. + */ + void write(byte[] b, int off, int len); + + /** + * Return the current size of buffered data. + */ + int buffered(); + + /** + * Return current pipeline. Empty array if no pipeline. + */ + DatanodeInfo[] getPipeline(); + + /** + * Flush the buffer out. + * @param attachment will be passed to handler when completed. + * @param handler will set the acked length as result when completed. + * @param sync persistent the data to device + */ + void flush(A attachment, final CompletionHandler handler, boolean sync); + + /** + * The close method when error occurred. + */ + void recoverAndClose(CancelableProgressable reporter) throws IOException; + + /** + * Close the file. You should call {@link #recoverAndClose(CancelableProgressable)} if this method + * throws an exception. + */ + @Override + void close() throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java new file mode 100644 index 00000000000..576bb299b42 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.channel.EventLoop; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.channels.CompletionHandler; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * Helper class for creating AsyncFSOutput. + */ +@InterfaceAudience.Private +public final class AsyncFSOutputHelper { + + private AsyncFSOutputHelper() { + } + + /** + * Create {@link FanOutOneBlockAsyncDFSOutput} for {@link DistributedFileSystem}, and a simple + * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}. + */ + public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, + boolean createParent, short replication, long blockSize, final EventLoop eventLoop) + throws IOException { + if (fs instanceof DistributedFileSystem) { + return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f, + overwrite, createParent, replication, blockSize, eventLoop); + } + final FSDataOutputStream fsOut; + int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + if (createParent) { + fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null); + } else { + fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null); + } + final ExecutorService flushExecutor = + Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build()); + return new AsyncFSOutput() { + + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + @Override + public void write(final byte[] b, final int off, final int len) { + if (eventLoop.inEventLoop()) { + out.write(b, off, len); + } else { + eventLoop.submit(new Runnable() { + public void run() { + out.write(b, off, len); + } + }).syncUninterruptibly(); + } + } + + @Override + public void write(byte[] b) { + write(b, 0, b.length); + } + + @Override + public void recoverAndClose(CancelableProgressable reporter) throws IOException { + fsOut.close(); + } + + @Override + public DatanodeInfo[] getPipeline() { + return new DatanodeInfo[0]; + } + + @Override + public void flush(final A attachment, final CompletionHandler handler, + final boolean sync) { + flushExecutor.execute(new Runnable() { + + @Override + public void run() { + try { + synchronized (out) { + out.writeTo(fsOut); + out.reset(); + } + } catch (final IOException e) { + eventLoop.execute(new Runnable() { + + @Override + public void run() { + handler.failed(e, attachment); + } + }); + return; + } + try { + if (sync) { + fsOut.hsync(); + } else { + fsOut.hflush(); + } + final long pos = fsOut.getPos(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + handler.completed(pos, attachment); + } + }); + } catch (final IOException e) { + eventLoop.execute(new Runnable() { + + @Override + public void run() { + handler.failed(e, attachment); + } + }); + } + } + }); + } + + @Override + public void close() throws IOException { + try { + flushExecutor.submit(new Callable() { + + @Override + public Void call() throws Exception { + synchronized (out) { + out.writeTo(fsOut); + out.reset(); + } + return null; + } + }).get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new IOException(e.getCause()); + } finally { + flushExecutor.shutdown(); + } + fsOut.close(); + } + + @Override + public int buffered() { + return out.size(); + } + }; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java similarity index 81% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index bdbf8656f34..7d6a676d183 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -15,18 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static io.netty.handler.timeout.IdleState.WRITER_IDLE; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; + +import com.google.common.base.Supplier; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; import io.netty.channel.SimpleChannelInboundHandler; @@ -38,7 +42,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import java.io.Closeable; import java.io.IOException; import java.nio.channels.CompletionHandler; import java.util.ArrayDeque; @@ -51,12 +54,12 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import com.google.common.base.Supplier; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -96,7 +99,7 @@ import org.apache.hadoop.util.DataChecksum; * */ @InterfaceAudience.Private -public class FanOutOneBlockAsyncDFSOutput implements Closeable { +public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private final Configuration conf; @@ -126,11 +129,11 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { private static final class Callback { - public final Promise promise; + private final Promise promise; - public final long ackedLength; + private final long ackedLength; - public final Set unfinishedReplicas; + private final Set unfinishedReplicas; public Callback(Promise promise, long ackedLength, Collection replicas) { this.promise = promise; @@ -211,95 +214,98 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { } } - private void setupReceiver(final int timeoutMs) { - SimpleChannelInboundHandler ackHandler = new SimpleChannelInboundHandler() { + @Sharable + private final class AckHandler extends SimpleChannelInboundHandler { - @Override - public boolean isSharable() { - return true; - } + private final int timeoutMs; - @Override - protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack) - throws Exception { - final Status reply = getStatus(ack); - if (reply != Status.SUCCESS) { - failed(ctx.channel(), new Supplier() { + public AckHandler(int timeoutMs) { + this.timeoutMs = timeoutMs; + } - @Override - public Throwable get() { - return new IOException("Bad response " + reply + " for block " - + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); - } - }); - return; - } - if (PipelineAck.isRestartOOBStatus(reply)) { - failed(ctx.channel(), new Supplier() { - - @Override - public Throwable get() { - return new IOException("Restart response " + reply + " for block " - + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); - } - }); - return; - } - if (ack.getSeqno() == HEART_BEAT_SEQNO) { - return; - } - completed(ctx.channel()); - } - - @Override - public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + @Override + protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack) + throws Exception { + final Status reply = getStatus(ack); + if (reply != Status.SUCCESS) { failed(ctx.channel(), new Supplier() { @Override public Throwable get() { - return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"); + return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock() + + " from datanode " + ctx.channel().remoteAddress()); } }); + return; } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) - throws Exception { + if (PipelineAck.isRestartOOBStatus(reply)) { failed(ctx.channel(), new Supplier() { @Override public Throwable get() { - return cause; + return new IOException("Restart response " + reply + " for block " + + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); } }); + return; } + if (ack.getSeqno() == HEART_BEAT_SEQNO) { + return; + } + completed(ctx.channel()); + } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) evt; - if (e.state() == READER_IDLE) { - failed(ctx.channel(), new Supplier() { + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + failed(ctx.channel(), new Supplier() { - @Override - public Throwable get() { - return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); - } - }); - } else if (e.state() == WRITER_IDLE) { - PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); - int len = heartbeat.getSerializedSize(); - ByteBuf buf = alloc.buffer(len); - heartbeat.putInBuffer(buf.nioBuffer(0, len)); - buf.writerIndex(len); - ctx.channel().writeAndFlush(buf); - } - return; + @Override + public Throwable get() { + return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"); } - super.userEventTriggered(ctx, evt); - } + }); + } - }; + @Override + public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) + throws Exception { + failed(ctx.channel(), new Supplier() { + + @Override + public Throwable get() { + return cause; + } + }); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == READER_IDLE) { + failed(ctx.channel(), new Supplier() { + + @Override + public Throwable get() { + return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); + } + }); + } else if (e.state() == WRITER_IDLE) { + PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); + int len = heartbeat.getSerializedSize(); + ByteBuf buf = alloc.buffer(len); + heartbeat.putInBuffer(buf.nioBuffer(0, len)); + buf.writerIndex(len); + ctx.channel().writeAndFlush(buf); + } + return; + } + super.userEventTriggered(ctx, evt); + } + } + + private void setupReceiver(int timeoutMs) { + AckHandler ackHandler = new AckHandler(timeoutMs); for (Channel ch : datanodeList) { ch.pipeline().addLast( new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), @@ -331,18 +337,12 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT)); } - /** - * Just call write(b, 0, b.length). - * @see #write(byte[], int, int) - */ + @Override public void write(byte[] b) { write(b, 0, b.length); } - /** - * Copy the data into the buffer. Note that you need to call - * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. - */ + @Override public void write(final byte[] b, final int off, final int len) { if (eventLoop.inEventLoop()) { buf.ensureWritable(len).writeBytes(b, off, len); @@ -357,9 +357,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { } } - /** - * Return the current size of buffered data. - */ + @Override public int buffered() { if (eventLoop.inEventLoop()) { return buf.readableBytes(); @@ -374,6 +372,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { } } + @Override public DatanodeInfo[] getPipeline() { return locatedBlock.getLocations(); } @@ -491,6 +490,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { /** * The close method when error occurred. Now we just call recoverFileLease. */ + @Override public void recoverAndClose(CancelableProgressable reporter) throws IOException { assert !eventLoop.inEventLoop(); for (Channel ch : datanodeList) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java similarity index 99% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 2225191e5f1..4f9058cc559 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -15,17 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.CodedOutputStream; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; @@ -57,10 +62,6 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.TimeUnit; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.CodedOutputStream; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -72,6 +73,8 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DistributedFileSystem; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java similarity index 99% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index 341d4ec69f3..870262eb2d9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -15,9 +15,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import static io.netty.handler.timeout.IdleState.READER_IDLE; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; + import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.CompositeByteBuf; @@ -63,13 +71,6 @@ import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; - import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index b80f2c9f33c..d5bccf0445c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -37,8 +40,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,8 +49,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; @@ -209,7 +210,7 @@ public class AsyncFSWAL extends AbstractFSWAL { .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Close-WAL-Writer-%d").build()); - private volatile FanOutOneBlockAsyncDFSOutput hdfsOut; + private volatile AsyncFSOutput fsOut; private final Deque waitingAppendEntries = new ArrayDeque(); @@ -663,7 +664,7 @@ public class AsyncFSWAL extends AbstractFSWAL { final AsyncWriter oldWriter = this.writer; this.writer = nextWriter; if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { - this.hdfsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); + this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } this.fileLengthAtLastSync = 0L; boolean scheduleTask; @@ -721,7 +722,7 @@ public class AsyncFSWAL extends AbstractFSWAL { @Override DatanodeInfo[] getPipeline() { - FanOutOneBlockAsyncDFSOutput output = this.hdfsOut; + AsyncFSOutput output = this.fsOut; return output != null ? output.getPipeline() : new DatanodeInfo[0]; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 894f3dd656e..886b1725eb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; + +import io.netty.channel.EventLoop; + import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; @@ -29,18 +34,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hdfs.DistributedFileSystem; - -import com.google.common.base.Throwables; -import com.google.common.primitives.Ints; - -import io.netty.channel.EventLoop; /** * AsyncWriter for protobuf-based WAL. @@ -97,7 +96,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements private final EventLoop eventLoop; - private FanOutOneBlockAsyncDFSOutput output; + private AsyncFSOutput output; private ByteArrayOutputStream buf; @@ -149,16 +148,15 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements this.output = null; } - public FanOutOneBlockAsyncDFSOutput getOutput() { + public AsyncFSOutput getOutput() { return this.output; } @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException { - this.output = - FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, path, - overwritable, false, replication, blockSize, eventLoop); + this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, + blockSize, eventLoop); this.buf = new ByteArrayOutputStream(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java similarity index 97% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java index cbd0761ce4e..58b53017dd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java similarity index 99% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index a10712e7b98..2be3b281744 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.junit.Assert.assertArrayEquals; @@ -23,6 +23,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Field; @@ -49,10 +53,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; - @Category({ MiscTests.class, MediumTests.class }) public class TestFanOutOneBlockAsyncDFSOutput { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java new file mode 100644 index 00000000000..04cb0efba93 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestLocalAsyncOutput { + + private static EventLoopGroup GROUP = new NioEventLoopGroup(); + + private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility(); + + @AfterClass + public static void tearDownAfterClass() throws IOException { + TEST_UTIL.cleanupTestDir(); + GROUP.shutdownGracefully(); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); + FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); + AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next()); + byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + out.write(b); + out.flush(null, handler, true); + assertEquals(b.length, handler.get()); + out.close(); + assertEquals(b.length, fs.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = fs.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java similarity index 99% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java index 2f5e2ff1e22..7501849f608 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -15,12 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.util; +package org.apache.hadoop.hbase.io.asyncfs; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -58,10 +62,6 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import io.netty.channel.EventLoop; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; - @RunWith(Parameterized.class) @Category({ MiscTests.class, MediumTests.class }) public class TestSaslFanOutOneBlockAsyncDFSOutput { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java index 7d6c6d98877..b64d4584675 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -24,9 +24,9 @@ import java.util.concurrent.ExecutionException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputFlushHandler; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.junit.AfterClass;