diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java index d9515955293..93121df0fd4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteArrayOutputStream.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.io; -import java.io.IOException; import java.io.OutputStream; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; @@ -50,28 +49,28 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup } @Override - public void write(ByteBuffer b, int off, int len) throws IOException { + public void write(ByteBuffer b, int off, int len) { checkSizeAndGrow(len); ByteBufferUtils.copyFromBufferToArray(this.buf, b, off, this.pos, len); this.pos += len; } @Override - public void writeInt(int i) throws IOException { + public void writeInt(int i) { checkSizeAndGrow(Bytes.SIZEOF_INT); Bytes.putInt(this.buf, this.pos, i); this.pos += Bytes.SIZEOF_INT; } @Override - public void write(int b) throws IOException { + public void write(int b) { checkSizeAndGrow(Bytes.SIZEOF_BYTE); buf[this.pos] = (byte) b; this.pos++; } @Override - public void write(byte[] b, int off, int len) throws IOException { + public void write(byte[] b, int off, int len) { checkSizeAndGrow(len); System.arraycopy(b, off, this.buf, this.pos, len); this.pos += len; @@ -109,7 +108,7 @@ public class ByteArrayOutputStream extends OutputStream implements ByteBufferSup * Copies the content of this Stream into a new byte array. * @return the contents of this output stream, as new byte array. */ - public byte toByteArray()[] { + public byte[] toByteArray() { return Arrays.copyOf(buf, pos); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java index 807d82ad75d..0c60d3cf792 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.asyncfs; import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -43,6 +44,16 @@ public interface AsyncFSOutput extends Closeable { */ void write(byte[] b, int off, int len); + /** + * Write an int to the buffer. + */ + void writeInt(int i); + + /** + * Copy the data in the given {@code bb} into the buffer. + */ + void write(ByteBuffer bb); + /** * Return the current size of buffered data. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java index 576bb299b42..c9d4e709812 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java @@ -22,11 +22,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.channel.EventLoop; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InterruptedIOException; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -36,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.ByteArrayOutputStream; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -103,68 +103,45 @@ public final class AsyncFSOutputHelper { return new DatanodeInfo[0]; } - @Override - public void flush(final A attachment, final CompletionHandler handler, - final boolean sync) { - flushExecutor.execute(new Runnable() { - - @Override - public void run() { - try { - synchronized (out) { - out.writeTo(fsOut); - out.reset(); - } - } catch (final IOException e) { - eventLoop.execute(new Runnable() { - - @Override - public void run() { - handler.failed(e, attachment); - } - }); - return; - } - try { - if (sync) { - fsOut.hsync(); - } else { - fsOut.hflush(); - } - final long pos = fsOut.getPos(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - handler.completed(pos, attachment); - } - }); - } catch (final IOException e) { - eventLoop.execute(new Runnable() { - - @Override - public void run() { - handler.failed(e, attachment); - } - }); - } + private void flush0(A attachment, CompletionHandler handler, + boolean sync) { + try { + synchronized (out) { + fsOut.write(out.getBuffer(), 0, out.size()); + out.reset(); } - }); + } catch (IOException e) { + eventLoop.execute(() -> handler.failed(e, attachment)); + return; + } + try { + if (sync) { + fsOut.hsync(); + } else { + fsOut.hflush(); + } + final long pos = fsOut.getPos(); + eventLoop.execute(() -> handler.completed(pos, attachment)); + } catch (final IOException e) { + eventLoop.execute(() -> handler.failed(e, attachment)); + } + } + + @Override + public void flush(A attachment, CompletionHandler handler, + boolean sync) { + flushExecutor.execute(() -> flush0(attachment, handler, sync)); } @Override public void close() throws IOException { try { - flushExecutor.submit(new Callable() { - - @Override - public Void call() throws Exception { - synchronized (out) { - out.writeTo(fsOut); - out.reset(); - } - return null; + flushExecutor.submit(() -> { + synchronized (out) { + fsOut.write(out.getBuffer(), 0, out.size()); + out.reset(); } + return null; }).get(); } catch (InterruptedException e) { throw new InterruptedIOException(); @@ -181,6 +158,16 @@ public final class AsyncFSOutputHelper { public int buffered() { return out.size(); } + + @Override + public void writeInt(int i) { + out.writeInt(i); + } + + @Override + public void write(ByteBuffer bb) { + out.write(bb, bb.position(), bb.remaining()); + } }; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index e1303815499..916e5349755 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -54,7 +54,6 @@ import java.util.Deque; import java.util.IdentityHashMap; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; @@ -349,6 +348,47 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } + private void writeInt0(int i) { + buf.ensureWritable(4); + if (cryptoCodec == null) { + buf.writeInt(i); + } else { + ByteBuffer inBuffer = ByteBuffer.allocate(4); + inBuffer.putInt(0, i); + cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), 4)); + buf.writerIndex(buf.writerIndex() + 4); + } + } + + @Override + public void writeInt(int i) { + if (eventLoop.inEventLoop()) { + writeInt0(i); + } else { + eventLoop.submit(() -> writeInt0(i)); + } + } + + private void write0(ByteBuffer bb) { + int len = bb.remaining(); + buf.ensureWritable(len); + if (cryptoCodec == null) { + buf.writeBytes(bb); + } else { + cryptoCodec.encrypt(bb, buf.nioBuffer(buf.writerIndex(), len)); + buf.writerIndex(buf.writerIndex() + len); + } + } + + @Override + public void write(ByteBuffer bb) { + if (eventLoop.inEventLoop()) { + write0(bb); + } else { + eventLoop.submit(() -> write0(bb)); + } + } + @Override public void write(byte[] b) { write(b, 0, b.length); @@ -370,13 +410,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { if (eventLoop.inEventLoop()) { write0(b, off, len); } else { - eventLoop.submit(new Runnable() { - - @Override - public void run() { - write0(b, off, len); - } - }).syncUninterruptibly(); + eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly(); } } @@ -385,13 +419,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { if (eventLoop.inEventLoop()) { return buf.readableBytes(); } else { - return eventLoop.submit(new Callable() { - - @Override - public Integer call() throws Exception { - return buf.readableBytes(); - } - }).syncUninterruptibly().getNow().intValue(); + return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index e2080ffc7d6..db3088cbb60 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -18,13 +18,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.base.Throwables; -import com.google.common.primitives.Ints; import io.netty.channel.EventLoop; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; import org.apache.commons.logging.Log; @@ -33,7 +33,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.io.ByteBufferSupportOutputStream; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; @@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; * AsyncWriter for protobuf-based WAL. */ @InterfaceAudience.Private -public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements - AsyncFSWALProvider.AsyncWriter { +public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter + implements AsyncFSWALProvider.AsyncWriter { private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); @@ -98,7 +98,48 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements private AsyncFSOutput output; - private ByteArrayOutputStream buf; + private static final class OutputStreamWrapper extends OutputStream + implements ByteBufferSupportOutputStream { + + private final AsyncFSOutput out; + + private final byte[] oneByteBuf = new byte[1]; + + @Override + public void write(int b) throws IOException { + oneByteBuf[0] = (byte) b; + write(oneByteBuf); + } + + public OutputStreamWrapper(AsyncFSOutput out) { + this.out = out; + } + + @Override + public void write(ByteBuffer b, int off, int len) throws IOException { + ByteBuffer bb = b.duplicate(); + bb.position(off); + bb.limit(off + len); + out.write(bb); + } + + @Override + public void writeInt(int i) throws IOException { + out.writeInt(i); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void close() throws IOException { + out.close(); + } + } + + private OutputStream asyncOutputWrapper; public AsyncProtobufLogWriter(EventLoop eventLoop) { this.eventLoop = eventLoop; @@ -106,26 +147,22 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements @Override public void append(Entry entry) { - buf.reset(); + int buffered = output.buffered(); entry.setCompressionContext(compressionContext); try { entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() - .writeDelimitedTo(buf); + .writeDelimitedTo(asyncOutputWrapper); } catch (IOException e) { throw new AssertionError("should not happen", e); } - length.addAndGet(buf.size()); - output.write(buf.getBuffer(), 0, buf.size()); try { for (Cell cell : entry.getEdit().getCells()) { - buf.reset(); cellEncoder.write(cell); - length.addAndGet(buf.size()); - output.write(buf.getBuffer(), 0, buf.size()); } } catch (IOException e) { throw new AssertionError("should not happen", e); } + length.addAndGet(output.buffered() - buffered); } @Override @@ -157,22 +194,21 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements short replication, long blockSize) throws IOException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, blockSize, eventLoop); - this.buf = new ByteArrayOutputStream(); + this.asyncOutputWrapper = new OutputStreamWrapper(output); } @Override protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { - buf.reset(); - header.writeDelimitedTo(buf); final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - output.write(ProtobufLogReader.PB_WAL_MAGIC); - output.write(buf.getBuffer(), 0, buf.size()); - output.flush(null, handler, false); + eventLoop.execute(() -> { + output.write(magic); + try { + header.writeDelimitedTo(asyncOutputWrapper); + } catch (IOException e) { + // should not happen + throw new AssertionError(e); } + output.flush(null, handler, false); }); return handler.get(); } @@ -180,22 +216,23 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements @Override protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic) throws IOException { - buf.reset(); - trailer.writeTo(buf); final BlockingCompletionHandler handler = new BlockingCompletionHandler(); - eventLoop.execute(new Runnable() { - public void run() { - output.write(buf.getBuffer(), 0, buf.size()); - output.write(Ints.toByteArray(buf.size())); - output.write(magic); - output.flush(null, handler, false); + eventLoop.execute(() -> { + try { + trailer.writeTo(asyncOutputWrapper); + } catch (IOException e) { + // should not happen + throw new AssertionError(e); } + output.writeInt(trailer.getSerializedSize()); + output.write(magic); + output.flush(null, handler, false); }); return handler.get(); } @Override protected OutputStream getOutputStreamForCellEncoder() { - return buf; + return asyncOutputWrapper; } }