diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
index 534a467eda1..5877a06d930 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
@@ -23,13 +23,12 @@ import java.nio.channels.GatheringByteChannel;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Chain of ByteBuffers. Used writing out an array of byte buffers. Writes in chunks.
+ * Chain of ByteBuffers. Used writing out an array of byte buffers.
  */
 @InterfaceAudience.Private
 class BufferChain {
   private final ByteBuffer[] buffers;
   private int remaining = 0;
-  private int bufferOffset = 0;
   private int size;
 
   BufferChain(ByteBuffer... buffers) {
@@ -61,51 +60,27 @@ class BufferChain {
     return remaining > 0;
   }
 
-  /**
-   * Write out our chain of buffers in chunks
-   * @param channel Where to write
-   * @param chunkSize Size of chunks to write.
-   * @return Amount written. n
-   */
-  long write(GatheringByteChannel channel, int chunkSize) throws IOException {
-    int chunkRemaining = chunkSize;
-    ByteBuffer lastBuffer = null;
-    int bufCount = 0;
-    int restoreLimit = -1;
-
-    while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
-      lastBuffer = buffers[bufferOffset + bufCount];
-      if (!lastBuffer.hasRemaining()) {
-        bufferOffset++;
-        continue;
-      }
-      bufCount++;
-      if (lastBuffer.remaining() > chunkRemaining) {
-        restoreLimit = lastBuffer.limit();
-        lastBuffer.limit(lastBuffer.position() + chunkRemaining);
-        chunkRemaining = 0;
-        break;
-      } else {
-        chunkRemaining -= lastBuffer.remaining();
-      }
-    }
-    assert lastBuffer != null;
-    if (chunkRemaining == chunkSize) {
-      assert !hasRemaining();
-      // no data left to write
+  long write(GatheringByteChannel channel) throws IOException {
+    if (!hasRemaining()) {
       return 0;
     }
-    try {
-      long ret = channel.write(buffers, bufferOffset, bufCount);
-      if (ret > 0) {
-        remaining = (int) (remaining - ret);
-      }
-      return ret;
-    } finally {
-      if (restoreLimit >= 0) {
-        lastBuffer.limit(restoreLimit);
+    long written = 0;
+    for (ByteBuffer bb : this.buffers) {
+      if (bb.hasRemaining()) {
+        final int pos = bb.position();
+        final int result = channel.write(bb);
+        if (result <= 0) {
+          // Write error. Return how much we were able to write until now.
+          return written;
+        }
+        // Adjust the position of buffers already written so we don't write out
+        // duplicate data upon retry of incomplete write with the same buffer chain.
+        bb.position(pos + result);
+        remaining -= result;
+        written += result;
       }
     }
+    return written;
   }
 
   int size() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index 5a56fff4ad4..49c861b14ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -69,6 +69,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
  * put itself on new queue for Responder to pull from and return result to client.
  * @see BlockingRpcClient
  */
+@Deprecated()
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG })
 public class SimpleRpcServer extends RpcServer {
@@ -464,20 +465,9 @@ public class SimpleRpcServer extends RpcServer {
     return listener.getAddress();
   }
 
-  /**
-   * This is a wrapper around
-   * {@link java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)}. If the amount of data
-   * is large, it writes to channel in smaller chunks. This is to avoid jdk from creating many
-   * direct buffers as the size of buffer increases. This also minimizes extra copies in NIO layer
-   * as a result of multiple write operations required to write a large buffer.
-   * @param channel writable byte channel to write to
-   * @param bufferChain Chain of buffers to write
-   * @return number of bytes written
-   * @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
-   */
   protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
     throws IOException {
-    long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+    long count = bufferChain.write(channel);
     if (count > 0) {
       this.metrics.sentBytes(count);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
index 200c4ebd1af..b9d8d3dffc4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServerResponder.java
@@ -36,6 +36,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 /**
  * Sends responses of RPC back to clients.
  */
+@Deprecated
 @InterfaceAudience.Private
 class SimpleRpcServerResponder extends Thread {
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
index 861da8055d1..5c5e9102115 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
 /**
  * Datastructure that holds all necessary to a method invocation and then afterward, carries the
  * result.
  */
+@Deprecated
 @InterfaceAudience.Private
 class SimpleServerCall extends ServerCall {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
index ba7a9752a79..51e1bedba57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
 /** Reads calls from a connection and queues them for handling. */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
     justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
+@Deprecated
 @InterfaceAudience.Private
 class SimpleServerRpcConnection extends ServerRpcConnection {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
index df85589ab8b..c7142f34f63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,7 +35,6 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
 import org.apache.hbase.thirdparty.com.google.common.io.Files;
@@ -68,53 +69,15 @@ public class TestBufferChain {
     assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
   }
 
-  @Test
-  public void testChainChunkBiggerThanWholeArray() throws IOException {
-    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
-    BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 8192);
-    assertNoRemaining(bufs);
-  }
-
-  @Test
-  public void testChainChunkBiggerThanSomeArrays() throws IOException {
-    ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
-    BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 3);
-    assertNoRemaining(bufs);
-  }
-
   @Test
   public void testLimitOffset() throws IOException {
     ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
       stringBuf("XXXXworldY", 4, 5) };
     BufferChain chain = new BufferChain(bufs);
-    writeAndVerify(chain, "hello world", 3);
+    writeAndVerify(chain, "hello world");
     assertNoRemaining(bufs);
   }
 
-  @Test
-  public void testWithSpy() throws IOException {
-    ByteBuffer[] bufs = new ByteBuffer[] { stringBuf("XXXhelloYYY", 3, 5), stringBuf(" ", 0, 1),
-      stringBuf("XXXXworldY", 4, 5) };
-    BufferChain chain = new BufferChain(bufs);
-    FileOutputStream fos = new FileOutputStream(tmpFile);
-    FileChannel ch = Mockito.spy(fos.getChannel());
-    try {
-      chain.write(ch, 2);
-      assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 2);
-      assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 3);
-      assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
-      chain.write(ch, 8);
-      assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
-    } finally {
-      ch.close();
-      fos.close();
-    }
-  }
-
   private ByteBuffer stringBuf(String string, int position, int length) {
     ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
     buf.position(position);
@@ -137,14 +100,13 @@ public class TestBufferChain {
     return ret;
   }
 
-  private void writeAndVerify(BufferChain chain, String string, int chunkSize) throws IOException {
+  private void writeAndVerify(BufferChain chain, String string) throws IOException {
     FileOutputStream fos = new FileOutputStream(tmpFile);
     FileChannel ch = fos.getChannel();
    try {
       long remaining = string.length();
       while (chain.hasRemaining()) {
-        long n = chain.write(ch, chunkSize);
-        assertTrue(n == chunkSize || n == remaining);
+        long n = chain.write(ch);
         remaining -= n;
       }
       assertEquals(0, remaining);
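For reference, a minimal sketch (not part of the patch) of how a caller is expected to drive the simplified BufferChain.write(GatheringByteChannel): keep calling write until hasRemaining() returns false, relying on the chain to advance each buffer's position so a retried call never resends bytes. The class name and output file below are hypothetical, and the sketch assumes it is compiled in the org.apache.hadoop.hbase.ipc package because BufferChain is package-private.

// Hypothetical usage sketch; assumes compilation in the same package as BufferChain.
package org.apache.hadoop.hbase.ipc;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

public class BufferChainWriteSketch {
  public static void main(String[] args) throws IOException {
    // Two buffers that together hold "hello world".
    ByteBuffer[] bufs = new ByteBuffer[] {
      ByteBuffer.wrap("hello ".getBytes(StandardCharsets.UTF_8)),
      ByteBuffer.wrap("world".getBytes(StandardCharsets.UTF_8)) };
    BufferChain chain = new BufferChain(bufs);
    try (FileOutputStream fos = new FileOutputStream("buffer-chain-sketch.txt");
      FileChannel ch = fos.getChannel()) {
      // write(channel) may flush only part of the chain (e.g. when a socket's send
      // buffer fills), so callers loop until nothing remains; the chain advances
      // each buffer's position, so a retry never rewrites bytes already sent.
      while (chain.hasRemaining()) {
        chain.write(ch);
      }
    }
  }
}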