HBASE-27097 SimpleRpcServer is broken (#4613)

Replace BufferChain#write(channel,int) with a simpler #write(channel)
implementation that does not attempt to "chunk" data to be written. This
method was used exclusively by SimpleRpcServer. The code was unnecessarily
complex and caused short writes when values were large, so was corrected
and simplified. Any difference in performance from this change will be
limited to SimpleRpcServer. Testing under load confirms the fix and does
not show significant regression.

SimpleRpcServer and its related code is now also marked as @Deprecated.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>

Conflicts:
	hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
This commit is contained in:
Andrew Purtell 2022-07-12 11:07:23 -07:00
parent b2a145661f
commit 0018cbec58
6 changed files with 29 additions and 99 deletions

View File

@ -23,13 +23,12 @@ import java.nio.channels.GatheringByteChannel;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Chain of ByteBuffers. Used writing out an array of byte buffers. Writes in chunks. * Chain of ByteBuffers. Used writing out an array of byte buffers.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class BufferChain { class BufferChain {
private final ByteBuffer[] buffers; private final ByteBuffer[] buffers;
private int remaining = 0; private int remaining = 0;
private int bufferOffset = 0;
private int size; private int size;
BufferChain(ByteBuffer... buffers) { BufferChain(ByteBuffer... buffers) {
@ -61,51 +60,27 @@ class BufferChain {
return remaining > 0; return remaining > 0;
} }
/** long write(GatheringByteChannel channel) throws IOException {
* Write out our chain of buffers in chunks if (!hasRemaining()) {
* @param channel Where to write
* @param chunkSize Size of chunks to write.
* @return Amount written. n
*/
long write(GatheringByteChannel channel, int chunkSize) throws IOException {
int chunkRemaining = chunkSize;
ByteBuffer lastBuffer = null;
int bufCount = 0;
int restoreLimit = -1;
while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
lastBuffer = buffers[bufferOffset + bufCount];
if (!lastBuffer.hasRemaining()) {
bufferOffset++;
continue;
}
bufCount++;
if (lastBuffer.remaining() > chunkRemaining) {
restoreLimit = lastBuffer.limit();
lastBuffer.limit(lastBuffer.position() + chunkRemaining);
chunkRemaining = 0;
break;
} else {
chunkRemaining -= lastBuffer.remaining();
}
}
assert lastBuffer != null;
if (chunkRemaining == chunkSize) {
assert !hasRemaining();
// no data left to write
return 0; return 0;
} }
try { long written = 0;
long ret = channel.write(buffers, bufferOffset, bufCount); for (ByteBuffer bb : this.buffers) {
if (ret > 0) { if (bb.hasRemaining()) {
remaining = (int) (remaining - ret); final int pos = bb.position();
final int result = channel.write(bb);
if (result <= 0) {
// Write error. Return how much we were able to write until now.
return written;
} }
return ret; // Adjust the position of buffers already written so we don't write out
} finally { // duplicate data upon retry of incomplete write with the same buffer chain.
if (restoreLimit >= 0) { bb.position(pos + result);
lastBuffer.limit(restoreLimit); remaining -= result;
written += result;
} }
} }
return written;
} }
int size() { int size() {

View File

@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Message;
* put itself on new queue for Responder to pull from and return result to client. * put itself on new queue for Responder to pull from and return result to client.
* @see BlockingRpcClient * @see BlockingRpcClient
*/ */
@Deprecated
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
public class SimpleRpcServer extends RpcServer { public class SimpleRpcServer extends RpcServer {
@ -487,20 +488,9 @@ public class SimpleRpcServer extends RpcServer {
return call(fakeCall, status); return call(fakeCall, status);
} }
/**
* This is a wrapper around
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data
* is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
* direct buffers as the size of buffer increases. This also minimizes extra copies in NIO layer
* as a result of multiple write operations required to write a large buffer.
* @param channel writable byte channel to write to
* @param bufferChain Chain of buffers to write
* @return number of bytes written
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
*/
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain) protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
throws IOException { throws IOException {
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT); long count = bufferChain.write(channel);
if (count > 0) { if (count > 0) {
this.metrics.sentBytes(count); this.metrics.sentBytes(count);
} }

View File

@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Sends responses of RPC back to clients. * Sends responses of RPC back to clients.
*/ */
@Deprecated
@InterfaceAudience.Private @InterfaceAudience.Private
class SimpleRpcServerResponder extends Thread { class SimpleRpcServerResponder extends Thread {

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
* Datastructure that holds all necessary to a method invocation and then afterward, carries the * Datastructure that holds all necessary to a method invocation and then afterward, carries the
* result. * result.
*/ */
@Deprecated
@InterfaceAudience.Private @InterfaceAudience.Private
class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> { class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
/** Reads calls from a connection and queues them for handling. */ /** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/") justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@Deprecated
@InterfaceAudience.Private @InterfaceAudience.Private
class SimpleServerRpcConnection extends ServerRpcConnection { class SimpleServerRpcConnection extends ServerRpcConnection {

View File

@ -17,7 +17,9 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.*; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -33,7 +35,6 @@ import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.common.base.Charsets; import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.io.Files; import org.apache.hbase.thirdparty.com.google.common.io.Files;
@ -68,53 +69,15 @@ public class TestBufferChain {
assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes())); assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
} }
@Test
public void testChainChunkBiggerThanWholeArray() throws IOException {
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain, "hello world", 8192);
assertNoRemaining(bufs);
}
@Test
public void testChainChunkBiggerThanSomeArrays() throws IOException {
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain, "hello world", 3);
assertNoRemaining(bufs);
}
@Test @Test
public void testLimitOffset() throws IOException { public void testLimitOffset() throws IOException {
ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1), ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
stringBuf("XXXXworldY", 4, 5) }; stringBuf("XXXXworldY", 4, 5) };
BufferChain chain = new BufferChain(bufs); BufferChain chain = new BufferChain(bufs);
writeAndVerify(chain, "hello world", 3); writeAndVerify(chain, "hello world");
assertNoRemaining(bufs); assertNoRemaining(bufs);
} }
@Test
public void testWithSpy() throws IOException {
ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
stringBuf("XXXXworldY", 4, 5) };
BufferChain chain = new BufferChain(bufs);
FileOutputStream fos = new FileOutputStream(tmpFile);
FileChannel ch = Mockito.spy(fos.getChannel());
try {
chain.write(ch, 2);
assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 2);
assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 3);
assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
chain.write(ch, 8);
assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
} finally {
ch.close();
fos.close();
}
}
private ByteBuffer stringBuf(String string, int position, int length) { private ByteBuffer stringBuf(String string, int position, int length) {
ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8)); ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
buf.position(position); buf.position(position);
@ -137,14 +100,13 @@ public class TestBufferChain {
return ret; return ret;
} }
private void writeAndVerify(BufferChain chain, String string, int chunkSize) throws IOException { private void writeAndVerify(BufferChain chain, String string) throws IOException {
FileOutputStream fos = new FileOutputStream(tmpFile); FileOutputStream fos = new FileOutputStream(tmpFile);
FileChannel ch = fos.getChannel(); FileChannel ch = fos.getChannel();
try { try {
long remaining = string.length(); long remaining = string.length();
while (chain.hasRemaining()) { while (chain.hasRemaining()) {
long n = chain.write(ch, chunkSize); long n = chain.write(ch);
assertTrue(n == chunkSize || n == remaining);
remaining -= n; remaining -= n;
} }
assertEquals(0, remaining); assertEquals(0, remaining);