HADOOP-13483. Optimize IPC server protobuf decoding. Contributed by Daryn Sharp.

(cherry picked from commit 580a833496)

Conflicts:
	hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
This commit is contained in:
Kihwal Lee 2016-08-03 14:12:02 -05:00
parent 9a0ac56a5c
commit 9d62caa440
4 changed files with 361 additions and 61 deletions

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RpcWritable;
import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto; import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@ -68,7 +69,7 @@ public class ProtobufRpcEngine implements RpcEngine {
static { // Register the rpcRequest deserializer for WritableRpcEngine static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine( org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcWritable.Buffer.class,
new Server.ProtoBufRpcInvoker()); new Server.ProtoBufRpcInvoker());
} }
@ -612,8 +613,9 @@ public class ProtobufRpcEngine implements RpcEngine {
*/ */
public Writable call(RPC.Server server, String protocol, public Writable call(RPC.Server server, String protocol,
Writable writableRequest, long receiveTime) throws Exception { Writable writableRequest, long receiveTime) throws Exception {
RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; RpcWritable.Buffer request = (RpcWritable.Buffer) writableRequest;
RequestHeaderProto rpcRequest = request.requestHeader; RequestHeaderProto rpcRequest =
request.getValue(RequestHeaderProto.getDefaultInstance());
String methodName = rpcRequest.getMethodName(); String methodName = rpcRequest.getMethodName();
String protoName = rpcRequest.getDeclaringClassProtocolName(); String protoName = rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion(); long clientVersion = rpcRequest.getClientProtocolVersion();
@ -632,8 +634,7 @@ public class ProtobufRpcEngine implements RpcEngine {
throw new RpcNoSuchMethodException(msg); throw new RpcNoSuchMethodException(msg);
} }
Message prototype = service.getRequestPrototype(methodDescriptor); Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType() Message param = request.getValue(prototype);
.mergeFrom(request.theRequestRead).build();
Message result; Message result;
long startTime = Time.now(); long startTime = Time.now();
@ -663,7 +664,7 @@ public class ProtobufRpcEngine implements RpcEngine {
exception.getClass().getSimpleName(); exception.getClass().getSimpleName();
server.updateMetrics(detailedMetricsName, qTime, processingTime); server.updateMetrics(detailedMetricsName, qTime, processingTime);
} }
return new RpcResponseWrapper(result); return RpcWritable.wrap(result);
} }
} }
} }

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
@InterfaceAudience.Private
public abstract class RpcWritable implements Writable {
static RpcWritable wrap(Object o) {
if (o instanceof RpcWritable) {
return (RpcWritable)o;
} else if (o instanceof Message) {
return new ProtobufWrapper((Message)o);
} else if (o instanceof Writable) {
return new WritableWrapper((Writable)o);
}
throw new IllegalArgumentException("Cannot wrap " + o.getClass());
}
// don't support old inefficient Writable methods.
@Override
public final void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public final void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
// methods optimized for reduced intermediate byte[] allocations.
abstract void writeTo(ResponseBuffer out) throws IOException;
abstract <T> T readFrom(ByteBuffer bb) throws IOException;
// adapter for Writables.
static class WritableWrapper extends RpcWritable {
private final Writable writable;
WritableWrapper(Writable writable) {
this.writable = writable;
}
@Override
public void writeTo(ResponseBuffer out) throws IOException {
writable.write(out);
}
@SuppressWarnings("unchecked")
@Override
<T> T readFrom(ByteBuffer bb) throws IOException {
// create a stream that may consume up to the entire ByteBuffer.
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
bb.array(), bb.position() + bb.arrayOffset(), bb.remaining()));
try {
writable.readFields(in);
} finally {
// advance over the bytes read.
bb.position(bb.limit() - in.available());
}
return (T)writable;
}
}
// adapter for Protobufs.
static class ProtobufWrapper extends RpcWritable {
private Message message;
ProtobufWrapper(Message message) {
this.message = message;
}
@Override
void writeTo(ResponseBuffer out) throws IOException {
int length = message.getSerializedSize();
length += CodedOutputStream.computeRawVarint32Size(length);
out.ensureCapacity(length);
message.writeDelimitedTo(out);
}
@SuppressWarnings("unchecked")
@Override
<T> T readFrom(ByteBuffer bb) throws IOException {
// using the parser with a byte[]-backed coded input stream is the
// most efficient way to deserialize a protobuf. it has a direct
// path to the PB ctor that doesn't create multi-layered streams
// that internally buffer.
CodedInputStream cis = CodedInputStream.newInstance(
bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
try {
cis.pushLimit(cis.readRawVarint32());
message = message.getParserForType().parseFrom(cis);
cis.checkLastTagWas(0);
} finally {
// advance over the bytes read.
bb.position(bb.position() + cis.getTotalBytesRead());
}
return (T)message;
}
}
// adapter to allow decoding of writables and protobufs from a byte buffer.
static class Buffer extends RpcWritable {
private ByteBuffer bb;
static Buffer wrap(ByteBuffer bb) {
return new Buffer(bb);
}
Buffer() {}
Buffer(ByteBuffer bb) {
this.bb = bb;
}
@Override
void writeTo(ResponseBuffer out) throws IOException {
out.ensureCapacity(bb.remaining());
out.write(bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
}
@SuppressWarnings("unchecked")
@Override
<T> T readFrom(ByteBuffer bb) throws IOException {
// effectively consume the rest of the buffer from the callers
// perspective.
this.bb = bb.slice();
bb.limit(bb.position());
return (T)this;
}
public <T> T newInstance(Class<T> valueClass,
Configuration conf) throws IOException {
T instance;
try {
// this is much faster than ReflectionUtils!
instance = valueClass.newInstance();
if (instance instanceof Configurable) {
((Configurable)instance).setConf(conf);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return getValue(instance);
}
public <T> T getValue(T value) throws IOException {
return RpcWritable.wrap(value).readFrom(bb);
}
int remaining() {
return bb.remaining();
}
}
}

View File

@ -25,7 +25,6 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
@ -82,8 +81,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcWrapper;
import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch; import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
@ -113,7 +110,6 @@ import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.htrace.core.SpanId; import org.apache.htrace.core.SpanId;
@ -122,9 +118,7 @@ import org.apache.htrace.core.Tracer;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
@ -1345,6 +1339,7 @@ public abstract class Server {
* A WrappedRpcServerException that is suppressed altogether * A WrappedRpcServerException that is suppressed altogether
* for the purposes of logging. * for the purposes of logging.
*/ */
@SuppressWarnings("serial")
private static class WrappedRpcServerExceptionSuppressed private static class WrappedRpcServerExceptionSuppressed
extends WrappedRpcServerException { extends WrappedRpcServerException {
public WrappedRpcServerExceptionSuppressed( public WrappedRpcServerExceptionSuppressed(
@ -1474,10 +1469,10 @@ public abstract class Server {
} }
} }
private void saslReadAndProcess(DataInputStream dis) throws private void saslReadAndProcess(RpcWritable.Buffer buffer) throws
WrappedRpcServerException, IOException, InterruptedException { WrappedRpcServerException, IOException, InterruptedException {
final RpcSaslProto saslMessage = final RpcSaslProto saslMessage =
decodeProtobufFromStream(RpcSaslProto.newBuilder(), dis); getMessage(RpcSaslProto.getDefaultInstance(), buffer);
switch (saslMessage.getState()) { switch (saslMessage.getState()) {
case WRAP: { case WRAP: {
if (!saslContextEstablished || !useWrap) { if (!saslContextEstablished || !useWrap) {
@ -1675,7 +1670,7 @@ public abstract class Server {
RpcConstants.INVALID_RETRY_COUNT, null, this); RpcConstants.INVALID_RETRY_COUNT, null, this);
setupResponse(saslCall, setupResponse(saslCall,
RpcStatusProto.SUCCESS, null, RpcStatusProto.SUCCESS, null,
new RpcResponseWrapper(message), null, null); RpcWritable.wrap(message), null, null);
saslCall.sendResponse(); saslCall.sendResponse();
} }
@ -1780,7 +1775,7 @@ public abstract class Server {
dataLengthBuffer.clear(); dataLengthBuffer.clear();
data.flip(); data.flip();
boolean isHeaderRead = connectionContextRead; boolean isHeaderRead = connectionContextRead;
processOneRpc(data.array()); processOneRpc(data);
data = null; data = null;
if (!isHeaderRead) { if (!isHeaderRead) {
continue; continue;
@ -1897,7 +1892,7 @@ public abstract class Server {
* @throws WrappedRpcServerException - if the header cannot be * @throws WrappedRpcServerException - if the header cannot be
* deserialized, or the user is not authorized * deserialized, or the user is not authorized
*/ */
private void processConnectionContext(DataInputStream dis) private void processConnectionContext(RpcWritable.Buffer buffer)
throws WrappedRpcServerException { throws WrappedRpcServerException {
// allow only one connection context during a session // allow only one connection context during a session
if (connectionContextRead) { if (connectionContextRead) {
@ -1905,8 +1900,7 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context already processed"); "Connection context already processed");
} }
connectionContext = decodeProtobufFromStream( connectionContext = getMessage(IpcConnectionContextProto.getDefaultInstance(), buffer);
IpcConnectionContextProto.newBuilder(), dis);
protocolName = connectionContext.hasProtocol() ? connectionContext protocolName = connectionContext.hasProtocol() ? connectionContext
.getProtocol() : null; .getProtocol() : null;
@ -1981,7 +1975,7 @@ public abstract class Server {
if (unwrappedData.remaining() == 0) { if (unwrappedData.remaining() == 0) {
unwrappedDataLengthBuffer.clear(); unwrappedDataLengthBuffer.clear();
unwrappedData.flip(); unwrappedData.flip();
processOneRpc(unwrappedData.array()); processOneRpc(unwrappedData);
unwrappedData = null; unwrappedData = null;
} }
} }
@ -1998,15 +1992,14 @@ public abstract class Server {
* Listener thread * Listener thread
* @throws InterruptedException * @throws InterruptedException
*/ */
private void processOneRpc(byte[] buf) private void processOneRpc(ByteBuffer bb)
throws IOException, WrappedRpcServerException, InterruptedException { throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1; int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT; int retry = RpcConstants.INVALID_RETRY_COUNT;
try { try {
final DataInputStream dis = final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header = final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis); getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
callId = header.getCallId(); callId = header.getCallId();
retry = header.getRetryCount(); retry = header.getRetryCount();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -2015,13 +2008,13 @@ public abstract class Server {
checkRpcHeaders(header); checkRpcHeaders(header);
if (callId < 0) { // callIds typically used during connection setup if (callId < 0) { // callIds typically used during connection setup
processRpcOutOfBandRequest(header, dis); processRpcOutOfBandRequest(header, buffer);
} else if (!connectionContextRead) { } else if (!connectionContextRead) {
throw new WrappedRpcServerException( throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"Connection context not established"); "Connection context not established");
} else { } else {
processRpcRequest(header, dis); processRpcRequest(header, buffer);
} }
} catch (WrappedRpcServerException wrse) { // inform client of error } catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause(); Throwable ioe = wrse.getCause();
@ -2074,7 +2067,7 @@ public abstract class Server {
* @throws InterruptedException * @throws InterruptedException
*/ */
private void processRpcRequest(RpcRequestHeaderProto header, private void processRpcRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException, RpcWritable.Buffer buffer) throws WrappedRpcServerException,
InterruptedException { InterruptedException {
Class<? extends Writable> rpcRequestClass = Class<? extends Writable> rpcRequestClass =
getRpcRequestWrapper(header.getRpcKind()); getRpcRequestWrapper(header.getRpcKind());
@ -2088,8 +2081,7 @@ public abstract class Server {
} }
Writable rpcRequest; Writable rpcRequest;
try { //Read the rpc request try { //Read the rpc request
rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest = buffer.newInstance(rpcRequestClass, conf);
rpcRequest.readFields(dis);
} catch (Throwable t) { // includes runtime exception from newInstance } catch (Throwable t) { // includes runtime exception from newInstance
LOG.warn("Unable to read call parameters for client " + LOG.warn("Unable to read call parameters for client " +
getHostAddress() + "on connection protocol " + getHostAddress() + "on connection protocol " +
@ -2169,8 +2161,8 @@ public abstract class Server {
* @throws InterruptedException * @throws InterruptedException
*/ */
private void processRpcOutOfBandRequest(RpcRequestHeaderProto header, private void processRpcOutOfBandRequest(RpcRequestHeaderProto header,
DataInputStream dis) throws WrappedRpcServerException, IOException, RpcWritable.Buffer buffer) throws WrappedRpcServerException,
InterruptedException { IOException, InterruptedException {
final int callId = header.getCallId(); final int callId = header.getCallId();
if (callId == CONNECTION_CONTEXT_CALL_ID) { if (callId == CONNECTION_CONTEXT_CALL_ID) {
// SASL must be established prior to connection context // SASL must be established prior to connection context
@ -2180,7 +2172,7 @@ public abstract class Server {
"Connection header sent during SASL negotiation"); "Connection header sent during SASL negotiation");
} }
// read and authorize the user // read and authorize the user
processConnectionContext(dis); processConnectionContext(buffer);
} else if (callId == AuthProtocol.SASL.callId) { } else if (callId == AuthProtocol.SASL.callId) {
// if client was switched to simple, ignore first SASL message // if client was switched to simple, ignore first SASL message
if (authProtocol != AuthProtocol.SASL) { if (authProtocol != AuthProtocol.SASL) {
@ -2188,7 +2180,7 @@ public abstract class Server {
RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER,
"SASL protocol not requested by client"); "SASL protocol not requested by client");
} }
saslReadAndProcess(dis); saslReadAndProcess(buffer);
} else if (callId == PING_CALL_ID) { } else if (callId == PING_CALL_ID) {
LOG.debug("Received ping message"); LOG.debug("Received ping message");
} else { } else {
@ -2235,13 +2227,12 @@ public abstract class Server {
* @throws WrappedRpcServerException - deserialization failed * @throws WrappedRpcServerException - deserialization failed
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private <T extends Message> T decodeProtobufFromStream(Builder builder, <T extends Message> T getMessage(Message message,
DataInputStream dis) throws WrappedRpcServerException { RpcWritable.Buffer buffer) throws WrappedRpcServerException {
try { try {
builder.mergeDelimitedFrom(dis); return (T)buffer.getValue(message);
return (T)builder.build();
} catch (Exception ioe) { } catch (Exception ioe) {
Class<?> protoClass = builder.getDefaultInstanceForType().getClass(); Class<?> protoClass = message.getClass();
throw new WrappedRpcServerException( throw new WrappedRpcServerException(
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST,
"Error decoding " + protoClass.getSimpleName() + ": "+ ioe); "Error decoding " + protoClass.getSimpleName() + ": "+ ioe);
@ -2632,19 +2623,13 @@ public abstract class Server {
private void setupResponse(Call call, private void setupResponse(Call call,
RpcResponseHeaderProto header, Writable rv) throws IOException { RpcResponseHeaderProto header, Writable rv) throws IOException {
ResponseBuffer buf = responseBuffer.get().reset(); ResponseBuffer buf = responseBuffer.get().reset();
// adjust capacity on estimated length to reduce resizing copies try {
int estimatedLen = header.getSerializedSize(); RpcWritable.wrap(header).writeTo(buf);
estimatedLen += CodedOutputStream.computeRawVarint32Size(estimatedLen); if (rv != null) {
// if it's not a wrapped protobuf, just let it grow on its own RpcWritable.wrap(rv).writeTo(buf);
if (rv instanceof RpcWrapper) {
estimatedLen += ((RpcWrapper)rv).getLength();
}
buf.ensureCapacity(estimatedLen);
header.writeDelimitedTo(buf);
if (rv != null) { // null for exceptions
rv.write(buf);
} }
call.setResponse(ByteBuffer.wrap(buf.toByteArray())); call.setResponse(ByteBuffer.wrap(buf.toByteArray()));
} finally {
// Discard a large buf and reset it back to smaller size // Discard a large buf and reset it back to smaller size
// to free up heap. // to free up heap.
if (buf.capacity() > maxRespSize) { if (buf.capacity() > maxRespSize) {
@ -2653,6 +2638,7 @@ public abstract class Server {
buf.setCapacity(INITIAL_RESP_BUF_SIZE); buf.setCapacity(INITIAL_RESP_BUF_SIZE);
} }
} }
}
/** /**
* Setup response for the IPC Call on Fatal Error from a * Setup response for the IPC Call on Fatal Error from a
@ -2701,7 +2687,7 @@ public abstract class Server {
.setState(SaslState.WRAP) .setState(SaslState.WRAP)
.setToken(ByteString.copyFrom(token)) .setToken(ByteString.copyFrom(token))
.build(); .build();
setupResponse(call, saslHeader, new RpcResponseWrapper(saslMessage)); setupResponse(call, saslHeader, RpcWritable.wrap(saslMessage));
} }
} }

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import com.google.protobuf.Message;
public class TestRpcWritable {//extends TestRpcBase {
static Writable writable = new LongWritable(Time.now());
static Message message1 =
EchoRequestProto.newBuilder().setMessage("testing1").build();
static Message message2 =
EchoRequestProto.newBuilder().setMessage("testing2").build();
@Test
public void testWritableWrapper() throws IOException {
// serial writable in byte buffer
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writable.write(new DataOutputStream(baos));
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
// deserial
LongWritable actual = RpcWritable.wrap(new LongWritable())
.readFrom(bb);
Assert.assertEquals(writable, actual);
Assert.assertEquals(0, bb.remaining());
}
@Test
public void testProtobufWrapper() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
message1.writeDelimitedTo(baos);
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
Message actual = RpcWritable.wrap(EchoRequestProto.getDefaultInstance())
.readFrom(bb);
Assert.assertEquals(message1, actual);
Assert.assertEquals(0, bb.remaining());
}
@Test
public void testBufferWrapper() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
message1.writeDelimitedTo(dos);
message2.writeDelimitedTo(dos);
writable.write(dos);
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
RpcWritable.Buffer buf = RpcWritable.Buffer.wrap(bb);
Assert.assertEquals(baos.size(), bb.remaining());
Assert.assertEquals(baos.size(), buf.remaining());
Object actual = buf.getValue(EchoRequestProto.getDefaultInstance());
Assert.assertEquals(message1, actual);
Assert.assertTrue(bb.remaining() > 0);
Assert.assertEquals(bb.remaining(), buf.remaining());
actual = buf.getValue(EchoRequestProto.getDefaultInstance());
Assert.assertEquals(message2, actual);
Assert.assertTrue(bb.remaining() > 0);
Assert.assertEquals(bb.remaining(), buf.remaining());
actual = buf.newInstance(LongWritable.class, null);
Assert.assertEquals(writable, actual);
Assert.assertEquals(0, bb.remaining());
Assert.assertEquals(0, buf.remaining());
}
@Test
public void testBufferWrapperNested() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
writable.write(dos);
message1.writeDelimitedTo(dos);
message2.writeDelimitedTo(dos);
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
RpcWritable.Buffer buf1 = RpcWritable.Buffer.wrap(bb);
Assert.assertEquals(baos.size(), bb.remaining());
Assert.assertEquals(baos.size(), buf1.remaining());
Object actual = buf1.newInstance(LongWritable.class, null);
Assert.assertEquals(writable, actual);
int left = bb.remaining();
Assert.assertTrue(left > 0);
Assert.assertEquals(left, buf1.remaining());
// original bb now appears empty, but rpc writable has a slice of the bb.
RpcWritable.Buffer buf2 = buf1.newInstance(RpcWritable.Buffer.class, null);
Assert.assertEquals(0, bb.remaining());
Assert.assertEquals(0, buf1.remaining());
Assert.assertEquals(left, buf2.remaining());
actual = buf2.getValue(EchoRequestProto.getDefaultInstance());
Assert.assertEquals(message1, actual);
Assert.assertTrue(buf2.remaining() > 0);
actual = buf2.getValue(EchoRequestProto.getDefaultInstance());
Assert.assertEquals(message2, actual);
Assert.assertEquals(0, buf2.remaining());
}
}