From ed32043da005475c7ae2d6e5e58f607592536213 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Tue, 12 Jul 2022 11:07:23 -0700 Subject: [PATCH] HBASE-27097 SimpleRpcServer is broken (#4613) Replace BufferChain#write(channel,int) with a simpler #write(channel) implementation that does not attempt to "chunk" data to be written. This method was used exclusively by SimpleRpcServer. The code was unnecessarily complex and caused short writes when values were large, so was corrected and simplified. Any difference in performance from this change will be limited to SimpleRpcServer. Testing under load confirms the fix and does not show significant regression. SimpleRpcServer and its related code is now also marked as @Deprecated. Signed-off-by: Duo Zhang Signed-off-by: Viraj Jasani --- .../apache/hadoop/hbase/ipc/BufferChain.java | 61 ++++++------------- .../hadoop/hbase/ipc/SimpleRpcServer.java | 14 +---- .../hbase/ipc/SimpleRpcServerResponder.java | 1 + .../hadoop/hbase/ipc/SimpleServerCall.java | 1 + .../hbase/ipc/SimpleServerRpcConnection.java | 1 + .../hadoop/hbase/ipc/TestBufferChain.java | 50 ++------------- 6 files changed, 29 insertions(+), 99 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java index 534a467eda1..5877a06d930 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java @@ -23,13 +23,12 @@ import java.nio.channels.GatheringByteChannel; import org.apache.yetus.audience.InterfaceAudience; /** - * Chain of ByteBuffers. Used writing out an array of byte buffers. Writes in chunks. + * Chain of ByteBuffers. Used writing out an array of byte buffers. */ @InterfaceAudience.Private class BufferChain { private final ByteBuffer[] buffers; private int remaining = 0; - private int bufferOffset = 0; private int size; BufferChain(ByteBuffer... buffers) { @@ -61,51 +60,27 @@ class BufferChain { return remaining > 0; } - /** - * Write out our chain of buffers in chunks - * @param channel Where to write - * @param chunkSize Size of chunks to write. - * @return Amount written. n - */ - long write(GatheringByteChannel channel, int chunkSize) throws IOException { - int chunkRemaining = chunkSize; - ByteBuffer lastBuffer = null; - int bufCount = 0; - int restoreLimit = -1; - - while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) { - lastBuffer = buffers[bufferOffset + bufCount]; - if (!lastBuffer.hasRemaining()) { - bufferOffset++; - continue; - } - bufCount++; - if (lastBuffer.remaining() > chunkRemaining) { - restoreLimit = lastBuffer.limit(); - lastBuffer.limit(lastBuffer.position() + chunkRemaining); - chunkRemaining = 0; - break; - } else { - chunkRemaining -= lastBuffer.remaining(); - } - } - assert lastBuffer != null; - if (chunkRemaining == chunkSize) { - assert !hasRemaining(); - // no data left to write + long write(GatheringByteChannel channel) throws IOException { + if (!hasRemaining()) { return 0; } - try { - long ret = channel.write(buffers, bufferOffset, bufCount); - if (ret > 0) { - remaining = (int) (remaining - ret); - } - return ret; - } finally { - if (restoreLimit >= 0) { - lastBuffer.limit(restoreLimit); + long written = 0; + for (ByteBuffer bb : this.buffers) { + if (bb.hasRemaining()) { + final int pos = bb.position(); + final int result = channel.write(bb); + if (result <= 0) { + // Write error. Return how much we were able to write until now. + return written; + } + // Adjust the position of buffers already written so we don't write out + // duplicate data upon retry of incomplete write with the same buffer chain. + bb.position(pos + result); + remaining -= result; + written += result; } } + return written; } int size() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index 5a56fff4ad4..49c861b14ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -69,6 +69,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto * put itself on new queue for Responder to pull from and return result to client. * @see BlockingRpcClient */ +@Deprecated() @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) public class SimpleRpcServer extends RpcServer { @@ -464,20 +465,9 @@ public class SimpleRpcServer extends RpcServer { return listener.getAddress(); } - /** - * This is a wrapper around - * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data - * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many - * direct buffers as the size of buffer increases. This also minimizes extra copies in NIO layer - * as a result of multiple write operations required to write a large buffer. - * @param channel writable byte channel to write to - * @param bufferChain Chain of buffers to write - * @return number of bytes written - * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer) - */ protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) throws IOException { - long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); + long count = bufferChain.write(channel); if (count > 0) { this.metrics.sentBytes(count); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java index 200c4ebd1af..b9d8d3dffc4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java @@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience; /** * Sends responses of RPC back to clients. */ +@Deprecated @InterfaceAudience.Private class SimpleRpcServerResponder extends Thread { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java index 861da8055d1..5c5e9102115 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader * Datastructure that holds all necessary to a method invocation and then afterward, carries the * result. */ +@Deprecated @InterfaceAudience.Private class SimpleServerCall extends ServerCall { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java index ba7a9752a79..51e1bedba57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") +@Deprecated @InterfaceAudience.Private class SimpleServerRpcConnection extends ServerRpcConnection { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java index df85589ab8b..c7142f34f63 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.ipc; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; @@ -33,7 +35,6 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import org.apache.hbase.thirdparty.com.google.common.base.Charsets; import org.apache.hbase.thirdparty.com.google.common.io.Files; @@ -68,53 +69,15 @@ public class TestBufferChain { assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes())); } - @Test - public void testChainChunkBiggerThanWholeArray() throws IOException { - ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS); - BufferChain chain = new BufferChain(bufs); - writeAndVerify(chain, "hello world", 8192); - assertNoRemaining(bufs); - } - - @Test - public void testChainChunkBiggerThanSomeArrays() throws IOException { - ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS); - BufferChain chain = new BufferChain(bufs); - writeAndVerify(chain, "hello world", 3); - assertNoRemaining(bufs); - } - @Test public void testLimitOffset() throws IOException { ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1), stringBuf("XXXXworldY", 4, 5) }; BufferChain chain = new BufferChain(bufs); - writeAndVerify(chain, "hello world", 3); + writeAndVerify(chain, "hello world"); assertNoRemaining(bufs); } - @Test - public void testWithSpy() throws IOException { - ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1), - stringBuf("XXXXworldY", 4, 5) }; - BufferChain chain = new BufferChain(bufs); - FileOutputStream fos = new FileOutputStream(tmpFile); - FileChannel ch = Mockito.spy(fos.getChannel()); - try { - chain.write(ch, 2); - assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8)); - chain.write(ch, 2); - assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8)); - chain.write(ch, 3); - assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8)); - chain.write(ch, 8); - assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8)); - } finally { - ch.close(); - fos.close(); - } - } - private ByteBuffer stringBuf(String string, int position, int length) { ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8)); buf.position(position); @@ -137,14 +100,13 @@ public class TestBufferChain { return ret; } - private void writeAndVerify(BufferChain chain, String string, int chunkSize) throws IOException { + private void writeAndVerify(BufferChain chain, String string) throws IOException { FileOutputStream fos = new FileOutputStream(tmpFile); FileChannel ch = fos.getChannel(); try { long remaining = string.length(); while (chain.hasRemaining()) { - long n = chain.write(ch, chunkSize); - assertTrue(n == chunkSize || n == remaining); + long n = chain.write(ch); remaining -= n; } assertEquals(0, remaining);