HBASE-27097 SimpleRpcServer is broken (#4613)
Replace BufferChain#write(channel,int) with a simpler #write(channel) implementation that does not attempt to "chunk" data to be written. This method was used exclusively by SimpleRpcServer. The code was unnecessarily complex and caused short writes when values were large, so was corrected and simplified. Any difference in performance from this change will be limited to SimpleRpcServer. Testing under load confirms the fix and does not show significant regression. SimpleRpcServer and its related code is now also marked as @Deprecated. Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
648bb6d0eb
commit
ed32043da0
|
@ -23,13 +23,12 @@ import java.nio.channels.GatheringByteChannel;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Chain of ByteBuffers. Used writing out an array of byte buffers. Writes in chunks.
|
* Chain of ByteBuffers. Used writing out an array of byte buffers.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class BufferChain {
|
class BufferChain {
|
||||||
private final ByteBuffer[] buffers;
|
private final ByteBuffer[] buffers;
|
||||||
private int remaining = 0;
|
private int remaining = 0;
|
||||||
private int bufferOffset = 0;
|
|
||||||
private int size;
|
private int size;
|
||||||
|
|
||||||
BufferChain(ByteBuffer... buffers) {
|
BufferChain(ByteBuffer... buffers) {
|
||||||
|
@ -61,51 +60,27 @@ class BufferChain {
|
||||||
return remaining > 0;
|
return remaining > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
long write(GatheringByteChannel channel) throws IOException {
|
||||||
* Write out our chain of buffers in chunks
|
if (!hasRemaining()) {
|
||||||
* @param channel Where to write
|
|
||||||
* @param chunkSize Size of chunks to write.
|
|
||||||
* @return Amount written. n
|
|
||||||
*/
|
|
||||||
long write(GatheringByteChannel channel, int chunkSize) throws IOException {
|
|
||||||
int chunkRemaining = chunkSize;
|
|
||||||
ByteBuffer lastBuffer = null;
|
|
||||||
int bufCount = 0;
|
|
||||||
int restoreLimit = -1;
|
|
||||||
|
|
||||||
while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
|
|
||||||
lastBuffer = buffers[bufferOffset + bufCount];
|
|
||||||
if (!lastBuffer.hasRemaining()) {
|
|
||||||
bufferOffset++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
bufCount++;
|
|
||||||
if (lastBuffer.remaining() > chunkRemaining) {
|
|
||||||
restoreLimit = lastBuffer.limit();
|
|
||||||
lastBuffer.limit(lastBuffer.position() + chunkRemaining);
|
|
||||||
chunkRemaining = 0;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
chunkRemaining -= lastBuffer.remaining();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert lastBuffer != null;
|
|
||||||
if (chunkRemaining == chunkSize) {
|
|
||||||
assert !hasRemaining();
|
|
||||||
// no data left to write
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
try {
|
long written = 0;
|
||||||
long ret = channel.write(buffers, bufferOffset, bufCount);
|
for (ByteBuffer bb : this.buffers) {
|
||||||
if (ret > 0) {
|
if (bb.hasRemaining()) {
|
||||||
remaining = (int) (remaining - ret);
|
final int pos = bb.position();
|
||||||
}
|
final int result = channel.write(bb);
|
||||||
return ret;
|
if (result <= 0) {
|
||||||
} finally {
|
// Write error. Return how much we were able to write until now.
|
||||||
if (restoreLimit >= 0) {
|
return written;
|
||||||
lastBuffer.limit(restoreLimit);
|
}
|
||||||
|
// Adjust the position of buffers already written so we don't write out
|
||||||
|
// duplicate data upon retry of incomplete write with the same buffer chain.
|
||||||
|
bb.position(pos + result);
|
||||||
|
remaining -= result;
|
||||||
|
written += result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return written;
|
||||||
}
|
}
|
||||||
|
|
||||||
int size() {
|
int size() {
|
||||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
|
||||||
* put itself on new queue for Responder to pull from and return result to client.
|
* put itself on new queue for Responder to pull from and return result to client.
|
||||||
* @see BlockingRpcClient
|
* @see BlockingRpcClient
|
||||||
*/
|
*/
|
||||||
|
@Deprecated()
|
||||||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
|
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
|
||||||
public class SimpleRpcServer extends RpcServer {
|
public class SimpleRpcServer extends RpcServer {
|
||||||
|
|
||||||
|
@ -464,20 +465,9 @@ public class SimpleRpcServer extends RpcServer {
|
||||||
return listener.getAddress();
|
return listener.getAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This is a wrapper around
|
|
||||||
* {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data
|
|
||||||
* is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
|
|
||||||
* direct buffers as the size of buffer increases. This also minimizes extra copies in NIO layer
|
|
||||||
* as a result of multiple write operations required to write a large buffer.
|
|
||||||
* @param channel writable byte channel to write to
|
|
||||||
* @param bufferChain Chain of buffers to write
|
|
||||||
* @return number of bytes written
|
|
||||||
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
|
|
||||||
*/
|
|
||||||
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
|
protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
|
long count = bufferChain.write(channel);
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
this.metrics.sentBytes(count);
|
this.metrics.sentBytes(count);
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
/**
|
/**
|
||||||
* Sends responses of RPC back to clients.
|
* Sends responses of RPC back to clients.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class SimpleRpcServerResponder extends Thread {
|
class SimpleRpcServerResponder extends Thread {
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
|
||||||
* Datastructure that holds all necessary to a method invocation and then afterward, carries the
|
* Datastructure that holds all necessary to a method invocation and then afterward, carries the
|
||||||
* result.
|
* result.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
|
class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> {
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
|
||||||
/** Reads calls from a connection and queues them for handling. */
|
/** Reads calls from a connection and queues them for handling. */
|
||||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
|
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
|
||||||
justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
|
justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
|
||||||
|
@Deprecated
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class SimpleServerRpcConnection extends ServerRpcConnection {
|
class SimpleServerRpcConnection extends ServerRpcConnection {
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
|
@ -33,7 +35,6 @@ import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
|
import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.io.Files;
|
import org.apache.hbase.thirdparty.com.google.common.io.Files;
|
||||||
|
@ -68,53 +69,15 @@ public class TestBufferChain {
|
||||||
assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
|
assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testChainChunkBiggerThanWholeArray() throws IOException {
|
|
||||||
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
|
|
||||||
BufferChain chain = new BufferChain(bufs);
|
|
||||||
writeAndVerify(chain, "hello world", 8192);
|
|
||||||
assertNoRemaining(bufs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testChainChunkBiggerThanSomeArrays() throws IOException {
|
|
||||||
ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
|
|
||||||
BufferChain chain = new BufferChain(bufs);
|
|
||||||
writeAndVerify(chain, "hello world", 3);
|
|
||||||
assertNoRemaining(bufs);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLimitOffset() throws IOException {
|
public void testLimitOffset() throws IOException {
|
||||||
ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
|
ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
|
||||||
stringBuf("XXXXworldY", 4, 5) };
|
stringBuf("XXXXworldY", 4, 5) };
|
||||||
BufferChain chain = new BufferChain(bufs);
|
BufferChain chain = new BufferChain(bufs);
|
||||||
writeAndVerify(chain, "hello world", 3);
|
writeAndVerify(chain, "hello world");
|
||||||
assertNoRemaining(bufs);
|
assertNoRemaining(bufs);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWithSpy() throws IOException {
|
|
||||||
ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
|
|
||||||
stringBuf("XXXXworldY", 4, 5) };
|
|
||||||
BufferChain chain = new BufferChain(bufs);
|
|
||||||
FileOutputStream fos = new FileOutputStream(tmpFile);
|
|
||||||
FileChannel ch = Mockito.spy(fos.getChannel());
|
|
||||||
try {
|
|
||||||
chain.write(ch, 2);
|
|
||||||
assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
|
|
||||||
chain.write(ch, 2);
|
|
||||||
assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
|
|
||||||
chain.write(ch, 3);
|
|
||||||
assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
|
|
||||||
chain.write(ch, 8);
|
|
||||||
assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
|
|
||||||
} finally {
|
|
||||||
ch.close();
|
|
||||||
fos.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ByteBuffer stringBuf(String string, int position, int length) {
|
private ByteBuffer stringBuf(String string, int position, int length) {
|
||||||
ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
|
ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
|
||||||
buf.position(position);
|
buf.position(position);
|
||||||
|
@ -137,14 +100,13 @@ public class TestBufferChain {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeAndVerify(BufferChain chain, String string, int chunkSize) throws IOException {
|
private void writeAndVerify(BufferChain chain, String string) throws IOException {
|
||||||
FileOutputStream fos = new FileOutputStream(tmpFile);
|
FileOutputStream fos = new FileOutputStream(tmpFile);
|
||||||
FileChannel ch = fos.getChannel();
|
FileChannel ch = fos.getChannel();
|
||||||
try {
|
try {
|
||||||
long remaining = string.length();
|
long remaining = string.length();
|
||||||
while (chain.hasRemaining()) {
|
while (chain.hasRemaining()) {
|
||||||
long n = chain.write(ch, chunkSize);
|
long n = chain.write(ch);
|
||||||
assertTrue(n == chunkSize || n == remaining);
|
|
||||||
remaining -= n;
|
remaining -= n;
|
||||||
}
|
}
|
||||||
assertEquals(0, remaining);
|
assertEquals(0, remaining);
|
||||||
|
|
Loading…
Reference in New Issue