HBASE-7479 Remove VersionedProtocol and ProtocolSignature from RPC

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1430677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2013-01-09 04:23:02 +00:00
parent 9924f8c7a6
commit 4ab4e075bb
33 changed files with 341 additions and 1336 deletions

View File

@ -1637,11 +1637,7 @@ public final class RPCProtos {
boolean hasMethodName();
String getMethodName();
// optional uint64 clientProtocolVersion = 2;
boolean hasClientProtocolVersion();
long getClientProtocolVersion();
// optional bytes request = 3;
// optional bytes request = 2;
boolean hasRequest();
com.google.protobuf.ByteString getRequest();
@ -1710,21 +1706,11 @@ public final class RPCProtos {
}
}
// optional uint64 clientProtocolVersion = 2;
public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 2;
private long clientProtocolVersion_;
public boolean hasClientProtocolVersion() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public long getClientProtocolVersion() {
return clientProtocolVersion_;
}
// optional bytes request = 3;
public static final int REQUEST_FIELD_NUMBER = 3;
// optional bytes request = 2;
public static final int REQUEST_FIELD_NUMBER = 2;
private com.google.protobuf.ByteString request_;
public boolean hasRequest() {
return ((bitField0_ & 0x00000004) == 0x00000004);
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public com.google.protobuf.ByteString getRequest() {
return request_;
@ -1734,7 +1720,7 @@ public final class RPCProtos {
public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4;
private java.lang.Object requestClassName_;
public boolean hasRequestClassName() {
return ((bitField0_ & 0x00000008) == 0x00000008);
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public String getRequestClassName() {
java.lang.Object ref = requestClassName_;
@ -1764,7 +1750,6 @@ public final class RPCProtos {
private void initFields() {
methodName_ = "";
clientProtocolVersion_ = 0L;
request_ = com.google.protobuf.ByteString.EMPTY;
requestClassName_ = "";
}
@ -1788,12 +1773,9 @@ public final class RPCProtos {
output.writeBytes(1, getMethodNameBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(2, clientProtocolVersion_);
output.writeBytes(2, request_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, request_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(4, getRequestClassNameBytes());
}
getUnknownFields().writeTo(output);
@ -1811,13 +1793,9 @@ public final class RPCProtos {
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, clientProtocolVersion_);
.computeBytesSize(2, request_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, request_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(4, getRequestClassNameBytes());
}
@ -1849,11 +1827,6 @@ public final class RPCProtos {
result = result && getMethodName()
.equals(other.getMethodName());
}
result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion());
if (hasClientProtocolVersion()) {
result = result && (getClientProtocolVersion()
== other.getClientProtocolVersion());
}
result = result && (hasRequest() == other.hasRequest());
if (hasRequest()) {
result = result && getRequest()
@ -1877,10 +1850,6 @@ public final class RPCProtos {
hash = (37 * hash) + METHODNAME_FIELD_NUMBER;
hash = (53 * hash) + getMethodName().hashCode();
}
if (hasClientProtocolVersion()) {
hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getClientProtocolVersion());
}
if (hasRequest()) {
hash = (37 * hash) + REQUEST_FIELD_NUMBER;
hash = (53 * hash) + getRequest().hashCode();
@ -2007,12 +1976,10 @@ public final class RPCProtos {
super.clear();
methodName_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
clientProtocolVersion_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
request_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
bitField0_ = (bitField0_ & ~0x00000002);
requestClassName_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -2058,14 +2025,10 @@ public final class RPCProtos {
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.clientProtocolVersion_ = clientProtocolVersion_;
result.request_ = request_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.request_ = request_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.requestClassName_ = requestClassName_;
result.bitField0_ = to_bitField0_;
onBuilt();
@ -2086,9 +2049,6 @@ public final class RPCProtos {
if (other.hasMethodName()) {
setMethodName(other.getMethodName());
}
if (other.hasClientProtocolVersion()) {
setClientProtocolVersion(other.getClientProtocolVersion());
}
if (other.hasRequest()) {
setRequest(other.getRequest());
}
@ -2135,18 +2095,13 @@ public final class RPCProtos {
methodName_ = input.readBytes();
break;
}
case 16: {
case 18: {
bitField0_ |= 0x00000002;
clientProtocolVersion_ = input.readUInt64();
break;
}
case 26: {
bitField0_ |= 0x00000004;
request_ = input.readBytes();
break;
}
case 34: {
bitField0_ |= 0x00000008;
bitField0_ |= 0x00000004;
requestClassName_ = input.readBytes();
break;
}
@ -2192,31 +2147,10 @@ public final class RPCProtos {
onChanged();
}
// optional uint64 clientProtocolVersion = 2;
private long clientProtocolVersion_ ;
public boolean hasClientProtocolVersion() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public long getClientProtocolVersion() {
return clientProtocolVersion_;
}
public Builder setClientProtocolVersion(long value) {
bitField0_ |= 0x00000002;
clientProtocolVersion_ = value;
onChanged();
return this;
}
public Builder clearClientProtocolVersion() {
bitField0_ = (bitField0_ & ~0x00000002);
clientProtocolVersion_ = 0L;
onChanged();
return this;
}
// optional bytes request = 3;
// optional bytes request = 2;
private com.google.protobuf.ByteString request_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasRequest() {
return ((bitField0_ & 0x00000004) == 0x00000004);
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public com.google.protobuf.ByteString getRequest() {
return request_;
@ -2225,13 +2159,13 @@ public final class RPCProtos {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000004;
bitField0_ |= 0x00000002;
request_ = value;
onChanged();
return this;
}
public Builder clearRequest() {
bitField0_ = (bitField0_ & ~0x00000004);
bitField0_ = (bitField0_ & ~0x00000002);
request_ = getDefaultInstance().getRequest();
onChanged();
return this;
@ -2240,7 +2174,7 @@ public final class RPCProtos {
// optional string requestClassName = 4;
private java.lang.Object requestClassName_ = "";
public boolean hasRequestClassName() {
return ((bitField0_ & 0x00000008) == 0x00000008);
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public String getRequestClassName() {
java.lang.Object ref = requestClassName_;
@ -2256,19 +2190,19 @@ public final class RPCProtos {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000008;
bitField0_ |= 0x00000004;
requestClassName_ = value;
onChanged();
return this;
}
public Builder clearRequestClassName() {
bitField0_ = (bitField0_ & ~0x00000008);
bitField0_ = (bitField0_ & ~0x00000004);
requestClassName_ = getDefaultInstance().getRequestClassName();
onChanged();
return this;
}
void setRequestClassName(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000008;
bitField0_ |= 0x00000004;
requestClassName_ = value;
onChanged();
}
@ -3761,17 +3695,16 @@ public final class RPCProtos {
" \001(\0132\020.UserInformation\022?\n\010protocol\030\002 \001(\t" +
":-org.apache.hadoop.hbase.client.ClientP" +
"rotocol\"<\n\020RpcRequestHeader\022\016\n\006callId\030\001 " +
"\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\"n\n\016RpcRequ" +
"estBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientPro" +
"tocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020r" +
"equestClassName\030\004 \001(\t\"{\n\021RpcResponseHead",
"er\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.Rpc" +
"ResponseHeader.Status\"+\n\006Status\022\013\n\007SUCCE" +
"SS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcRespons" +
"eBody\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022" +
"\025\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001" +
"(\tB<\n*org.apache.hadoop.hbase.protobuf.g" +
"eneratedB\tRPCProtosH\001\240\001\001"
"\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\"O\n\016RpcRequ" +
"estBody\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002" +
" \001(\014\022\030\n\020requestClassName\030\004 \001(\t\"{\n\021RpcRes" +
"ponseHeader\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 ",
"\002(\0162\031.RpcResponseHeader.Status\"+\n\006Status" +
"\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017R" +
"pcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014RpcE" +
"xception\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\nstack" +
"Trace\030\002 \001(\tB<\n*org.apache.hadoop.hbase.p" +
"rotobuf.generatedB\tRPCProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -3807,7 +3740,7 @@ public final class RPCProtos {
internal_static_RpcRequestBody_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RpcRequestBody_descriptor,
new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", },
new java.lang.String[] { "MethodName", "Request", "RequestClassName", },
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class,
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class);
internal_static_RpcResponseHeader_descriptor =

View File

@ -86,13 +86,10 @@ message RpcRequestBody {
/** Name of the RPC method */
required string methodName = 1;
/** protocol version of class declaring the called method */
optional uint64 clientProtocolVersion = 2;
/** Bytes corresponding to the client protobuf request. This is the actual
* bytes corresponding to the RPC request argument.
*/
optional bytes request = 3;
optional bytes request = 2;
/** Some metainfo about the request. Helps us to treat RPCs with
* different priorities. For now this is just the classname of the request

View File

@ -0,0 +1,32 @@
package org.apache.hadoop.hbase;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Marker Interface used by ipc. We need a means of referring to
* ipc "protocols" generically. For example, we need to tell an rpc
* server the "protocols" it implements and it helps if all protocols
* implement a common 'type'. That is what this Interface is used for.
*/
// This Interface replaces the old VersionedProtocol Interface. Rather
// than redo a bunch of code its removal, instead we put in place this
// Interface and change all VP references to Protocol references.
// It is moved up here to top-level because it is ugly having members
// of super packages reach down into subpackages.
public interface IpcProtocol {}

View File

@ -21,52 +21,8 @@ package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.security.TokenInfo;
/**
* Protocol that a client uses to communicate with the Master (for admin purposes).
@ -76,274 +32,5 @@ import com.google.protobuf.ServiceException;
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface MasterAdminProtocol extends
MasterAdminService.BlockingInterface, MasterProtocol {
public static final long VERSION = 1L;
/* Column-level */
/**
* Adds a column to the specified table
* @param controller Unused (set to null).
* @param req AddColumnRequest that contains:<br>
* - tableName: table to modify<br>
* - column: column descriptor
* @throws ServiceException
*/
@Override
public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
throws ServiceException;
/**
* Deletes a column from the specified table. Table must be disabled.
* @param controller Unused (set to null).
* @param req DeleteColumnRequest that contains:<br>
* - tableName: table to alter<br>
* - columnName: column family to remove
* @throws ServiceException
*/
@Override
public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
throws ServiceException;
/**
* Modifies an existing column on the specified table
* @param controller Unused (set to null).
* @param req ModifyColumnRequest that contains:<br>
* - tableName: table name<br>
* - descriptor: new column descriptor
* @throws ServiceException e
*/
@Override
public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
throws ServiceException;
/* Region-level */
/**
* Move a region to a specified destination server.
* @param controller Unused (set to null).
* @param req The request that contains:<br>
* - region: The encoded region name; i.e. the hash that makes
* up the region name suffix: e.g. if regionname is
* <code>TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396.</code>,
* then the encoded region name is: <code>527db22f95c8a9e0116f0cc13c680396</code>.<br>
* - destServerName: The servername of the destination regionserver. If
* passed the empty byte array we'll assign to a random server. A server name
* is made of host, port and startcode. Here is an example:
* <code> host187.example.com,60020,1289493121758</code>.
* @throws ServiceException that wraps a UnknownRegionException if we can't find a
* region named <code>encodedRegionName</code>
*/
@Override
public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
throws ServiceException;
/**
* Assign a region to a server chosen at random.
* @param controller Unused (set to null).
* @param req contains the region to assign. Will use existing RegionPlan if one
* found.
* @throws ServiceException
*/
@Override
public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
throws ServiceException;
/**
* Unassign a region from current hosting regionserver. Region will then be
* assigned to a regionserver chosen at random. Region could be reassigned
* back to the same server. Use {@link #moveRegion} if you want to
* control the region movement.
* @param controller Unused (set to null).
* @param req The request that contains:<br>
* - region: Region to unassign. Will clear any existing RegionPlan
* if one found.<br>
* - force: If true, force unassign (Will remove region from
* regions-in-transition too if present as well as from assigned regions --
* radical!.If results in double assignment use hbck -fix to resolve.
* @throws ServiceException
*/
@Override
public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
throws ServiceException;
/**
* Offline a region from the assignment manager's in-memory state. The
* region should be in a closed state and there will be no attempt to
* automatically reassign the region as in unassign. This is a special
* method, and should only be used by experts or hbck.
* @param controller Unused (set to null).
* @param request OfflineRegionRequest that contains:<br>
* - region: Region to offline. Will clear any existing RegionPlan
* if one found.
* @throws ServiceException
*/
@Override
public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
throws ServiceException;
/* Table-level */
/**
* Creates a new table asynchronously. If splitKeys are specified, then the
* table will be created with an initial set of multiple regions.
* If splitKeys is null, the table will be created with a single region.
* @param controller Unused (set to null).
* @param req CreateTableRequest that contains:<br>
* - tablesSchema: table descriptor<br>
* - splitKeys
* @throws ServiceException
*/
@Override
public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
throws ServiceException;
/**
* Deletes a table
* @param controller Unused (set to null).
* @param req DeleteTableRequest that contains:<br>
* - tableName: table to delete
* @throws ServiceException
*/
@Override
public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest req)
throws ServiceException;
/**
* Puts the table on-line (only needed if table has been previously taken offline)
* @param controller Unused (set to null).
* @param req EnableTableRequest that contains:<br>
* - tableName: table to enable
* @throws ServiceException
*/
@Override
public EnableTableResponse enableTable(RpcController controller, EnableTableRequest req)
throws ServiceException;
/**
* Take table offline
*
* @param controller Unused (set to null).
* @param req DisableTableRequest that contains:<br>
* - tableName: table to take offline
* @throws ServiceException
*/
@Override
public DisableTableResponse disableTable(RpcController controller, DisableTableRequest req)
throws ServiceException;
/**
* Modify a table's metadata
*
* @param controller Unused (set to null).
* @param req ModifyTableRequest that contains:<br>
* - tableName: table to modify<br>
* - tableSchema: new descriptor for table
* @throws ServiceException
*/
@Override
public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
throws ServiceException;
/* Cluster-level */
/**
* Shutdown an HBase cluster.
* @param controller Unused (set to null).
* @param request ShutdownRequest
* @return ShutdownResponse
* @throws ServiceException
*/
@Override
public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
throws ServiceException;
/**
* Stop HBase Master only.
* Does not shutdown the cluster.
* @param controller Unused (set to null).
* @param request StopMasterRequest
* @return StopMasterResponse
* @throws ServiceException
*/
@Override
public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
throws ServiceException;
/**
* Run the balancer. Will run the balancer and if regions to move, it will
* go ahead and do the reassignments. Can NOT run for various reasons. Check
* logs.
* @param c Unused (set to null).
* @param request BalanceRequest
* @return BalanceResponse that contains:<br>
* - balancerRan: True if balancer ran and was able to tell the region servers to
* unassign all the regions to balance (the re-assignment itself is async),
* false otherwise.
*/
@Override
public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException;
/**
* Turn the load balancer on or off.
* @param controller Unused (set to null).
* @param req SetBalancerRunningRequest that contains:<br>
* - on: If true, enable balancer. If false, disable balancer.<br>
* - synchronous: if true, wait until current balance() call, if outstanding, to return.
* @return SetBalancerRunningResponse that contains:<br>
* - prevBalanceValue: Previous balancer value
* @throws ServiceException
*/
@Override
public SetBalancerRunningResponse setBalancerRunning(
RpcController controller, SetBalancerRunningRequest req) throws ServiceException;
/**
* @param c Unused (set to null).
* @param req IsMasterRunningRequest
* @return IsMasterRunningRequest that contains:<br>
* isMasterRunning: true if master is available
* @throws ServiceException
*/
@Override
public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
throws ServiceException;
/**
* Run a scan of the catalog table
* @param c Unused (set to null).
* @param req CatalogScanRequest
* @return CatalogScanResponse that contains the int return code corresponding
* to the number of entries cleaned
* @throws ServiceException
*/
@Override
public CatalogScanResponse runCatalogScan(RpcController c,
CatalogScanRequest req) throws ServiceException;
/**
* Enable/Disable the catalog janitor
* @param c Unused (set to null).
* @param req EnableCatalogJanitorRequest that contains:<br>
* - enable: If true, enable catalog janitor. If false, disable janitor.<br>
* @return EnableCatalogJanitorResponse that contains:<br>
* - prevValue: true, if it was enabled previously; false, otherwise
* @throws ServiceException
*/
@Override
public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
EnableCatalogJanitorRequest req) throws ServiceException;
/**
* Query whether the catalog janitor is enabled
* @param c Unused (set to null).
* @param req IsCatalogJanitorEnabledRequest
* @return IsCatalogCatalogJanitorEnabledResponse that contains:<br>
* - value: true, if it is enabled; false, otherwise
* @throws ServiceException
*/
@Override
public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
IsCatalogJanitorEnabledRequest req) throws ServiceException;
}
public interface MasterAdminProtocol
extends MasterAdminService.BlockingInterface, MasterProtocol {}

View File

@ -21,19 +21,8 @@ package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.security.TokenInfo;
/**
* Protocol that a client uses to communicate with the Master (for monitoring purposes).
@ -43,57 +32,5 @@ import com.google.protobuf.ServiceException;
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface MasterMonitorProtocol extends
MasterMonitorService.BlockingInterface, MasterProtocol {
public static final long VERSION = 1L;
/**
* Used by the client to get the number of regions that have received the
* updated schema
*
* @param controller Unused (set to null).
* @param req GetSchemaAlterStatusRequest that contains:<br>
* - tableName
* @return GetSchemaAlterStatusResponse indicating the number of regions updated.
* yetToUpdateRegions is the regions that are yet to be updated totalRegions
* is the total number of regions of the table
* @throws ServiceException
*/
@Override
public GetSchemaAlterStatusResponse getSchemaAlterStatus(
RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException;
/**
* Get list of TableDescriptors for requested tables.
* @param controller Unused (set to null).
* @param req GetTableDescriptorsRequest that contains:<br>
* - tableNames: requested tables, or if empty, all are requested
* @return GetTableDescriptorsResponse
* @throws ServiceException
*/
@Override
public GetTableDescriptorsResponse getTableDescriptors(
RpcController controller, GetTableDescriptorsRequest req) throws ServiceException;
/**
* Return cluster status.
* @param controller Unused (set to null).
* @param req GetClusterStatusRequest
* @return status object
* @throws ServiceException
*/
@Override
public GetClusterStatusResponse getClusterStatus(RpcController controller, GetClusterStatusRequest req)
throws ServiceException;
/**
* @param c Unused (set to null).
* @param req IsMasterRunningRequest
* @return IsMasterRunningRequest that contains:<br>
* isMasterRunning: true if master is available
* @throws ServiceException
*/
@Override
public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
throws ServiceException;
}
public interface MasterMonitorProtocol
extends MasterMonitorService.BlockingInterface, MasterProtocol {}

View File

@ -16,29 +16,14 @@
* limitations under the License.
*/
// Functions implemented by all the master protocols (e.g. MasterAdminProtocol,
// MasterMonitorProtocol). Currently, this is only isMasterRunning, which is used,
// on proxy creation, to check if the master has been stopped. If it has,
// a MasterNotRunningException is thrown back to the client, and the client retries.
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
public interface MasterProtocol extends VersionedProtocol, MasterService.BlockingInterface {
/**
* @param c Unused (set to null).
* @param req IsMasterRunningRequest
* @return IsMasterRunningRequest that contains:<br>
* isMasterRunning: true if master is available
* @throws ServiceException
*/
public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
throws ServiceException;
}
/**
* Functions implemented by all the master protocols: e.g. {@link MasterAdminProtocol}
* and {@link MasterMonitorProtocol}. Currently, the only shared method
* {@link #isMasterRunning(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest)}
* which is used on connection setup to check if the master has been stopped.
*/
public interface MasterProtocol extends IpcProtocol, MasterService.BlockingInterface {}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
@ -33,7 +32,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo;
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface RegionServerStatusProtocol extends
RegionServerStatusService.BlockingInterface, VersionedProtocol {
public static final long VERSION = 1L;
}
public interface RegionServerStatusProtocol
extends RegionServerStatusService.BlockingInterface, IpcProtocol {}

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
@ -31,7 +31,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo;
serverPrincipal = "hbase.regionserver.kerberos.principal")
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Private
public interface AdminProtocol extends
AdminService.BlockingInterface, VersionedProtocol {
public static final long VERSION = 1L;
}
public interface AdminProtocol
extends AdminService.BlockingInterface, IpcProtocol {}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.security.TokenInfo;
import org.apache.hadoop.hbase.security.KerberosInfo;
@ -33,7 +33,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo;
@TokenInfo("HBASE_AUTH_TOKEN")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface ClientProtocol extends
ClientService.BlockingInterface, VersionedProtocol {
public static final long VERSION = 1L;
}
public interface ClientProtocol
extends ClientService.BlockingInterface, IpcProtocol {}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MasterProtocol;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.RegionMovedException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
@ -550,8 +550,8 @@ public class HConnectionManager {
private final Configuration conf;
// Known region ServerName.toString() -> RegionClient/Admin
private final ConcurrentHashMap<String, Map<String, VersionedProtocol>> servers =
new ConcurrentHashMap<String, Map<String, VersionedProtocol>>();
private final ConcurrentHashMap<String, Map<String, IpcProtocol>> servers =
new ConcurrentHashMap<String, Map<String, IpcProtocol>>();
private final ConcurrentHashMap<String, String> connectionLock =
new ConcurrentHashMap<String, String>();
@ -681,12 +681,10 @@ public class HConnectionManager {
public int userCount;
public long keepAliveUntil = Long.MAX_VALUE;
public final Class<? extends MasterProtocol> protocolClass;
public long version;
public MasterProtocolState (
final Class<? extends MasterProtocol> protocolClass, long version) {
final Class<? extends MasterProtocol> protocolClass) {
this.protocolClass = protocolClass;
this.version = version;
}
}
@ -718,9 +716,8 @@ public class HConnectionManager {
InetSocketAddress isa =
new InetSocketAddress(sn.getHostname(), sn.getPort());
MasterProtocol tryMaster = (MasterProtocol) HBaseClientRPC.getProxy(
MasterProtocol tryMaster = (MasterProtocol)HBaseClientRPC.getProxy(
masterProtocolState.protocolClass,
masterProtocolState.version,
isa, this.conf, this.rpcTimeout);
if (tryMaster.isMasterRunning(
@ -1349,17 +1346,16 @@ public class HConnectionManager {
}
@Override
public ClientProtocol getClient(
final String hostname, final int port) throws IOException {
return (ClientProtocol)getProtocol(hostname, port,
clientClass, ClientProtocol.VERSION);
public ClientProtocol getClient(final String hostname, final int port)
throws IOException {
return (ClientProtocol)getProtocol(hostname, port, clientClass);
}
@Override
public AdminProtocol getAdmin(final String hostname,
final int port, final boolean master) throws IOException {
return (AdminProtocol)getProtocol(hostname, port,
adminClass, AdminProtocol.VERSION);
public AdminProtocol getAdmin(final String hostname, final int port,
final boolean master)
throws IOException {
return (AdminProtocol)getProtocol(hostname, port, adminClass);
}
/**
@ -1372,22 +1368,22 @@ public class HConnectionManager {
* @return Proxy.
* @throws IOException
*/
VersionedProtocol getProtocol(final String hostname,
final int port, final Class <? extends VersionedProtocol> protocolClass,
final long version) throws IOException {
IpcProtocol getProtocol(final String hostname,
final int port, final Class <? extends IpcProtocol> protocolClass)
throws IOException {
String rsName = Addressing.createHostAndPortStr(hostname, port);
// See if we already have a connection (common case)
Map<String, VersionedProtocol> protocols = this.servers.get(rsName);
Map<String, IpcProtocol> protocols = this.servers.get(rsName);
if (protocols == null) {
protocols = new HashMap<String, VersionedProtocol>();
Map<String, VersionedProtocol> existingProtocols =
protocols = new HashMap<String, IpcProtocol>();
Map<String, IpcProtocol> existingProtocols =
this.servers.putIfAbsent(rsName, protocols);
if (existingProtocols != null) {
protocols = existingProtocols;
}
}
String protocol = protocolClass.getName();
VersionedProtocol server = protocols.get(protocol);
IpcProtocol server = protocols.get(protocol);
if (server == null) {
// create a unique lock for this RS + protocol (if necessary)
String lockKey = protocol + "@" + rsName;
@ -1402,7 +1398,7 @@ public class HConnectionManager {
InetSocketAddress address = new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS
server = HBaseClientRPC.waitForProxy(
protocolClass, version, address, this.conf,
protocolClass, address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
protocols.put(protocol, server);
} catch (RemoteException e) {
@ -1599,9 +1595,9 @@ public class HConnectionManager {
}
MasterProtocolState masterAdminProtocol =
new MasterProtocolState(MasterAdminProtocol.class, MasterAdminProtocol.VERSION);
new MasterProtocolState(MasterAdminProtocol.class);
MasterProtocolState masterMonitorProtocol =
new MasterProtocolState(MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION);
new MasterProtocolState(MasterMonitorProtocol.class);
/**
* This function allows HBaseAdmin and potentially others
@ -2273,8 +2269,8 @@ public class HConnectionManager {
delayedClosing.stop("Closing connection");
if (stopProxy) {
closeMaster();
for (Map<String, VersionedProtocol> i : servers.values()) {
for (VersionedProtocol server: i.values()) {
for (Map<String, IpcProtocol> i : servers.values()) {
for (IpcProtocol server: i.values()) {
HBaseClientRPC.stopProxy(server);
}
}

View File

@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
@ -105,8 +106,8 @@ import com.google.protobuf.Message.Builder;
@InterfaceAudience.Private
public class HBaseClient {
public static final Log LOG = LogFactory
.getLog("org.apache.hadoop.ipc.HBaseClient");
public static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
private static final Map<String, Method> methodInstances =
new ConcurrentHashMap<String, Method>();
@ -190,12 +191,13 @@ public class HBaseClient {
}
public static class FailedServerException extends IOException {
private static final long serialVersionUID = -4744376109431464127L;
public FailedServerException(String s) {
super(s);
}
}
/**
* set the ping interval value in configuration
*
@ -260,9 +262,9 @@ public class HBaseClient {
/** A call waiting for a value. */
protected class Call {
final int id; // call id
final RpcRequestBody param; // rpc request object
Message value; // value, null if error
final int id; // call id
final RpcRequestBody param; // rpc request object
Message value; // value, null if error
IOException error; // exception, null if value
boolean done; // true when call is done
long startTime;
@ -306,6 +308,7 @@ public class HBaseClient {
return this.startTime;
}
}
protected static Map<String,TokenSelector<? extends TokenIdentifier>> tokenHandlers =
new HashMap<String,TokenSelector<? extends TokenIdentifier>>();
static {
@ -339,9 +342,12 @@ public class HBaseClient {
private int reloginMaxBackoff; // max pause before relogin on sasl failure
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls = new ConcurrentSkipListMap<Integer, Call>();
protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
protected final ConcurrentSkipListMap<Integer, Call> calls =
new ConcurrentSkipListMap<Integer, Call>();
protected final AtomicLong lastActivity =
new AtomicLong(); // last I/O activity time
protected final AtomicBoolean shouldCloseConnection =
new AtomicBoolean(); // indicate if the connection is closed
protected IOException closeException; // close reason
Connection(ConnectionId remoteId) throws IOException {
@ -968,8 +974,8 @@ public class HBaseClient {
}
private Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
private Method getMethod(Class<? extends IpcProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
return method;
@ -1296,7 +1302,7 @@ public class HBaseClient {
* Throws exceptions if there are network problems or if the remote code
* threw an exception. */
public Message call(RpcRequestBody param, InetSocketAddress addr,
Class<? extends VersionedProtocol> protocol,
Class<? extends IpcProtocol> protocol,
User ticket, int rpcTimeout)
throws InterruptedException, IOException {
Call call = new Call(param);
@ -1362,28 +1368,12 @@ public class HBaseClient {
}
}
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored.
* @param params RpcRequestBody parameters
* @param addresses socket addresses
* @return RpcResponseBody[]
* @throws IOException e
* @deprecated Use {@code #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead
*/
@Deprecated
public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses)
throws IOException, InterruptedException {
return call(params, addresses, null, null);
}
/** Makes a set of calls in parallel. Each parameter is sent to the
* corresponding address. When all values are available, or have timed out
* or errored, the collected results are returned in an array. The array
* contains nulls for calls that timed out or errored. */
public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses,
Class<? extends VersionedProtocol> protocol,
Class<? extends IpcProtocol> protocol,
User ticket)
throws IOException, InterruptedException {
if (addresses.length == 0) return new RpcResponseBody[0];
@ -1418,7 +1408,7 @@ public class HBaseClient {
/* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused. */
protected Connection getConnection(InetSocketAddress addr,
Class<? extends VersionedProtocol> protocol,
Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout,
Call call)
@ -1461,11 +1451,10 @@ public class HBaseClient {
final InetSocketAddress address;
final User ticket;
final int rpcTimeout;
Class<? extends VersionedProtocol> protocol;
Class<? extends IpcProtocol> protocol;
private static final int PRIME = 16777619;
ConnectionId(InetSocketAddress address,
Class<? extends VersionedProtocol> protocol,
ConnectionId(InetSocketAddress address, Class<? extends IpcProtocol> protocol,
User ticket,
int rpcTimeout) {
this.protocol = protocol;
@ -1478,7 +1467,7 @@ public class HBaseClient {
return address;
}
Class<? extends VersionedProtocol> getProtocol() {
Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
@ -1504,4 +1493,4 @@ public class HBaseClient {
(ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout;
}
}
}
}

View File

@ -19,19 +19,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@ -39,27 +27,43 @@ import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
/**
* An RPC implementation. This class provides the client side implementation.
* An RPC implementation. This class provides the client side.
*/
@InterfaceAudience.Private
public class HBaseClientRPC {
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
// cache of RpcEngines by protocol
private static final Map<Class, RpcClientEngine> PROTOCOL_ENGINES
= new HashMap<Class, RpcClientEngine>();
/**
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine} implementation
* to load to handle connection protocols. Handlers for individual protocols can be
* configured using {@code "hbase.rpc.client.engine." + protocol.class.name}.
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine}
* implementation to load to handle connection protocols. Handlers for individual
* protocols can be configured using {@code "hbase.rpc.client.engine." +
* protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine";
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class, RpcClientEngine> PROXY_ENGINES
= new HashMap<Class, RpcClientEngine>();
// cache of RpcEngines by protocol
private static final Map<Class<? extends IpcProtocol>, RpcClientEngine> PROTOCOL_ENGINES =
new HashMap<Class<? extends IpcProtocol>, RpcClientEngine>();
// Track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class<?>, RpcClientEngine> PROXY_ENGINES =
new HashMap<Class<?>, RpcClientEngine>();
// thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
@ -68,37 +72,31 @@ public class HBaseClientRPC {
}
};
static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
throws NoSuchFieldException, IllegalAccessException {
Field versionField = protocol.getField("VERSION");
versionField.setAccessible(true);
return versionField.getLong(protocol);
}
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcClientEngine.class);
Class<? extends IpcProtocol> protocol, Class<? extends RpcClientEngine> engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine,
RpcClientEngine.class);
}
// return the RpcEngine configured to handle a protocol
static synchronized RpcClientEngine getProtocolEngine(Class protocol,
Configuration conf) {
static synchronized RpcClientEngine getProtocolEngine(
Class<? extends IpcProtocol> protocol, Configuration conf) {
RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
defaultEngine);
defaultEngine);
LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
if (protocol.isInterface()) {
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol),
engine);
}
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
@ -111,7 +109,6 @@ public class HBaseClientRPC {
/**
* @param protocol protocol interface
* @param clientVersion which client version we expect
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
@ -120,23 +117,21 @@ public class HBaseClientRPC {
* @return proxy
* @throws java.io.IOException e
*/
@SuppressWarnings("unchecked")
public static VersionedProtocol waitForProxy(Class protocol,
long clientVersion,
public static IpcProtocol waitForProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
int rpcTimeout,
long timeout
) throws IOException {
long timeout)
throws IOException {
// HBase does limited number of reconnects which is different from hadoop.
long startTime = System.currentTimeMillis();
IOException ioe;
int reconnectAttempts = 0;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch (SocketTimeoutException te) { // namenode is busy
return getProxy(protocol, addr, conf, rpcTimeout);
} catch (SocketTimeoutException te) {
LOG.info("Problem connecting to server: " + addr);
ioe = te;
} catch (IOException ioex) {
@ -204,7 +199,6 @@ public class HBaseClientRPC {
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param factory socket factory
@ -212,14 +206,12 @@ public class HBaseClientRPC {
* @return proxy
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion,
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr,
User.getCurrent(), conf, factory, rpcTimeout);
return getProxy(protocol, addr, User.getCurrent(), conf, factory, rpcTimeout);
}
/**
@ -227,7 +219,6 @@ public class HBaseClientRPC {
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param ticket ticket
* @param conf configuration
@ -236,15 +227,13 @@ public class HBaseClientRPC {
* @return proxy
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, User ticket,
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
RpcClientEngine engine = getProtocolEngine(protocol, conf);
VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, getRpcTimeout()));
IpcProtocol proxy = engine.getProxy(protocol, addr, ticket, conf, factory,
Math.min(rpcTimeout, getRpcTimeout()));
return proxy;
}
@ -252,21 +241,17 @@ public class HBaseClientRPC {
* Construct a client-side proxy object with the default SocketFactory
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
public static IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, Configuration conf, int rpcTimeout)
throws IOException {
return getProxy(protocol, addr, conf,
NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
}
/**
@ -274,7 +259,7 @@ public class HBaseClientRPC {
*
* @param proxy the proxy to be stopped
*/
public static void stopProxy(VersionedProtocol proxy) {
public static void stopProxy(IpcProtocol proxy) {
if (proxy != null) {
getProxyEngine(proxy).stopProxy(proxy);
}
@ -291,4 +276,4 @@ public class HBaseClientRPC {
public static void resetRpcTimeout() {
rpcTimeout.remove();
}
}
}

View File

@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
@ -168,18 +169,18 @@ public abstract class HBaseServer implements RpcServer {
new ThreadLocal<RpcServer>();
private volatile boolean started = false;
private static final Map<String, Class<? extends VersionedProtocol>>
PROTOCOL_CACHE =
new ConcurrentHashMap<String, Class<? extends VersionedProtocol>>();
private static final Map<String, Class<? extends IpcProtocol>> PROTOCOL_CACHE =
new ConcurrentHashMap<String, Class<? extends IpcProtocol>>();
static Class<? extends VersionedProtocol> getProtocolClass(
@SuppressWarnings("unchecked")
static Class<? extends IpcProtocol> getProtocolClass(
String protocolName, Configuration conf)
throws ClassNotFoundException {
Class<? extends VersionedProtocol> protocol =
Class<? extends IpcProtocol> protocol =
PROTOCOL_CACHE.get(protocolName);
if (protocol == null) {
protocol = (Class<? extends VersionedProtocol>)
protocol = (Class<? extends IpcProtocol>)
conf.getClassByName(protocolName);
PROTOCOL_CACHE.put(protocolName, protocol);
}
@ -271,7 +272,7 @@ public abstract class HBaseServer implements RpcServer {
protected BlockingQueue<Call> replicationQueue;
private int numOfReplicationHandlers = 0;
private Handler[] replicationHandlers = null;
protected HBaseRPCErrorHandler errorHandler = null;
/**
@ -351,7 +352,7 @@ public abstract class HBaseServer implements RpcServer {
if (errorClass != null) {
this.isError = true;
}
ByteBufferOutputStream buf = null;
if (value != null) {
buf = new ByteBufferOutputStream(((Message)value).getSerializedSize());
@ -453,7 +454,7 @@ public abstract class HBaseServer implements RpcServer {
public synchronized boolean isReturnValueDelayed() {
return this.delayReturnValue;
}
@Override
public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException {
if (!connection.channel.isOpen()) {
@ -1111,7 +1112,7 @@ public abstract class HBaseServer implements RpcServer {
protected String hostAddress;
protected int remotePort;
ConnectionHeader header;
Class<? extends VersionedProtocol> protocol;
Class<? extends IpcProtocol> protocol;
protected UserGroupInformation user = null;
private AuthMethod authMethod;
private boolean saslContextEstablished;
@ -1315,7 +1316,7 @@ public abstract class HBaseServer implements RpcServer {
LOG.debug("SASL server context established. Authenticated client: "
+ user + ". Negotiated QoP is "
+ saslServer.getNegotiatedProperty(Sasl.QOP));
}
}
metrics.authenticationSuccess();
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
saslContextEstablished = true;
@ -1428,7 +1429,7 @@ public abstract class HBaseServer implements RpcServer {
}
}
if (dataLength < 0) {
throw new IllegalArgumentException("Unexpected data length "
throw new IllegalArgumentException("Unexpected data length "
+ dataLength + "!! from " + getHostAddress());
}
data = ByteBuffer.allocate(dataLength);
@ -1749,7 +1750,7 @@ public abstract class HBaseServer implements RpcServer {
status.pause("Waiting for a call");
Call call = myCallQueue.take(); // pop the queue; maybe blocked here
status.setStatus("Setting up call");
status.setConnection(call.connection.getHostAddress(),
status.setConnection(call.connection.getHostAddress(),
call.connection.getRemotePort());
if (LOG.isDebugEnabled())
@ -2010,11 +2011,12 @@ public abstract class HBaseServer implements RpcServer {
}
return handlers;
}
public SecretManager<? extends TokenIdentifier> getSecretManager() {
return this.secretManager;
}
@SuppressWarnings("unchecked")
public void setSecretManager(SecretManager<? extends TokenIdentifier> secretManager) {
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
}
@ -2042,7 +2044,7 @@ public abstract class HBaseServer implements RpcServer {
}
}
}
/** Wait for the server to be stopped.
* Does not wait for all subthreads to finish.
* See {@link #stop()}.
@ -2101,7 +2103,7 @@ public abstract class HBaseServer implements RpcServer {
connection.getProtocol());
}
authManager.authorize(user != null ? user : null,
protocol, getConf(), addr);
protocol, getConf(), addr);
}
}

View File

@ -19,17 +19,17 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
/**
* A simple RPC mechanism.
*
@ -56,8 +56,8 @@ public class HBaseServerRPC {
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServerRPC");
// cache of RpcEngines by protocol
private static final Map<Class, RpcServerEngine> PROTOCOL_ENGINES
= new HashMap<Class, RpcServerEngine>();
private static final Map<Class<? extends IpcProtocol>, RpcServerEngine> PROTOCOL_ENGINES =
new HashMap<Class<? extends IpcProtocol>, RpcServerEngine>();
/**
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} implementation to
@ -65,24 +65,20 @@ public class HBaseServerRPC {
* configured using {@code "hbase.rpc.server.engine." + protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.server.engine";
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class, RpcServerEngine> PROXY_ENGINES
= new HashMap<Class, RpcServerEngine>();
private HBaseServerRPC() {
super();
} // no public ctor
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
Class<? extends IpcProtocol> protocol, Class<? extends RpcServerEngine> engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcServerEngine.class);
}
// return the RpcEngine configured to handle a protocol
static synchronized RpcServerEngine getProtocolEngine(Class protocol,
Configuration conf) {
static synchronized RpcServerEngine getProtocolEngine(Class<? extends IpcProtocol> protocol,
Configuration conf) {
RpcServerEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
@ -94,59 +90,15 @@ public class HBaseServerRPC {
defaultEngine);
LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
engine = (RpcServerEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
// return the RpcEngine that handles a proxy object
private static synchronized RpcServerEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/**
* Construct a server for a protocol implementation instance listening on a
* port and address.
*
* @param instance instance
* @param bindAddress bind address
* @param port port to bind to
* @param numHandlers number of handlers to start
* @param verbose verbose flag
* @param conf configuration
* @return Server
* @throws IOException e
*/
public static RpcServer getServer(final Object instance,
final Class<?>[] ifaces,
final String bindAddress, final int port,
final int numHandlers,
int metaHandlerCount,
final boolean verbose,
Configuration conf,
int highPriorityLevel)
throws IOException {
return getServer(instance.getClass(),
instance,
ifaces,
bindAddress,
port,
numHandlers,
metaHandlerCount,
verbose,
conf,
highPriorityLevel);
}
/**
* Construct a server for a protocol implementation instance.
*/
public static RpcServer getServer(Class protocol,
public static RpcServer getServer(Class<? extends IpcProtocol> protocol,
final Object instance,
final Class<?>[] ifaces,
String bindAddress,
@ -157,9 +109,8 @@ public class HBaseServerRPC {
Configuration conf,
int highPriorityLevel)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol,
instance,
return getProtocolEngine(protocol, conf).
getServer(instance,
ifaces,
bindAddress,
port,
@ -169,5 +120,4 @@ public class HBaseServerRPC {
conf,
highPriorityLevel);
}
}
}

View File

@ -19,33 +19,27 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.ipc.RemoteException;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
public class ProtobufRpcClientEngine implements RpcClientEngine {
private static final Log LOG =
@ -57,18 +51,18 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
protected final static ClientCache CLIENTS = new ClientCache();
@Override
public VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol, long clientVersion,
public IpcProtocol getProxy(
Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout);
return (VersionedProtocol) Proxy.newProxyInstance(
return (IpcProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
}
@Override
public void stopProxy(VersionedProtocol proxy) {
public void stopProxy(IpcProtocol proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
@ -77,30 +71,14 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
static class Invoker implements InvocationHandler {
private static final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private Class<? extends IpcProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout;
private final long clientProtocolVersion;
// For generated protocol classes which don't have VERSION field,
// such as protobuf interfaces.
static final Map<Class<?>, Long>
PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
static {
PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
Long.valueOf(ClientProtocol.VERSION));
PROTOCOL_VERSION.put(AdminService.BlockingInterface.class,
Long.valueOf(AdminProtocol.VERSION));
PROTOCOL_VERSION.put(RegionServerStatusService.BlockingInterface.class,
Long.valueOf(RegionServerStatusProtocol.VERSION));
PROTOCOL_VERSION.put(MasterMonitorProtocol.class,Long.valueOf(MasterMonitorProtocol.VERSION));
PROTOCOL_VERSION.put(MasterAdminProtocol.class,Long.valueOf(MasterAdminProtocol.VERSION));
}
public Invoker(Class<? extends VersionedProtocol> protocol,
public Invoker(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
this.protocol = protocol;
@ -108,20 +86,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
Long version = PROTOCOL_VERSION.get(protocol);
if (version != null) {
this.clientProtocolVersion = version;
} else {
try {
this.clientProtocolVersion = HBaseClientRPC.getProtocolVersion(protocol);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
}
}
}
private RpcRequestBody constructRpcRequest(Method method,
@ -144,7 +108,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
}
builder.setRequestClassName(param.getClass().getName());
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
@ -214,5 +177,4 @@ public class ProtobufRpcClientEngine implements RpcClientEngine {
return protoType;
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -56,10 +57,10 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
}
@Override
public Server getServer(Class<? extends VersionedProtocol> protocol,
Object instance, Class<?>[] ifaces, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
Configuration conf, int highPriorityLevel) throws IOException {
public Server getServer(Object instance, Class<?>[] ifaces,
String bindAddress, int port, int numHandlers, int metaHandlerCount,
boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
metaHandlerCount, verbose, highPriorityLevel);
}
@ -78,9 +79,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
/** Names for suffixed metrics */
private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec.";
private final int warnResponseTime;
private final int warnResponseSize;
@ -153,9 +151,9 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
public Message call(Class<? extends VersionedProtocol> protocol,
RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
public Message call(Class<? extends IpcProtocol> protocol,
RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
try {
String methodName = rpcRequest.getMethodName();
Method method = getMethod(protocol, methodName);
@ -170,13 +168,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
* The rpcProxy's has a declared protocol name that is
* sent form client to server at connection time.
*/
//TODO: use the clientVersion to do protocol compatibility checks, and
//this could be used here to handle complex use cases like deciding
//which implementation of the protocol should be used to service the
//current request, etc. Ideally, we shouldn't land up in a situation
//where we need to support such a use case.
//For now the clientVersion field is simply ignored
long clientVersion = rpcRequest.getClientProtocolVersion();
if (verbose) {
LOG.info("Call: protocol name=" + protocol.getName() +
@ -243,7 +234,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
buffer.append("(");
buffer.append(param.getClass().getName());
buffer.append(")");
buffer.append(", client version="+clientVersion);
logResponse(new Object[]{rpcRequest.getRequest()},
methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"),
status.getClient(), startTime, processingTime, qTime,
@ -271,7 +261,7 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
}
}
static Method getMethod(Class<? extends VersionedProtocol> protocol,
static Method getMethod(Class<? extends IpcProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
@ -377,4 +367,4 @@ class ProtobufRpcServerEngine implements RpcServerEngine {
LOG.info(v);
}
}
}
}

View File

@ -1,243 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
@InterfaceAudience.Private
public class ProtocolSignature implements Writable {
static { // register a ctor
WritableFactories.setFactory
(ProtocolSignature.class,
new WritableFactory() {
public Writable newInstance() { return new ProtocolSignature(); }
});
}
private long version;
private int[] methods = null; // an array of method hash codes
/**
* default constructor
*/
public ProtocolSignature() {
}
/**
* Constructor
*
* @param version server version
* @param methodHashcodes hash codes of the methods supported by server
*/
public ProtocolSignature(long version, int[] methodHashcodes) {
this.version = version;
this.methods = methodHashcodes;
}
public long getVersion() {
return version;
}
public int[] getMethods() {
return methods;
}
@Override
public void readFields(DataInput in) throws IOException {
version = in.readLong();
boolean hasMethods = in.readBoolean();
if (hasMethods) {
int numMethods = in.readInt();
methods = new int[numMethods];
for (int i=0; i<numMethods; i++) {
methods[i] = in.readInt();
}
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(version);
if (methods == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeInt(methods.length);
for (int method : methods) {
out.writeInt(method);
}
}
}
/**
* Calculate a method's hash code considering its method
* name, returning type, and its parameter types
*
* @param method a method
* @return its hash code
*/
static int getFingerprint(Method method) {
int hashcode = method.getName().hashCode();
hashcode = hashcode + 31*method.getReturnType().getName().hashCode();
for (Class<?> type : method.getParameterTypes()) {
hashcode = 31*hashcode ^ type.getName().hashCode();
}
return hashcode;
}
/**
* Convert an array of Method into an array of hash codes
*
* @param methods
* @return array of hash codes
*/
private static int[] getFingerprints(Method[] methods) {
if (methods == null) {
return null;
}
int[] hashCodes = new int[methods.length];
for (int i = 0; i<methods.length; i++) {
hashCodes[i] = getFingerprint(methods[i]);
}
return hashCodes;
}
/**
* Get the hash code of an array of methods
* Methods are sorted before hashcode is calculated.
* So the returned value is irrelevant of the method order in the array.
*
* @param methods an array of methods
* @return the hash code
*/
static int getFingerprint(Method[] methods) {
return getFingerprint(getFingerprints(methods));
}
/**
* Get the hash code of an array of hashcodes
* Hashcodes are sorted before hashcode is calculated.
* So the returned value is irrelevant of the hashcode order in the array.
*
* @param methods an array of methods
* @return the hash code
*/
static int getFingerprint(int[] hashcodes) {
Arrays.sort(hashcodes);
return Arrays.hashCode(hashcodes);
}
private static class ProtocolSigFingerprint {
private ProtocolSignature signature;
private int fingerprint;
ProtocolSigFingerprint(ProtocolSignature sig, int fingerprint) {
this.signature = sig;
this.fingerprint = fingerprint;
}
}
/**
* A cache that maps a protocol's name to its signature & finger print
*/
final private static HashMap<String, ProtocolSigFingerprint>
PROTOCOL_FINGERPRINT_CACHE =
new HashMap<String, ProtocolSigFingerprint>();
/**
* Return a protocol's signature and finger print from cache
*
* @param protocol a protocol class
* @param serverVersion protocol version
* @return its signature and finger print
*/
private static ProtocolSigFingerprint getSigFingerprint(
Class <? extends VersionedProtocol> protocol, long serverVersion) {
String protocolName = protocol.getName();
synchronized (PROTOCOL_FINGERPRINT_CACHE) {
ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName);
if (sig == null) {
int[] serverMethodHashcodes = getFingerprints(protocol.getMethods());
sig = new ProtocolSigFingerprint(
new ProtocolSignature(serverVersion, serverMethodHashcodes),
getFingerprint(serverMethodHashcodes));
PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig);
}
return sig;
}
}
/**
* Get a server protocol's signature
*
* @param clientMethodsHashCode client protocol methods hashcode
* @param serverVersion server protocol version
* @param protocol protocol
* @return the server's protocol signature
*/
static ProtocolSignature getProtocolSignature(
int clientMethodsHashCode,
long serverVersion,
Class<? extends VersionedProtocol> protocol) {
// try to get the finger print & signature from the cache
ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion);
// check if the client side protocol matches the one on the server side
if (clientMethodsHashCode == sig.fingerprint) {
return new ProtocolSignature(serverVersion, null); // null indicates a match
}
return sig.signature;
}
/**
* Get a server protocol's signature
*
* @param server server implementation
* @param protocol server protocol
* @param clientVersion client's version
* @param clientMethodsHash client's protocol's hash code
* @return the server protocol's signature
* @throws IOException if any error occurs
*/
@SuppressWarnings("unchecked")
public static ProtocolSignature getProtocolSignature(VersionedProtocol server,
String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Class<? extends VersionedProtocol> inter;
try {
inter = (Class<? extends VersionedProtocol>)Class.forName(protocol);
} catch (Exception e) {
throw new IOException(e);
}
long serverVersion = server.getProtocolVersion(protocol, clientVersion);
return ProtocolSignature.getProtocolSignature(
clientMethodsHash, serverVersion, inter);
}
}

View File

@ -21,6 +21,7 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
import java.net.InetAddress;
@ -90,7 +91,7 @@ public class RequestContext {
*/
public static void set(User user,
InetAddress remoteAddress,
Class<? extends VersionedProtocol> protocol) {
Class<? extends IpcProtocol> protocol) {
RequestContext ctx = instance.get();
ctx.user = user;
ctx.remoteAddress = remoteAddress;
@ -111,12 +112,12 @@ public class RequestContext {
private User user;
private InetAddress remoteAddress;
private Class<? extends VersionedProtocol> protocol;
private Class<? extends IpcProtocol> protocol;
// indicates we're within a RPC request invocation
private boolean inRequest;
private RequestContext(User user, InetAddress remoteAddr,
Class<? extends VersionedProtocol> protocol) {
Class<? extends IpcProtocol> protocol) {
this.user = user;
this.remoteAddress = remoteAddr;
this.protocol = protocol;
@ -130,11 +131,11 @@ public class RequestContext {
return remoteAddress;
}
public Class<? extends VersionedProtocol> getProtocol() {
public Class<? extends IpcProtocol> getProtocol() {
return protocol;
}
public boolean isInRequest() {
return inRequest;
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
import javax.net.SocketFactory;
@ -31,12 +32,10 @@ import java.net.InetSocketAddress;
@InterfaceAudience.Private
public interface RpcClientEngine {
/** Construct a client-side proxy object. */
VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr,
User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
IpcProtocol getProxy(Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(VersionedProtocol proxy);
}
void stopProxy(IpcProtocol proxy);
}

View File

@ -23,16 +23,18 @@ import com.google.common.base.Function;
import com.google.protobuf.Message;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
*/
@InterfaceAudience.Private
public interface RpcServer {
// TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'?
// Also, the call takes a RpcRequestBody, an already composed combination of
// rpc Request and metadata. Should disentangle metadata and rpc Request Message.
void setSocketSendBufSize(int size);
@ -45,12 +47,12 @@ public interface RpcServer {
InetSocketAddress getListenerAddress();
/** Called for each call.
* @param param writable parameter
* @param param parameter
* @param receiveTime time
* @return Message
* @return Message Protobuf response Message
* @throws java.io.IOException e
*/
Message call(Class<? extends VersionedProtocol> protocol,
Message call(Class<? extends IpcProtocol> protocol,
RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException;
@ -62,9 +64,8 @@ public interface RpcServer {
void startThreads();
/**
* Returns the metrics instance for reporting RPC call statistics
*/
MetricsHBaseServer getMetrics();
}
}

View File

@ -26,11 +26,9 @@ import java.io.IOException;
/** An RPC implementation for the server. */
@InterfaceAudience.Private
interface RpcServerEngine {
/** Construct a server for a protocol implementation instance. */
RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,
Class<?>[] ifaces, String bindAddress,
int port, int numHandlers, int metaHandlerCount,
boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException;
}
RpcServer getServer(Object instance, Class<?>[] protocols,
String bindAddress, int port, int numHandlers, int metaHandlerCount,
boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException;
}

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Superclass of all protocols that use Hadoop RPC.
* Subclasses of this interface are also supposed to have
* a static final long versionID field.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface VersionedProtocol {
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @return the version that the server will speak
* @throws IOException if any IO error occurs
*/
@Deprecated
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException;
/**
* Return protocol version corresponding to protocol interface.
* @param protocol The classname of the protocol interface
* @param clientVersion The version of the protocol that the client speaks
* @param clientMethodsHash the hashcode of client protocol methods
* @return the server protocol signature containing its version and
* a list of its supported methods
* @see ProtocolSignature#getProtocolSignature(VersionedProtocol, String,
* long, int) for a default implementation
*/
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion,
int clientMethodsHash) throws IOException;
}

View File

@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
@ -965,33 +964,6 @@ Server {
serverManager.expireServer(sn);
}
@Override
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
if (MasterMonitorProtocol.class.getName().equals(protocol)) {
return new ProtocolSignature(MasterMonitorProtocol.VERSION, null);
} else if (MasterAdminProtocol.class.getName().equals(protocol)) {
return new ProtocolSignature(MasterAdminProtocol.VERSION, null);
} else if (RegionServerStatusProtocol.class.getName().equals(protocol)) {
return new ProtocolSignature(RegionServerStatusProtocol.VERSION, null);
}
throw new IOException("Unknown protocol: " + protocol);
}
public long getProtocolVersion(String protocol, long clientVersion) {
if (MasterMonitorProtocol.class.getName().equals(protocol)) {
return MasterMonitorProtocol.VERSION;
} else if (MasterAdminProtocol.class.getName().equals(protocol)) {
return MasterAdminProtocol.VERSION;
} else if (RegionServerStatusProtocol.class.getName().equals(protocol)) {
return RegionServerStatusProtocol.VERSION;
}
// unknown protocol
LOG.warn("Version requested for unimplemented protocol: "+protocol);
return -1;
}
@Override
public TableDescriptors getTableDescriptors() {
return this.tableDescriptors;

View File

@ -106,7 +106,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
@ -1794,7 +1793,7 @@ public class HRegionServer implements ClientProtocol,
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy(
RegionServerStatusProtocol.class, RegionServerStatusProtocol.VERSION,
RegionServerStatusProtocol.class,
isa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);
LOG.info("Connected to master at " + isa);
@ -2042,31 +2041,6 @@ public class HRegionServer implements ClientProtocol,
return regions.toArray(new HRegionInfo[regions.size()]);
}
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public ProtocolSignature getProtocolSignature(
String protocol, long version, int clientMethodsHashCode)
throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
return new ProtocolSignature(ClientProtocol.VERSION, null);
} else if (protocol.equals(AdminProtocol.class.getName())) {
return new ProtocolSignature(AdminProtocol.VERSION, null);
}
throw new IOException("Unknown protocol: " + protocol);
}
@Override
@QosPriority(priority=HConstants.HIGH_QOS)
public long getProtocolVersion(final String protocol, final long clientVersion)
throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) {
return ClientProtocol.VERSION;
} else if (protocol.equals(AdminProtocol.class.getName())) {
return AdminProtocol.VERSION;
}
throw new IOException("Unknown protocol: " + protocol);
}
@Override
public Leases getLeases() {
return leases;

View File

@ -383,7 +383,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
//file system, the tests should use getBaseTestDir, otherwise, we can use
//the working directory, and create a unique sub dir there
FileSystem fs = getTestFileSystem();
if (fs.getUri().getScheme().equals(fs.getLocal(conf).getUri().getScheme())) {
if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) {
File dataTestDir = new File(getDataTestDir().toString());
dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath());
} else {

View File

@ -29,9 +29,9 @@ import java.net.SocketTimeoutException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.security.User;
import com.google.protobuf.ServiceException;
@ -47,18 +47,18 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
public static double chanceOfTimeout = 0.3;
private static AtomicInteger invokations = new AtomicInteger();
public VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol, long clientVersion,
public IpcProtocol getProxy(
Class<? extends IpcProtocol> protocol,
InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
// Start up the requested-for proxy so we can pass-through calls to the underlying
// RpcEngine. Also instantiate and return our own proxy (RandomTimeoutInvocationHandler)
// that will either throw exceptions or pass through to the underlying proxy.
VersionedProtocol actualProxy = super.getProxy(protocol, clientVersion, addr,
IpcProtocol actualProxy = super.getProxy(protocol, addr,
ticket, conf, factory, rpcTimeout);
RandomTimeoutInvocationHandler invoker =
new RandomTimeoutInvocationHandler(actualProxy);
VersionedProtocol object = (VersionedProtocol)Proxy.newProxyInstance(
IpcProtocol object = (IpcProtocol)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
return object;
}
@ -66,7 +66,8 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
/**
* Call this in order to set this class to run as the RpcEngine for the given protocol
*/
public static void setProtocolEngine(Configuration conf, Class protocol) {
public static void setProtocolEngine(Configuration conf,
Class<? extends IpcProtocol> protocol) {
HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
}
@ -78,9 +79,9 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
}
static class RandomTimeoutInvocationHandler implements InvocationHandler {
private VersionedProtocol actual = null;
private IpcProtocol actual = null;
public RandomTimeoutInvocationHandler(VersionedProtocol actual) {
public RandomTimeoutInvocationHandler(IpcProtocol actual) {
this.actual = actual;
}
@ -96,4 +97,4 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
return Proxy.getInvocationHandler(actual).invoke(proxy, method, args);
}
}
}
}

View File

@ -24,22 +24,20 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.Level;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mortbay.log.Log;
@ -69,13 +67,13 @@ public class TestDelayedRpc {
private void testDelayedRpc(boolean delayReturnValue) throws Exception {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(delayReturnValue),
TestRpcImpl instance = new TestRpcImpl(delayReturnValue);
rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance,
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
List<Integer> results = new ArrayList<Integer>();
@ -133,11 +131,12 @@ public class TestDelayedRpc {
log.setLevel(Level.WARN);
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(true),
TestRpcImpl instance = new TestRpcImpl(true);
rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance,
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
@ -165,8 +164,7 @@ public class TestDelayedRpc {
log.removeAppender(listAppender);
}
public interface TestRpc extends VersionedProtocol {
public static final long VERSION = 1L;
public interface TestRpc extends IpcProtocol {
TestResponse test(TestArg delay);
}
@ -213,22 +211,6 @@ public class TestDelayedRpc {
responseBuilder.setResponse(0xDEADBEEF);
return responseBuilder.build();
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
Method [] methods = this.getClass().getMethods();
int [] hashes = new int [methods.length];
for (int i = 0; i < methods.length; i++) {
hashes[i] = methods[i].hashCode();
}
return new ProtocolSignature(clientVersion, hashes);
}
}
private static class TestThread extends Thread {
@ -263,13 +245,13 @@ public class TestDelayedRpc {
public void testEndDelayThrowing() throws IOException {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseServerRPC.getServer(new FaultyTestRpc(),
FaultyTestRpc instance = new FaultyTestRpc();
rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance,
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class,
rpcServer.getListenerAddress(), conf, 1000);
int result = 0xDEADBEEF;
@ -312,18 +294,5 @@ public class TestDelayedRpc {
// Client will receive the Exception, not this value.
return TestResponse.newBuilder().setResponse(DELAYED).build();
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(clientVersion, new int [] {});
}
}
}
}

View File

@ -18,41 +18,40 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.Socket;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import javax.net.SocketFactory;
import java.lang.reflect.Method;
import java.util.*;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.mockito.Mockito.*;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import com.google.protobuf.Message;
import org.apache.hadoop.hbase.security.User;
import org.apache.commons.logging.*;
import org.apache.log4j.Logger;
@Category(SmallTests.class)
public class TestIPC {
public static final Log LOG = LogFactory.getLog(TestIPC.class);
private static final Random RANDOM = new Random();
private static class TestRpcServer extends HBaseServer {
TestRpcServer() throws IOException {
@ -60,11 +59,10 @@ public class TestIPC {
}
@Override
public Message call(Class<? extends VersionedProtocol> protocol,
RpcRequestBody rpcRequest,
long receiveTime,
MonitoredRPCHandler status) throws IOException {
return rpcRequest;
public Message call(Class<? extends IpcProtocol> protocol,
RpcRequestBody param, long receiveTime, MonitoredRPCHandler status)
throws IOException {
return param;
}
}
@ -97,4 +95,4 @@ public class TestIPC {
assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
}
}
}
}

View File

@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -52,7 +53,7 @@ public class TestProtoBufRpc {
private static RpcServer server;
public interface TestRpcService
extends TestProtobufRpcProto.BlockingInterface, VersionedProtocol {
extends TestProtobufRpcProto.BlockingInterface, IpcProtocol {
public long VERSION = 1;
}
@ -76,20 +77,6 @@ public class TestProtoBufRpc {
EmptyRequestProto request) throws ServiceException {
throw new ServiceException("error", new IOException("error"));
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
// TODO Auto-generated method stub
return null;
}
}
@Before
@ -120,7 +107,7 @@ public class TestProtoBufRpc {
HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, 0,
return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class,
addr, conf, 10000);
}
@ -149,4 +136,4 @@ public class TestProtoBufRpc {
} catch (ServiceException e) {
}
}
}
}

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
@ -186,20 +185,6 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
this.nexts.put(regionName, rs);
}
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean isStopped() {
// TODO Auto-generated method stub

View File

@ -56,7 +56,7 @@ public class TestHMasterRPCException {
while (i < 20) {
try {
MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy(
MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100 * 10);
MasterMonitorProtocol.class, isa, conf, 100 * 10);
inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
fail();
} catch (ServiceException ex) {
@ -77,6 +77,4 @@ public class TestHMasterRPCException {
}
fail();
}
}
}

View File

@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.IpcProtocol;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -45,11 +46,9 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@ -86,8 +85,7 @@ public class TestTokenAuthentication {
serverPrincipal = "hbase.test.kerberos.principal")
@TokenInfo("HBASE_AUTH_TOKEN")
private static interface BlockingAuthenticationService
extends AuthenticationProtos.AuthenticationService.BlockingInterface, VersionedProtocol {
long VERSION = 1L;
extends AuthenticationProtos.AuthenticationService.BlockingInterface, IpcProtocol {
}
/**
@ -292,17 +290,6 @@ public class TestTokenAuthentication {
throw new ServiceException(ioe);
}
}
/* VersionedProtocol implementation */
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return BlockingAuthenticationService.VERSION;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(BlockingAuthenticationService.VERSION, null);
}
}
@ -365,7 +352,7 @@ public class TestTokenAuthentication {
testuser.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.TOKEN);
final Configuration conf = TEST_UTIL.getConfiguration();
testuser.setConfiguration(conf);
UserGroupInformation.setConfiguration(conf);
Token<AuthenticationTokenIdentifier> token =
secretManager.generateToken("testuser");
LOG.debug("Got token: " + token.toString());
@ -379,7 +366,6 @@ public class TestTokenAuthentication {
AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
(AuthenticationProtos.AuthenticationService.BlockingInterface)
HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class,
BlockingAuthenticationService.VERSION,
server.getAddress(), c,
HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,

View File

@ -170,4 +170,4 @@ public class MockRegionServerServices implements RegionServerServices {
public Leases getLeases() {
return null;
}
}
}