HBASE-16891 Try copying to the Netty ByteBuf directly from the WALEdit

zhangduo 2016-10-28 21:00:24 +08:00
parent ad0e862f78
commit 6127753b65
5 changed files with 171 additions and 109 deletions
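Summary: this change removes the intermediate byte[] staging copy on the WAL append path. Instead of serializing each WALKey and cell into a temporary ByteArrayOutputStream and then copying that buffer into the async output, the encoders now write through a thin OutputStream adapter straight into the Netty ByteBuf backing the output. A toy sketch of the idea, using Netty's stock ByteBufOutputStream rather than any HBase class:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Toy demo, not HBase code: anything that targets an OutputStream (e.g. a
// protobuf message's writeDelimitedTo) can fill a Netty ByteBuf directly,
// with no intermediate byte[] staging buffer in between.
public class DirectToByteBufDemo {
  public static void main(String[] args) throws IOException {
    ByteBuf buf = Unpooled.buffer();
    try (ByteBufOutputStream out = new ByteBufOutputStream(buf)) {
      out.writeInt(42); // length/marker words land straight in the ByteBuf
      out.write("wal-edit-payload".getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("buffered bytes: " + buf.readableBytes()); // 4 + 16 = 20
  }
}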

org/apache/hadoop/hbase/io/ByteArrayOutputStream.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io;
 
-import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
@@ -50,28 +49,28 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
   }
 
   @Override
-  public void write(ByteBuffer b, int off, int len) throws IOException {
+  public void write(ByteBuffer b, int off, int len) {
     checkSizeAndGrow(len);
     ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len);
     this.pos += len;
   }
 
   @Override
-  public void writeInt(int i) throws IOException {
+  public void writeInt(int i) {
     checkSizeAndGrow(Bytes.SIZEOF_INT);
     Bytes.putInt(this.buf, this.pos, i);
     this.pos += Bytes.SIZEOF_INT;
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public void write(int b) {
     checkSizeAndGrow(Bytes.SIZEOF_BYTE);
     buf[this.pos] = (byte) b;
     this.pos++;
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
+  public void write(byte[] b, int off, int len) {
     checkSizeAndGrow(len);
     System.arraycopy(b, off, this.buf, this.pos, len);
     this.pos += len;
@@ -109,7 +108,7 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSupportOutputStream {
    * Copies the content of this Stream into a new byte array.
    * @return the contents of this output stream, as new byte array.
    */
-  public byte toByteArray()[] {
+  public byte[] toByteArray() {
     return Arrays.copyOf(buf, pos);
   }

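With the throws IOException clauses gone, this in-memory stream can be used without dead catch blocks. A small usage sketch of the post-patch API (signatures taken from the diff above; the values are made up):

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;

public class ByteArrayOutputStreamUsage {
  public static void main(String[] args) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    bos.writeInt(42);                                  // 4 bytes, big-endian (Bytes.putInt)
    bos.write(ByteBuffer.wrap(new byte[] { 1, 2, 3 }), 0, 3);
    byte[] snapshot = bos.toByteArray();               // copy of the first pos bytes
    System.out.println(snapshot.length);               // 7
  }
}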
org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.asyncfs;
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -43,6 +44,16 @@ public interface AsyncFSOutput extends Closeable {
    */
   void write(byte[] b, int off, int len);
 
+  /**
+   * Write an int to the buffer.
+   */
+  void writeInt(int i);
+
+  /**
+   * Copy the data in the given {@code bb} into the buffer.
+   */
+  void write(ByteBuffer bb);
+
   /**
    * Return the current size of buffered data.
    */

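The two new interface methods let callers hand whole ByteBuffers (and int length prefixes) to the output without flattening them to byte[] first. A hedged usage sketch (writeRecord and its length-prefix framing are illustrative, not HBase API):

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

public final class AsyncFSOutputUsage {
  // Frames a payload as <int length><bytes>, staying in the output's buffer.
  static void writeRecord(AsyncFSOutput out, ByteBuffer payload) {
    out.writeInt(payload.remaining()); // buffered, not yet flushed to DataNodes
    out.write(payload);                // copies the ByteBuffer's remaining bytes
  }
}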
org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java

@@ -22,11 +22,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.EventLoop;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,68 +103,45 @@ public final class AsyncFSOutputHelper {
         return new DatanodeInfo[0];
       }
 
-      @Override
-      public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler,
-          final boolean sync) {
-        flushExecutor.execute(new Runnable() {
-
-          @Override
-          public void run() {
-            try {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-              return;
-            }
-            try {
-              if (sync) {
-                fsOut.hsync();
-              } else {
-                fsOut.hflush();
-              }
-              final long pos = fsOut.getPos();
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.completed(pos, attachment);
-                }
-              });
-            } catch (final IOException e) {
-              eventLoop.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                  handler.failed(e, attachment);
-                }
-              });
-            }
-          }
-        });
-      }
+      private <A> void flush0(A attachment, CompletionHandler<Long, ? super A> handler,
+          boolean sync) {
+        try {
+          synchronized (out) {
+            fsOut.write(out.getBuffer(), 0, out.size());
+            out.reset();
+          }
+        } catch (IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+          return;
+        }
+        try {
+          if (sync) {
+            fsOut.hsync();
+          } else {
+            fsOut.hflush();
+          }
+          final long pos = fsOut.getPos();
+          eventLoop.execute(() -> handler.completed(pos, attachment));
+        } catch (final IOException e) {
+          eventLoop.execute(() -> handler.failed(e, attachment));
+        }
+      }
+
+      @Override
+      public <A> void flush(A attachment, CompletionHandler<Long, ? super A> handler,
+          boolean sync) {
+        flushExecutor.execute(() -> flush0(attachment, handler, sync));
+      }
 
       @Override
       public void close() throws IOException {
         try {
-          flushExecutor.submit(new Callable<Void>() {
-
-            @Override
-            public Void call() throws Exception {
-              synchronized (out) {
-                out.writeTo(fsOut);
-                out.reset();
-              }
-              return null;
-            }
+          flushExecutor.submit(() -> {
+            synchronized (out) {
+              fsOut.write(out.getBuffer(), 0, out.size());
+              out.reset();
+            }
+            return null;
           }).get();
         } catch (InterruptedException e) {
           throw new InterruptedIOException();
@@ -181,6 +158,16 @@ public final class AsyncFSOutputHelper {
       public int buffered() {
         return out.size();
       }
 
+      @Override
+      public void writeInt(int i) {
+        out.writeInt(i);
+      }
+
+      @Override
+      public void write(ByteBuffer bb) {
+        out.write(bb, bb.position(), bb.remaining());
+      }
     };
   }
 }

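The refactored flush keeps the same shape as before: blocking filesystem work runs on a dedicated flush executor, and the CompletionHandler callback is bounced back onto the Netty event loop. A minimal generic sketch of that hand-off (FlushHandoff and BlockingFlush are illustrative names, not HBase API):

import io.netty.channel.EventLoop;
import java.io.IOException;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;

final class FlushHandoff {
  interface BlockingFlush {
    long flush() throws IOException; // e.g. hflush()/hsync() then getPos()
  }

  static <A> void flushAsync(ExecutorService flushExecutor, EventLoop eventLoop,
      BlockingFlush io, A attachment, CompletionHandler<Long, ? super A> handler) {
    flushExecutor.execute(() -> {
      try {
        long pos = io.flush(); // blocking I/O happens off the event loop
        eventLoop.execute(() -> handler.completed(pos, attachment));
      } catch (IOException e) {
        eventLoop.execute(() -> handler.failed(e, attachment));
      }
    });
  }
}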
org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java

@@ -54,7 +54,6 @@ import java.util.Deque;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -349,6 +348,47 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
+  private void writeInt0(int i) {
+    buf.ensureWritable(4);
+    if (cryptoCodec == null) {
+      buf.writeInt(i);
+    } else {
+      ByteBuffer inBuffer = ByteBuffer.allocate(4);
+      inBuffer.putInt(0, i);
+      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4));
+      buf.writerIndex(buf.writerIndex() + 4);
+    }
+  }
+
+  @Override
+  public void writeInt(int i) {
+    if (eventLoop.inEventLoop()) {
+      writeInt0(i);
+    } else {
+      eventLoop.submit(() -> writeInt0(i));
+    }
+  }
+
+  private void write0(ByteBuffer bb) {
+    int len = bb.remaining();
+    buf.ensureWritable(len);
+    if (cryptoCodec == null) {
+      buf.writeBytes(bb);
+    } else {
+      cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len));
+      buf.writerIndex(buf.writerIndex() + len);
+    }
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    if (eventLoop.inEventLoop()) {
+      write0(bb);
+    } else {
+      eventLoop.submit(() -> write0(bb));
+    }
+  }
+
   @Override
   public void write(byte[] b) {
     write(b, 0, b.length);
@@ -370,13 +410,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     if (eventLoop.inEventLoop()) {
       write0(b, off, len);
     } else {
-      eventLoop.submit(new Runnable() {
-
-        @Override
-        public void run() {
-          write0(b, off, len);
-        }
-      }).syncUninterruptibly();
+      eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
     }
   }
@@ -385,13 +419,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     if (eventLoop.inEventLoop()) {
       return buf.readableBytes();
     } else {
-      return eventLoop.submit(new Callable<Integer>() {
-
-        @Override
-        public Integer call() throws Exception {
-          return buf.readableBytes();
-        }
-      }).syncUninterruptibly().getNow().intValue();
+      return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
     }
   }

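All ByteBuf mutation in FanOutOneBlockAsyncDFSOutput is confined to the owning event loop: callers already on the loop write directly, other threads submit a task (and, on the byte[] path, block via syncUninterruptibly()). A stripped-down sketch of that confinement idiom (LoopConfinedBuffer is an illustrative name, and the encryption branch is omitted):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoop;

final class LoopConfinedBuffer {
  private final EventLoop eventLoop;
  private final ByteBuf buf = Unpooled.buffer();

  LoopConfinedBuffer(EventLoop eventLoop) {
    this.eventLoop = eventLoop;
  }

  private void writeInt0(int i) {
    buf.ensureWritable(4); // grow before the unchecked write
    buf.writeInt(i);
  }

  void writeInt(int i) {
    if (eventLoop.inEventLoop()) {
      writeInt0(i);                         // already on the owning thread
    } else {
      eventLoop.submit(() -> writeInt0(i)); // marshal onto the owning thread
    }
  }
}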
org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java

@@ -18,13 +18,13 @@
 package org.apache.hadoop.hbase.regionserver.wal;
 
 import com.google.common.base.Throwables;
-import com.google.common.primitives.Ints;
 import io.netty.channel.EventLoop;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  * AsyncWriter for protobuf-based WAL.
  */
 @InterfaceAudience.Private
-public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
-    AsyncFSWALProvider.AsyncWriter {
+public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
+    implements AsyncFSWALProvider.AsyncWriter {
 
   private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
@@ -98,7 +98,48 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   private AsyncFSOutput output;
-  private ByteArrayOutputStream buf;
+
+  private static final class OutputStreamWrapper extends OutputStream
+      implements ByteBufferSupportOutputStream {
+
+    private final AsyncFSOutput out;
+
+    private final byte[] oneByteBuf = new byte[1];
+
+    @Override
+    public void write(int b) throws IOException {
+      oneByteBuf[0] = (byte) b;
+      write(oneByteBuf);
+    }
+
+    public OutputStreamWrapper(AsyncFSOutput out) {
+      this.out = out;
+    }
+
+    @Override
+    public void write(ByteBuffer b, int off, int len) throws IOException {
+      ByteBuffer bb = b.duplicate();
+      bb.position(off);
+      bb.limit(off + len);
+      out.write(bb);
+    }
+
+    @Override
+    public void writeInt(int i) throws IOException {
+      out.writeInt(i);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+  }
+
+  private OutputStream asyncOutputWrapper;
 
   public AsyncProtobufLogWriter(EventLoop eventLoop) {
     this.eventLoop = eventLoop;
@@ -106,26 +147,22 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   public void append(Entry entry) {
-    buf.reset();
+    int buffered = output.buffered();
     entry.setCompressionContext(compressionContext);
     try {
       entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
-          .writeDelimitedTo(buf);
+          .writeDelimitedTo(asyncOutputWrapper);
     } catch (IOException e) {
       throw new AssertionError("should not happen", e);
     }
-    length.addAndGet(buf.size());
-    output.write(buf.getBuffer(), 0, buf.size());
     try {
       for (Cell cell : entry.getEdit().getCells()) {
-        buf.reset();
         cellEncoder.write(cell);
-        length.addAndGet(buf.size());
-        output.write(buf.getBuffer(), 0, buf.size());
       }
     } catch (IOException e) {
       throw new AssertionError("should not happen", e);
     }
+    length.addAndGet(output.buffered() - buffered);
   }
 
   @Override
@@ -157,22 +194,21 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
       short replication, long blockSize) throws IOException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
       blockSize, eventLoop);
-    this.buf = new ByteArrayOutputStream();
+    this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
   @Override
   protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
-    buf.reset();
-    header.writeDelimitedTo(buf);
     final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(new Runnable() {
-
-      @Override
-      public void run() {
-        output.write(ProtobufLogReader.PB_WAL_MAGIC);
-        output.write(buf.getBuffer(), 0, buf.size());
-        output.flush(null, handler, false);
-      }
+    eventLoop.execute(() -> {
+      output.write(magic);
+      try {
+        header.writeDelimitedTo(asyncOutputWrapper);
+      } catch (IOException e) {
+        // should not happen
+        throw new AssertionError(e);
+      }
+      output.flush(null, handler, false);
     });
     return handler.get();
   }
@@ -180,22 +216,23 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
       throws IOException {
-    buf.reset();
-    trailer.writeTo(buf);
     final BlockingCompletionHandler handler = new BlockingCompletionHandler();
-    eventLoop.execute(new Runnable() {
-
-      public void run() {
-        output.write(buf.getBuffer(), 0, buf.size());
-        output.write(Ints.toByteArray(buf.size()));
-        output.write(magic);
-        output.flush(null, handler, false);
-      }
+    eventLoop.execute(() -> {
+      try {
+        trailer.writeTo(asyncOutputWrapper);
+      } catch (IOException e) {
+        // should not happen
+        throw new AssertionError(e);
+      }
+      output.writeInt(trailer.getSerializedSize());
+      output.write(magic);
+      output.flush(null, handler, false);
     });
     return handler.get();
   }
 
   @Override
   protected OutputStream getOutputStreamForCellEncoder() {
-    return buf;
+    return asyncOutputWrapper;
   }
 }
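OutputStreamWrapper is a plain adapter: it lets protobuf's writeDelimitedTo(OutputStream) and the cell encoder feed the AsyncFSOutput, and hence the Netty ByteBuf, directly. Note how append() now accounts for the written length by diffing output.buffered() before and after the writes instead of measuring an intermediate buffer. A hedged sketch of that accounting trick (LengthAccounting is an illustrative name):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;

final class LengthAccounting {
  private final AtomicLong length = new AtomicLong();

  // Runs `write`, which pushes bytes into `out`, then credits exactly the
  // bytes that landed in the output's buffer -- no temporary byte[] to size.
  void appendAndCount(AsyncFSOutput out, Runnable write) {
    int before = out.buffered();
    write.run();
    length.addAndGet(out.buffered() - before);
  }
}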