HBASE-5451 Switch RPC call envelope/headers to PBs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1309019 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-04-03 16:26:50 +00:00
parent b5b0116712
commit e72b67e20c
6 changed files with 3065 additions and 89 deletions

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
* OutputStream implementation that wraps a DataOutput.
*/
@InterfaceAudience.Private
class DataOutputOutputStream extends OutputStream {
public class DataOutputOutputStream extends OutputStream {
private final DataOutput out;

View File

@ -46,18 +46,23 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.PoolMap.PoolType;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.protobuf.ByteString;
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
@ -233,8 +238,9 @@ public class HBaseClient {
User ticket = remoteId.getTicket();
Class<? extends VersionedProtocol> protocol = remoteId.getProtocol();
header = new ConnectionHeader(
protocol == null ? null : protocol.getName(), ticket);
ConnectionHeader.Builder builder = ConnectionHeader.newBuilder();
builder.setProtocol(protocol == null ? "" : protocol.getName());
this.header = builder.build();
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +
@ -436,13 +442,8 @@ public class HBaseClient {
private void writeHeader() throws IOException {
out.write(HBaseServer.HEADER.array());
out.write(HBaseServer.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
out.writeInt(header.getSerializedSize());
header.writeTo(out);
}
/* wait till someone signals us to start reading RPC response or
@ -451,7 +452,6 @@ public class HBaseClient {
*
* Return true if it is time to read a response; false otherwise.
*/
@SuppressWarnings({"ThrowableInstanceNeverThrown"})
protected synchronized boolean waitForWork() {
if (calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
long timeout = maxIdleTime-
@ -526,32 +526,24 @@ public class HBaseClient {
if (shouldCloseConnection.get()) {
return;
}
// For serializing the data to be written.
final DataOutputBuffer d = new DataOutputBuffer();
try {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
d.writeInt(0xdeadbeef); // placeholder for data length
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
// fill in the placeholder
Bytes.putInt(data, 0, dataLength - 4);
RpcRequest.Builder builder = RPCProtos.RpcRequest.newBuilder();
builder.setCallId(call.id);
Invocation invocation = (Invocation)call.param;
DataOutputBuffer d = new DataOutputBuffer();
invocation.write(d);
builder.setRequest(ByteString.copyFrom(d.getData()));
//noinspection SynchronizeOnNonFinalField
synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
out.write(data, 0, dataLength);
out.flush();
RpcRequest obj = builder.build();
this.out.writeInt(obj.getSerializedSize());
obj.writeTo(DataOutputOutputStream.constructOutputStream(this.out));
this.out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
//the buffer is just an in-memory buffer, but it is still polite to
// close early
IOUtils.closeStream(d);
}
}
@ -566,33 +558,31 @@ public class HBaseClient {
try {
// See HBaseServer.Call.setResponse for where we write out the response.
// It writes the call.id (int), a flag byte, then optionally the length
// of the response (int) followed by data.
// It writes the call.id (int), a boolean signifying any error (and if
// so the exception name/trace), and the response bytes
// Read the call id.
int id = in.readInt();
RpcResponse response = RpcResponse.parseDelimitedFrom(in);
int id = response.getCallId();
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.remove(id);
// Read the flag byte
byte flag = in.readByte();
boolean isError = ResponseFlag.isError(flag);
if (ResponseFlag.isLength(flag)) {
// Currently length if present is unused.
in.readInt();
}
int state = in.readInt(); // Read the state. Currently unused.
boolean isError = response.getError();
if (isError) {
if (call != null) {
//noinspection ThrowableInstanceNeverThrown
call.setException(new RemoteException(WritableUtils.readString(in),
WritableUtils.readString(in)));
call.setException(new RemoteException(
response.getException().getExceptionName(),
response.getException().getStackTrace()));
}
} else {
ByteString responseObj = response.getResponse();
DataInputStream dis =
new DataInputStream(responseObj.newInput());
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
value.readFields(dis); // read value
// it's possible that this call may have been cleaned up due to a RPC
// timeout, so check if it still exists before setting the value.
if (call != null) {

View File

@ -59,22 +59,28 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.DataOutputOutputStream;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.WritableWithSize;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import org.cliffc.high_scale_lib.Counter;
@ -94,7 +100,7 @@ public abstract class HBaseServer implements RpcServer {
* The first four bytes of Hadoop RPC connections
*/
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
public static final byte CURRENT_VERSION = 3;
public static final byte CURRENT_VERSION = 5;
/**
* How many calls/handler are allowed in the queue.
@ -333,40 +339,27 @@ public abstract class HBaseServer implements RpcServer {
ByteBufferOutputStream buf = new ByteBufferOutputStream(size);
DataOutputStream out = new DataOutputStream(buf);
try {
RpcResponse.Builder builder = RpcResponse.newBuilder();
// Call id.
out.writeInt(this.id);
// Write flag.
byte flag = (error != null)?
ResponseFlag.getErrorAndLengthSet(): ResponseFlag.getLengthSetOnly();
out.writeByte(flag);
// Place holder for length set later below after we
// fill the buffer with data.
out.writeInt(0xdeadbeef);
out.writeInt(status.state);
} catch (IOException e) {
errorClass = e.getClass().getName();
error = StringUtils.stringifyException(e);
}
try {
if (error == null) {
result.write(out);
builder.setCallId(this.id);
builder.setError(error != null);
if (error != null) {
RpcException.Builder b = RpcException.newBuilder();
b.setExceptionName(errorClass);
b.setStackTrace(error);
builder.setException(b.build());
} else {
WritableUtils.writeString(out, errorClass);
WritableUtils.writeString(out, error);
DataOutputBuffer d = new DataOutputBuffer(size);
result.write(d);
byte[] response = d.getData();
builder.setResponse(ByteString.copyFrom(response));
}
builder.build().writeDelimitedTo(
DataOutputOutputStream.constructOutputStream(out));
} catch (IOException e) {
LOG.warn("Error sending response to call: ", e);
LOG.warn("Exception while creating response " + e);
}
// Set the length into the ByteBuffer after call id and after
// byte flag.
ByteBuffer bb = buf.getByteBuffer();
int bufSiz = bb.remaining();
// Move to the size location in our ByteBuffer past call.id
// and past the byte flag.
bb.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE);
bb.putInt(bufSiz);
bb.position(0);
this.response = bb;
}
@ -1065,9 +1058,9 @@ public abstract class HBaseServer implements RpcServer {
// disconnected, we can say where it used to connect to.
protected String hostAddress;
protected int remotePort;
ConnectionHeader header = new ConnectionHeader();
ConnectionHeader header;
Class<? extends VersionedProtocol> protocol;
protected User ticket = null;
protected User user = null;
public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
@ -1231,26 +1224,21 @@ public abstract class HBaseServer implements RpcServer {
/// Reads the connection header following version
private void processHeader() throws IOException {
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(data.array()));
header.readFields(in);
header = ConnectionHeader.parseFrom(new ByteArrayInputStream(data.array()));
try {
String protocolClassName = header.getProtocol();
if (protocolClassName == null) {
protocolClassName = "org.apache.hadoop.hbase.ipc.HRegionInterface";
}
protocol = getProtocolClass(protocolClassName, conf);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown protocol: " + header.getProtocol());
}
ticket = header.getUser();
user = User.createUser(header);
}
protected void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
RpcRequest request = RpcRequest.parseFrom(buf);
int id = request.getCallId();
ByteString clientRequest = request.getRequest();
long callSize = buf.length;
if (LOG.isDebugEnabled()) {
@ -1271,6 +1259,8 @@ public abstract class HBaseServer implements RpcServer {
Writable param;
try {
DataInputStream dis =
new DataInputStream(clientRequest.newInput());
param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis);
} catch (Throwable t) {
@ -1372,12 +1362,12 @@ public abstract class HBaseServer implements RpcServer {
throw new ServerNotRunningYetException("Server is not running yet");
if (LOG.isDebugEnabled()) {
User remoteUser = call.connection.ticket;
User remoteUser = call.connection.user;
LOG.debug(getName() + ": call #" + call.id + " executing as "
+ (remoteUser == null ? "NULL principal" : remoteUser.getName()));
}
RequestContext.set(call.connection.ticket, getRemoteIp(),
RequestContext.set(call.connection.user, getRemoteIp(),
call.connection.protocol);
// make the call
value = call(call.connection.protocol, call.param, call.timestamp,

File diff suppressed because it is too large Load Diff

View File

@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@ -164,6 +166,33 @@ public abstract class User {
return new HadoopUser(ugi);
}
public static User createUser(ConnectionHeader head) {
UserGroupInformation ugi = null;
if (!head.hasUserInfo()) {
return create(null);
}
UserInformation userInfoProto = head.getUserInfo();
String effectiveUser = null;
if (userInfoProto.hasEffectiveUser()) {
effectiveUser = userInfoProto.getEffectiveUser();
}
String realUser = null;
if (userInfoProto.hasRealUser()) {
realUser = userInfoProto.getRealUser();
}
if (effectiveUser != null) {
if (realUser != null) {
UserGroupInformation realUserUgi =
UserGroupInformation.createRemoteUser(realUser);
ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
} else {
ugi = UserGroupInformation.createRemoteUser(effectiveUser);
}
}
return create(ugi);
}
/**
* Generates a new {@code User} instance specifically for use in test code.
* @param name the full username

102
src/main/protobuf/RPC.proto Normal file
View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Specification of (unsecure) HBase RPC:
*
* Client needs to set up a connection first to a server serving a certain
* HBase protocol (like HRegionInterface). Once the connection is set up, the
* client and server communicates on that channel for RPC requests/responses.
* The sections below describe the flow.
*
* As part of setting up a connection to a server, the client needs to send
* the ConnectionHeader header. At the data level, this looks like
* <"hrpc"-bytearray><5-byte><length-of-serialized-ConnectionHeader-obj[int32]><ConnectionHeader-object serialized>
*
* For every RPC that the client makes it needs to send the
* RpcRequest. At the data level this looks like
* <length-of-serialized-RpcRequest-obj><RpcRequest-object serialized>
*
* The server sends back a RpcResponse object as response.
* At the data level this looks like
* <protobuf-encoded-length-of-serialized-RpcResponse-obj><RpcResponse-object serialized>
*
* There is one special message that's sent from client to server -
* the Ping message. At the data level, this is just the bytes corresponding
* to integer -1.
*/
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "RPCProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message UserInformation {
required string effectiveUser = 1;
required string realUser = 2;
}
message ConnectionHeader {
/** User Info beyond what is established at connection establishment
* (applies to secure HBase setup)
*/
optional UserInformation userInfo = 1;
/** Protocol name for next rpc layer
* the client created a proxy with this protocol name
*/
optional string protocol = 2 [default = "org.apache.hadoop.hbase.ipc.HRegionInterface"];
}
/**
* The complete RPC request message
*/
message RpcRequest {
/** Monotonically increasing callId, mostly to keep track of RPCs */
required int32 callId = 1;
/** The request bytes */
optional bytes request = 2;
}
/**
* At the RPC layer, this message is used to indicate
* the server side exception to the RPC client.
*
* HBase RPC client throws an exception indicated
* by exceptionName with the stackTrace.
*/
message RpcException {
/** Class name of the exception thrown from the server */
required string exceptionName = 1;
/** Exception stack trace from the server side */
optional string stackTrace = 2;
}
/**
* The complete RPC response message
*/
message RpcResponse {
/** Echo back the callId the client sent */
required int32 callId = 1;
/** Did the RPC execution encounter an error at the server */
required bool error = 2;
/** Optional response bytes */
optional bytes response = 3;
/** Optional exception when error is true*/
optional RpcException exception = 4;
}