HBASE-15628 Implement an AsyncOutputStream which can work with any FileSystem implementation

zhangduo 2016-04-25 21:55:48 +08:00
parent 3cfe363f13
commit 1eac103ead
12 changed files with 478 additions and 139 deletions


@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
/**
* Interface for asynchronous filesystem output stream.
*/
@InterfaceAudience.Private
public interface AsyncFSOutput extends Closeable {
/**
* Just call write(b, 0, b.length).
* @see #write(byte[], int, int)
*/
void write(byte[] b);
/**
* Copy the data into the buffer. Note that you need to call
* {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
*/
void write(byte[] b, int off, int len);
/**
* Return the current size of buffered data.
*/
int buffered();
/**
* Return the current pipeline. Returns an empty array if there is no pipeline.
*/
DatanodeInfo[] getPipeline();
/**
* Flush the buffer out.
* @param attachment will be passed to handler when completed.
* @param handler the completion handler, invoked with the acked length as the result when the flush completes.
* @param sync whether to persist the data to the storage device (hsync rather than hflush)
*/
<A> void flush(A attachment, final CompletionHandler<Long, ? super A> handler, boolean sync);
/**
* The close method to use when an error has occurred.
*/
void recoverAndClose(CancelableProgressable reporter) throws IOException;
/**
* Close the file. You should call {@link #recoverAndClose(CancelableProgressable)} if this method
* throws an exception.
*/
@Override
void close() throws IOException;
}
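
The contract above is that write only copies data into an in-memory buffer; nothing reaches the filesystem until flush is called, and the CompletionHandler is then invoked on the event loop with the length that has been acknowledged so far. Below is a minimal usage sketch of that flush cycle, not part of this commit: the latch-based waiting is purely illustrative (real callers such as AsyncFSWAL react to the callback on the event loop instead of blocking).

// Illustrative only: drive an AsyncFSOutput synchronously by blocking on the flush callback.
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

public class AsyncFSOutputUsage {
  public static long writeAndFlush(AsyncFSOutput out, byte[] data)
      throws IOException, InterruptedException {
    final long[] ackedLength = new long[1];
    final Throwable[] error = new Throwable[1];
    final CountDownLatch latch = new CountDownLatch(1);
    out.write(data); // only buffers the bytes, nothing is written yet
    out.flush(null, new CompletionHandler<Long, Void>() {
      @Override
      public void completed(Long result, Void attachment) {
        ackedLength[0] = result; // length acknowledged so far
        latch.countDown();
      }

      @Override
      public void failed(Throwable exc, Void attachment) {
        error[0] = exc;
        latch.countDown();
      }
    }, true); // sync = true also persists the data to the storage device
    latch.await();
    if (error[0] != null) {
      throw new IOException("flush failed", error[0]);
    }
    return ackedLength[0];
  }
}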


@@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoop;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
/**
* Helper class for creating AsyncFSOutput.
*/
@InterfaceAudience.Private
public final class AsyncFSOutputHelper {
private AsyncFSOutputHelper() {
}
/**
* Create a {@link FanOutOneBlockAsyncDFSOutput} when the given {@link FileSystem} is a
* {@link DistributedFileSystem}, or a simple implementation that wraps an
* {@link FSDataOutputStream} for any other {@link FileSystem}.
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, final EventLoop eventLoop)
throws IOException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoop);
}
final FSDataOutputStream fsOut;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
if (createParent) {
fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
} else {
fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
}
final ExecutorService flushExecutor =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
return new AsyncFSOutput() {
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
@Override
public void write(final byte[] b, final int off, final int len) {
if (eventLoop.inEventLoop()) {
out.write(b, off, len);
} else {
eventLoop.submit(new Runnable() {
public void run() {
out.write(b, off, len);
}
}).syncUninterruptibly();
}
}
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
fsOut.close();
}
@Override
public DatanodeInfo[] getPipeline() {
return new DatanodeInfo[0];
}
@Override
public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
final boolean sync) {
flushExecutor.execute(new Runnable() {
@Override
public void run() {
try {
synchronized (out) {
out.writeTo(fsOut);
out.reset();
}
} catch (final IOException e) {
eventLoop.execute(new Runnable() {
@Override
public void run() {
handler.failed(e, attachment);
}
});
return;
}
try {
if (sync) {
fsOut.hsync();
} else {
fsOut.hflush();
}
final long pos = fsOut.getPos();
eventLoop.execute(new Runnable() {
@Override
public void run() {
handler.completed(pos, attachment);
}
});
} catch (final IOException e) {
eventLoop.execute(new Runnable() {
@Override
public void run() {
handler.failed(e, attachment);
}
});
}
}
});
}
@Override
public void close() throws IOException {
try {
flushExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
synchronized (out) {
out.writeTo(fsOut);
out.reset();
}
return null;
}
}).get();
} catch (InterruptedException e) {
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw new IOException(e.getCause());
} finally {
flushExecutor.shutdown();
}
fsOut.close();
}
@Override
public int buffered() {
return out.size();
}
};
}
}
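
For any FileSystem other than HDFS, the wrapper above buffers writes in a ByteArrayOutputStream, performs the actual write and hflush/hsync on a dedicated flusher thread, and re-dispatches the completion callback onto the supplied EventLoop; close() writes out whatever is still buffered before closing the underlying stream. A condensed usage sketch of that fallback path against a local filesystem follows (illustrative only; the file path is made up, and the TestLocalAsyncOutput test added later in this commit exercises the same API).

// Sketch: the non-HDFS fallback path of AsyncFSOutputHelper (illustrative, not part of this commit).
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;

public class LocalAsyncOutputExample {
  public static void main(String[] args) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup(1);
    try {
      FileSystem fs = FileSystem.getLocal(new Configuration());
      Path f = new Path(System.getProperty("java.io.tmpdir"), "async-fs-output-example");
      // Not a DistributedFileSystem, so this returns the FSDataOutputStream-backed wrapper.
      AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, true, true,
          fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), group.next());
      byte[] data = "hello".getBytes("UTF-8");
      out.write(data); // buffered in memory only
      out.close();     // close() writes out any remaining buffered data
      System.out.println("file length: " + fs.getFileStatus(f).getLen());
    } finally {
      group.shutdownGracefully();
    }
  }
}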


@@ -15,18 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import com.google.common.base.Supplier;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -38,7 +42,6 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.ArrayDeque;
@@ -51,12 +54,12 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -96,7 +99,7 @@ import org.apache.hadoop.util.DataChecksum;
* </ol>
*/
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements Closeable {
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final Configuration conf;
@@ -126,11 +129,11 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
private static final class Callback {
public final Promise<Void> promise;
private final Promise<Void> promise;
public final long ackedLength;
private final long ackedLength;
public final Set<Channel> unfinishedReplicas;
private final Set<Channel> unfinishedReplicas;
public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
this.promise = promise;
@@ -211,95 +214,98 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
}
}
private void setupReceiver(final int timeoutMs) {
SimpleChannelInboundHandler<PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<PipelineAckProto>() {
@Sharable
private final class AckHandler extends SimpleChannelInboundHandler<PipelineAckProto> {
@Override
public boolean isSharable() {
return true;
}
private final int timeoutMs;
@Override
protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
throws Exception {
final Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
failed(ctx.channel(), new Supplier<Throwable>() {
public AckHandler(int timeoutMs) {
this.timeoutMs = timeoutMs;
}
@Override
public Throwable get() {
return new IOException("Bad response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
}
});
return;
}
if (PipelineAck.isRestartOOBStatus(reply)) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Restart response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
}
});
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
return;
}
completed(ctx.channel());
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack)
throws Exception {
final Status reply = getStatus(ack);
if (reply != Status.SUCCESS) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
return new IOException("Bad response " + reply + " for block " + locatedBlock.getBlock()
+ " from datanode " + ctx.channel().remoteAddress());
}
});
return;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
throws Exception {
if (PipelineAck.isRestartOOBStatus(reply)) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return cause;
return new IOException("Restart response " + reply + " for block "
+ locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress());
}
});
return;
}
if (ack.getSeqno() == HEART_BEAT_SEQNO) {
return;
}
completed(ctx.channel());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == READER_IDLE) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
}
});
} else if (e.state() == WRITER_IDLE) {
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
ByteBuf buf = alloc.buffer(len);
heartbeat.putInBuffer(buf.nioBuffer(0, len));
buf.writerIndex(len);
ctx.channel().writeAndFlush(buf);
}
return;
@Override
public Throwable get() {
return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed");
}
super.userEventTriggered(ctx, evt);
}
});
}
};
@Override
public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
throws Exception {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return cause;
}
});
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == READER_IDLE) {
failed(ctx.channel(), new Supplier<Throwable>() {
@Override
public Throwable get() {
return new IOException("Timeout(" + timeoutMs + "ms) waiting for response");
}
});
} else if (e.state() == WRITER_IDLE) {
PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false);
int len = heartbeat.getSerializedSize();
ByteBuf buf = alloc.buffer(len);
heartbeat.putInBuffer(buf.nioBuffer(0, len));
buf.writerIndex(len);
ctx.channel().writeAndFlush(buf);
}
return;
}
super.userEventTriggered(ctx, evt);
}
}
private void setupReceiver(int timeoutMs) {
AckHandler ackHandler = new AckHandler(timeoutMs);
for (Channel ch : datanodeList) {
ch.pipeline().addLast(
new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
@@ -331,18 +337,12 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable {
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT));
}
/**
* Just call write(b, 0, b.length).
* @see #write(byte[], int, int)
*/
@Override
public void write(byte[] b) {
write(b, 0, b.length);
}
/**
* Copy the data into the buffer. Note that you need to call
* {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually.
*/
@Override
public void write(final byte[] b, final int off, final int len) {
if (eventLoop.inEventLoop()) {
buf.ensureWritable(len).writeBytes(b, off, len);
@@ -357,9 +357,7 @@
}
}
/**
* Return the current size of buffered data.
*/
@Override
public int buffered() {
if (eventLoop.inEventLoop()) {
return buf.readableBytes();
@@ -374,6 +372,7 @@
}
}
@Override
public DatanodeInfo[] getPipeline() {
return locatedBlock.getLocations();
}
@@ -491,6 +490,7 @@
/**
* The close method to use when an error has occurred. Currently we just call recoverFileLease.
*/
@Override
public void recoverAndClose(CancelableProgressable reporter) throws IOException {
assert !eventLoop.inEventLoop();
for (Channel ch : datanodeList) {
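
The ack handler in the refactoring above is now a named class annotated with Netty's @Sharable instead of an anonymous handler overriding isSharable(), so the single instance created in setupReceiver can be installed on every datanode channel's pipeline. A small, self-contained illustration of that Netty pattern follows; it is not taken from this commit, and the CountingHandler name and the channels list are made up.

// Illustration of Netty's @Sharable handler pattern: one handler instance, many pipelines.
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Sharable
class CountingHandler extends ChannelInboundHandlerAdapter {
  private final AtomicInteger reads = new AtomicInteger();

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Shared state must be thread-safe: every pipeline uses this same instance.
    reads.incrementAndGet();
    ctx.fireChannelRead(msg);
  }

  static void install(List<Channel> channels) {
    CountingHandler handler = new CountingHandler(); // created once
    for (Channel ch : channels) {
      ch.pipeline().addLast(handler); // reused across channels, allowed because of @Sharable
    }
  }
}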


@@ -15,17 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -57,10 +62,6 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.CodedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -72,6 +73,8 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;


@@ -15,9 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import static io.netty.handler.timeout.IdleState.READER_IDLE;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.CompositeByteBuf;
@@ -63,13 +71,6 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


@@ -18,7 +18,10 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT;
import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
@@ -37,8 +40,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,8 +49,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
@@ -209,7 +210,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Close-WAL-Writer-%d").build());
private volatile FanOutOneBlockAsyncDFSOutput hdfsOut;
private volatile AsyncFSOutput fsOut;
private final Deque<FSWALEntry> waitingAppendEntries = new ArrayDeque<FSWALEntry>();
@@ -663,7 +664,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
final AsyncWriter oldWriter = this.writer;
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
this.hdfsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = 0L;
boolean scheduleTask;
@@ -721,7 +722,7 @@
@Override
DatanodeInfo[] getPipeline() {
FanOutOneBlockAsyncDFSOutput output = this.hdfsOut;
AsyncFSOutput output = this.fsOut;
return output != null ? output.getPipeline() : new DatanodeInfo[0];
}


@@ -17,6 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import io.netty.channel.EventLoop;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
@@ -29,18 +34,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import io.netty.channel.EventLoop;
/**
* AsyncWriter for protobuf-based WAL.
@@ -97,7 +96,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
private final EventLoop eventLoop;
private FanOutOneBlockAsyncDFSOutput output;
private AsyncFSOutput output;
private ByteArrayOutputStream buf;
@@ -149,16 +148,15 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
this.output = null;
}
public FanOutOneBlockAsyncDFSOutput getOutput() {
public AsyncFSOutput getOutput() {
return this.output;
}
@Override
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException {
this.output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, path,
overwritable, false, replication, blockSize, eventLoop);
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoop);
this.buf = new ByteArrayOutputStream();
}


@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;


@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.junit.Assert.assertArrayEquals;
@@ -23,6 +23,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -49,10 +53,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@Category({ MiscTests.class, MediumTests.class })
public class TestFanOutOneBlockAsyncDFSOutput {


@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.asyncfs;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MiscTests.class, SmallTests.class })
public class TestLocalAsyncOutput {
private static EventLoopGroup GROUP = new NioEventLoopGroup();
private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
@AfterClass
public static void tearDownAfterClass() throws IOException {
TEST_UTIL.cleanupTestDir();
GROUP.shutdownGracefully();
}
@Test
public void test() throws IOException, InterruptedException, ExecutionException {
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next());
byte[] b = new byte[10];
ThreadLocalRandom.current().nextBytes(b);
FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
out.write(b);
out.flush(null, handler, true);
assertEquals(b.length, handler.get());
out.close();
assertEquals(b.length, fs.getFileStatus(f).getLen());
byte[] actual = new byte[b.length];
try (FSDataInputStream in = fs.open(f)) {
in.readFully(actual);
}
assertArrayEquals(b, actual);
}
}


@@ -15,12 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
package org.apache.hadoop.hbase.io.asyncfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -58,10 +62,6 @@ import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@RunWith(Parameterized.class)
@Category({ MiscTests.class, MediumTests.class })
public class TestSaslFanOutOneBlockAsyncDFSOutput {


@@ -24,9 +24,9 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputFlushHandler;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputFlushHandler;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.junit.AfterClass;