diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java index 302a4114d51..d900c0a8cb8 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RPCProtos.java @@ -1637,11 +1637,7 @@ public final class RPCProtos { boolean hasMethodName(); String getMethodName(); - // optional uint64 clientProtocolVersion = 2; - boolean hasClientProtocolVersion(); - long getClientProtocolVersion(); - - // optional bytes request = 3; + // optional bytes request = 2; boolean hasRequest(); com.google.protobuf.ByteString getRequest(); @@ -1710,21 +1706,11 @@ public final class RPCProtos { } } - // optional uint64 clientProtocolVersion = 2; - public static final int CLIENTPROTOCOLVERSION_FIELD_NUMBER = 2; - private long clientProtocolVersion_; - public boolean hasClientProtocolVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public long getClientProtocolVersion() { - return clientProtocolVersion_; - } - - // optional bytes request = 3; - public static final int REQUEST_FIELD_NUMBER = 3; + // optional bytes request = 2; + public static final int REQUEST_FIELD_NUMBER = 2; private com.google.protobuf.ByteString request_; public boolean hasRequest() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getRequest() { return request_; @@ -1734,7 +1720,7 @@ public final class RPCProtos { public static final int REQUESTCLASSNAME_FIELD_NUMBER = 4; private java.lang.Object requestClassName_; public boolean hasRequestClassName() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public String getRequestClassName() { java.lang.Object ref = requestClassName_; @@ -1764,7 +1750,6 @@ public final class RPCProtos { private void initFields() { methodName_ = ""; - clientProtocolVersion_ = 0L; request_ = com.google.protobuf.ByteString.EMPTY; requestClassName_ = ""; } @@ -1788,12 +1773,9 @@ public final class RPCProtos { output.writeBytes(1, getMethodNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt64(2, clientProtocolVersion_); + output.writeBytes(2, request_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, request_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, getRequestClassNameBytes()); } getUnknownFields().writeTo(output); @@ -1811,13 +1793,9 @@ public final class RPCProtos { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(2, clientProtocolVersion_); + .computeBytesSize(2, request_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, request_); - } - if (((bitField0_ & 0x00000008) == 0x00000008)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, getRequestClassNameBytes()); } @@ -1849,11 +1827,6 @@ public final class RPCProtos { result = result && getMethodName() .equals(other.getMethodName()); } - result = result && (hasClientProtocolVersion() == other.hasClientProtocolVersion()); - if (hasClientProtocolVersion()) { - result = result && (getClientProtocolVersion() - == other.getClientProtocolVersion()); - } result = result && (hasRequest() == other.hasRequest()); if (hasRequest()) { result = result && getRequest() @@ -1877,10 +1850,6 @@ public final class RPCProtos { hash = (37 * hash) + METHODNAME_FIELD_NUMBER; hash = (53 * hash) + getMethodName().hashCode(); } - if (hasClientProtocolVersion()) { - hash = (37 * hash) + CLIENTPROTOCOLVERSION_FIELD_NUMBER; - hash = (53 * hash) + hashLong(getClientProtocolVersion()); - } if (hasRequest()) { hash = (37 * hash) + REQUEST_FIELD_NUMBER; hash = (53 * hash) + getRequest().hashCode(); @@ -2007,12 +1976,10 @@ public final class RPCProtos { super.clear(); methodName_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - clientProtocolVersion_ = 0L; - bitField0_ = (bitField0_ & ~0x00000002); request_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); requestClassName_ = ""; - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -2058,14 +2025,10 @@ public final class RPCProtos { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.clientProtocolVersion_ = clientProtocolVersion_; + result.request_ = request_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.request_ = request_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } result.requestClassName_ = requestClassName_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -2086,9 +2049,6 @@ public final class RPCProtos { if (other.hasMethodName()) { setMethodName(other.getMethodName()); } - if (other.hasClientProtocolVersion()) { - setClientProtocolVersion(other.getClientProtocolVersion()); - } if (other.hasRequest()) { setRequest(other.getRequest()); } @@ -2135,18 +2095,13 @@ public final class RPCProtos { methodName_ = input.readBytes(); break; } - case 16: { + case 18: { bitField0_ |= 0x00000002; - clientProtocolVersion_ = input.readUInt64(); - break; - } - case 26: { - bitField0_ |= 0x00000004; request_ = input.readBytes(); break; } case 34: { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; requestClassName_ = input.readBytes(); break; } @@ -2192,31 +2147,10 @@ public final class RPCProtos { onChanged(); } - // optional uint64 clientProtocolVersion = 2; - private long clientProtocolVersion_ ; - public boolean hasClientProtocolVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public long getClientProtocolVersion() { - return clientProtocolVersion_; - } - public Builder setClientProtocolVersion(long value) { - bitField0_ |= 0x00000002; - clientProtocolVersion_ = value; - onChanged(); - return this; - } - public Builder clearClientProtocolVersion() { - bitField0_ = (bitField0_ & ~0x00000002); - clientProtocolVersion_ = 0L; - onChanged(); - return this; - } - - // optional bytes request = 3; + // optional bytes request = 2; private com.google.protobuf.ByteString request_ = com.google.protobuf.ByteString.EMPTY; public boolean hasRequest() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getRequest() { return request_; @@ -2225,13 +2159,13 @@ public final class RPCProtos { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; request_ = value; onChanged(); return this; } public Builder clearRequest() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); request_ = getDefaultInstance().getRequest(); onChanged(); return this; @@ -2240,7 +2174,7 @@ public final class RPCProtos { // optional string requestClassName = 4; private java.lang.Object requestClassName_ = ""; public boolean hasRequestClassName() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public String getRequestClassName() { java.lang.Object ref = requestClassName_; @@ -2256,19 +2190,19 @@ public final class RPCProtos { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; requestClassName_ = value; onChanged(); return this; } public Builder clearRequestClassName() { - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); requestClassName_ = getDefaultInstance().getRequestClassName(); onChanged(); return this; } void setRequestClassName(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; requestClassName_ = value; onChanged(); } @@ -3761,17 +3695,16 @@ public final class RPCProtos { " \001(\0132\020.UserInformation\022?\n\010protocol\030\002 \001(\t" + ":-org.apache.hadoop.hbase.client.ClientP" + "rotocol\"<\n\020RpcRequestHeader\022\016\n\006callId\030\001 " + - "\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\"n\n\016RpcRequ" + - "estBody\022\022\n\nmethodName\030\001 \002(\t\022\035\n\025clientPro" + - "tocolVersion\030\002 \001(\004\022\017\n\007request\030\003 \001(\014\022\030\n\020r" + - "equestClassName\030\004 \001(\t\"{\n\021RpcResponseHead", - "er\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 \002(\0162\031.Rpc" + - "ResponseHeader.Status\"+\n\006Status\022\013\n\007SUCCE" + - "SS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017RpcRespons" + - "eBody\022\020\n\010response\030\001 \001(\014\"9\n\014RpcException\022" + - "\025\n\rexceptionName\030\001 \002(\t\022\022\n\nstackTrace\030\002 \001" + - "(\tB<\n*org.apache.hadoop.hbase.protobuf.g" + - "eneratedB\tRPCProtosH\001\240\001\001" + "\002(\r\022\030\n\005tinfo\030\002 \001(\0132\t.RPCTInfo\"O\n\016RpcRequ" + + "estBody\022\022\n\nmethodName\030\001 \002(\t\022\017\n\007request\030\002" + + " \001(\014\022\030\n\020requestClassName\030\004 \001(\t\"{\n\021RpcRes" + + "ponseHeader\022\016\n\006callId\030\001 \002(\r\022)\n\006status\030\002 ", + "\002(\0162\031.RpcResponseHeader.Status\"+\n\006Status" + + "\022\013\n\007SUCCESS\020\000\022\t\n\005ERROR\020\001\022\t\n\005FATAL\020\002\"#\n\017R" + + "pcResponseBody\022\020\n\010response\030\001 \001(\014\"9\n\014RpcE" + + "xception\022\025\n\rexceptionName\030\001 \002(\t\022\022\n\nstack" + + "Trace\030\002 \001(\tB<\n*org.apache.hadoop.hbase.p" + + "rotobuf.generatedB\tRPCProtosH\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3807,7 +3740,7 @@ public final class RPCProtos { internal_static_RpcRequestBody_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RpcRequestBody_descriptor, - new java.lang.String[] { "MethodName", "ClientProtocolVersion", "Request", "RequestClassName", }, + new java.lang.String[] { "MethodName", "Request", "RequestClassName", }, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.class, org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody.Builder.class); internal_static_RpcResponseHeader_descriptor = diff --git a/hbase-protocol/src/main/protobuf/RPC.proto b/hbase-protocol/src/main/protobuf/RPC.proto index bdd31d2a1c2..90ec7ced26f 100644 --- a/hbase-protocol/src/main/protobuf/RPC.proto +++ b/hbase-protocol/src/main/protobuf/RPC.proto @@ -86,13 +86,10 @@ message RpcRequestBody { /** Name of the RPC method */ required string methodName = 1; - /** protocol version of class declaring the called method */ - optional uint64 clientProtocolVersion = 2; - /** Bytes corresponding to the client protobuf request. This is the actual * bytes corresponding to the RPC request argument. */ - optional bytes request = 3; + optional bytes request = 2; /** Some metainfo about the request. Helps us to treat RPCs with * different priorities. For now this is just the classname of the request diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java new file mode 100644 index 00000000000..3b0c535a34e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/IpcProtocol.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.hbase; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Marker Interface used by ipc. We need a means of referring to + * ipc "protocols" generically. For example, we need to tell an rpc + * server the "protocols" it implements and it helps if all protocols + * implement a common 'type'. That is what this Interface is used for. + */ +// This Interface replaces the old VersionedProtocol Interface. Rather +// than redo a bunch of code its removal, instead we put in place this +// Interface and change all VP references to Protocol references. + +// It is moved up here to top-level because it is ugly having members +// of super packages reach down into subpackages. +public interface IpcProtocol {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java index 57d83efd5a4..0039f16cf91 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterAdminProtocol.java @@ -21,52 +21,8 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService; -import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; - - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.security.TokenInfo; /** * Protocol that a client uses to communicate with the Master (for admin purposes). @@ -76,274 +32,5 @@ import com.google.protobuf.ServiceException; @TokenInfo("HBASE_AUTH_TOKEN") @InterfaceAudience.Private @InterfaceStability.Evolving -public interface MasterAdminProtocol extends - MasterAdminService.BlockingInterface, MasterProtocol { - public static final long VERSION = 1L; - - /* Column-level */ - - /** - * Adds a column to the specified table - * @param controller Unused (set to null). - * @param req AddColumnRequest that contains:
- * - tableName: table to modify
- * - column: column descriptor - * @throws ServiceException - */ - @Override - public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req) - throws ServiceException; - - /** - * Deletes a column from the specified table. Table must be disabled. - * @param controller Unused (set to null). - * @param req DeleteColumnRequest that contains:
- * - tableName: table to alter
- * - columnName: column family to remove - * @throws ServiceException - */ - @Override - public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req) - throws ServiceException; - - /** - * Modifies an existing column on the specified table - * @param controller Unused (set to null). - * @param req ModifyColumnRequest that contains:
- * - tableName: table name
- * - descriptor: new column descriptor - * @throws ServiceException e - */ - @Override - public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req) - throws ServiceException; - - /* Region-level */ - - /** - * Move a region to a specified destination server. - * @param controller Unused (set to null). - * @param req The request that contains:
- * - region: The encoded region name; i.e. the hash that makes - * up the region name suffix: e.g. if regionname is - * TestTable,0094429456,1289497600452.527db22f95c8a9e0116f0cc13c680396., - * then the encoded region name is: 527db22f95c8a9e0116f0cc13c680396.
- * - destServerName: The servername of the destination regionserver. If - * passed the empty byte array we'll assign to a random server. A server name - * is made of host, port and startcode. Here is an example: - * host187.example.com,60020,1289493121758. - * @throws ServiceException that wraps a UnknownRegionException if we can't find a - * region named encodedRegionName - */ - @Override - public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req) - throws ServiceException; - - /** - * Assign a region to a server chosen at random. - * @param controller Unused (set to null). - * @param req contains the region to assign. Will use existing RegionPlan if one - * found. - * @throws ServiceException - */ - @Override - public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req) - throws ServiceException; - - /** - * Unassign a region from current hosting regionserver. Region will then be - * assigned to a regionserver chosen at random. Region could be reassigned - * back to the same server. Use {@link #moveRegion} if you want to - * control the region movement. - * @param controller Unused (set to null). - * @param req The request that contains:
- * - region: Region to unassign. Will clear any existing RegionPlan - * if one found.
- * - force: If true, force unassign (Will remove region from - * regions-in-transition too if present as well as from assigned regions -- - * radical!.If results in double assignment use hbck -fix to resolve. - * @throws ServiceException - */ - @Override - public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req) - throws ServiceException; - - /** - * Offline a region from the assignment manager's in-memory state. The - * region should be in a closed state and there will be no attempt to - * automatically reassign the region as in unassign. This is a special - * method, and should only be used by experts or hbck. - * @param controller Unused (set to null). - * @param request OfflineRegionRequest that contains:
- * - region: Region to offline. Will clear any existing RegionPlan - * if one found. - * @throws ServiceException - */ - @Override - public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request) - throws ServiceException; - - /* Table-level */ - - /** - * Creates a new table asynchronously. If splitKeys are specified, then the - * table will be created with an initial set of multiple regions. - * If splitKeys is null, the table will be created with a single region. - * @param controller Unused (set to null). - * @param req CreateTableRequest that contains:
- * - tablesSchema: table descriptor
- * - splitKeys - * @throws ServiceException - */ - @Override - public CreateTableResponse createTable(RpcController controller, CreateTableRequest req) - throws ServiceException; - - /** - * Deletes a table - * @param controller Unused (set to null). - * @param req DeleteTableRequest that contains:
- * - tableName: table to delete - * @throws ServiceException - */ - @Override - public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest req) - throws ServiceException; - - /** - * Puts the table on-line (only needed if table has been previously taken offline) - * @param controller Unused (set to null). - * @param req EnableTableRequest that contains:
- * - tableName: table to enable - * @throws ServiceException - */ - @Override - public EnableTableResponse enableTable(RpcController controller, EnableTableRequest req) - throws ServiceException; - - /** - * Take table offline - * - * @param controller Unused (set to null). - * @param req DisableTableRequest that contains:
- * - tableName: table to take offline - * @throws ServiceException - */ - @Override - public DisableTableResponse disableTable(RpcController controller, DisableTableRequest req) - throws ServiceException; - - /** - * Modify a table's metadata - * - * @param controller Unused (set to null). - * @param req ModifyTableRequest that contains:
- * - tableName: table to modify
- * - tableSchema: new descriptor for table - * @throws ServiceException - */ - @Override - public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req) - throws ServiceException; - - /* Cluster-level */ - - /** - * Shutdown an HBase cluster. - * @param controller Unused (set to null). - * @param request ShutdownRequest - * @return ShutdownResponse - * @throws ServiceException - */ - @Override - public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request) - throws ServiceException; - - /** - * Stop HBase Master only. - * Does not shutdown the cluster. - * @param controller Unused (set to null). - * @param request StopMasterRequest - * @return StopMasterResponse - * @throws ServiceException - */ - @Override - public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request) - throws ServiceException; - - /** - * Run the balancer. Will run the balancer and if regions to move, it will - * go ahead and do the reassignments. Can NOT run for various reasons. Check - * logs. - * @param c Unused (set to null). - * @param request BalanceRequest - * @return BalanceResponse that contains:
- * - balancerRan: True if balancer ran and was able to tell the region servers to - * unassign all the regions to balance (the re-assignment itself is async), - * false otherwise. - */ - @Override - public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException; - - /** - * Turn the load balancer on or off. - * @param controller Unused (set to null). - * @param req SetBalancerRunningRequest that contains:
- * - on: If true, enable balancer. If false, disable balancer.
- * - synchronous: if true, wait until current balance() call, if outstanding, to return. - * @return SetBalancerRunningResponse that contains:
- * - prevBalanceValue: Previous balancer value - * @throws ServiceException - */ - @Override - public SetBalancerRunningResponse setBalancerRunning( - RpcController controller, SetBalancerRunningRequest req) throws ServiceException; - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - @Override - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; - - /** - * Run a scan of the catalog table - * @param c Unused (set to null). - * @param req CatalogScanRequest - * @return CatalogScanResponse that contains the int return code corresponding - * to the number of entries cleaned - * @throws ServiceException - */ - @Override - public CatalogScanResponse runCatalogScan(RpcController c, - CatalogScanRequest req) throws ServiceException; - - /** - * Enable/Disable the catalog janitor - * @param c Unused (set to null). - * @param req EnableCatalogJanitorRequest that contains:
- * - enable: If true, enable catalog janitor. If false, disable janitor.
- * @return EnableCatalogJanitorResponse that contains:
- * - prevValue: true, if it was enabled previously; false, otherwise - * @throws ServiceException - */ - @Override - public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c, - EnableCatalogJanitorRequest req) throws ServiceException; - - /** - * Query whether the catalog janitor is enabled - * @param c Unused (set to null). - * @param req IsCatalogJanitorEnabledRequest - * @return IsCatalogCatalogJanitorEnabledResponse that contains:
- * - value: true, if it is enabled; false, otherwise - * @throws ServiceException - */ - @Override - public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c, - IsCatalogJanitorEnabledRequest req) throws ServiceException; -} +public interface MasterAdminProtocol +extends MasterAdminService.BlockingInterface, MasterProtocol {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java index d8cff7d0ff5..b8c3dff95a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterMonitorProtocol.java @@ -21,19 +21,8 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService; -import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.KerberosInfo; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; - -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.security.TokenInfo; /** * Protocol that a client uses to communicate with the Master (for monitoring purposes). @@ -43,57 +32,5 @@ import com.google.protobuf.ServiceException; @TokenInfo("HBASE_AUTH_TOKEN") @InterfaceAudience.Public @InterfaceStability.Evolving -public interface MasterMonitorProtocol extends - MasterMonitorService.BlockingInterface, MasterProtocol { - public static final long VERSION = 1L; - - /** - * Used by the client to get the number of regions that have received the - * updated schema - * - * @param controller Unused (set to null). - * @param req GetSchemaAlterStatusRequest that contains:
- * - tableName - * @return GetSchemaAlterStatusResponse indicating the number of regions updated. - * yetToUpdateRegions is the regions that are yet to be updated totalRegions - * is the total number of regions of the table - * @throws ServiceException - */ - @Override - public GetSchemaAlterStatusResponse getSchemaAlterStatus( - RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException; - - /** - * Get list of TableDescriptors for requested tables. - * @param controller Unused (set to null). - * @param req GetTableDescriptorsRequest that contains:
- * - tableNames: requested tables, or if empty, all are requested - * @return GetTableDescriptorsResponse - * @throws ServiceException - */ - @Override - public GetTableDescriptorsResponse getTableDescriptors( - RpcController controller, GetTableDescriptorsRequest req) throws ServiceException; - - /** - * Return cluster status. - * @param controller Unused (set to null). - * @param req GetClusterStatusRequest - * @return status object - * @throws ServiceException - */ - @Override - public GetClusterStatusResponse getClusterStatus(RpcController controller, GetClusterStatusRequest req) - throws ServiceException; - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - @Override - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; -} +public interface MasterMonitorProtocol +extends MasterMonitorService.BlockingInterface, MasterProtocol {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java index 62b3b84f726..17632bd21d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MasterProtocol.java @@ -16,29 +16,14 @@ * limitations under the License. */ -// Functions implemented by all the master protocols (e.g. MasterAdminProtocol, -// MasterMonitorProtocol). Currently, this is only isMasterRunning, which is used, -// on proxy creation, to check if the master has been stopped. If it has, -// a MasterNotRunningException is thrown back to the client, and the client retries. - package org.apache.hadoop.hbase; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -public interface MasterProtocol extends VersionedProtocol, MasterService.BlockingInterface { - - /** - * @param c Unused (set to null). - * @param req IsMasterRunningRequest - * @return IsMasterRunningRequest that contains:
- * isMasterRunning: true if master is available - * @throws ServiceException - */ - public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req) - throws ServiceException; -} +/** + * Functions implemented by all the master protocols: e.g. {@link MasterAdminProtocol} + * and {@link MasterMonitorProtocol}. Currently, the only shared method + * {@link #isMasterRunning(com.google.protobuf.RpcController, org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest)} + * which is used on connection setup to check if the master has been stopped. + */ +public interface MasterProtocol extends IpcProtocol, MasterService.BlockingInterface {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionServerStatusProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionServerStatusProtocol.java index 069cea3892d..0ec10034a15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionServerStatusProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionServerStatusProtocol.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.KerberosInfo; @@ -33,7 +32,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo; @TokenInfo("HBASE_AUTH_TOKEN") @InterfaceAudience.Private @InterfaceStability.Evolving -public interface RegionServerStatusProtocol extends - RegionServerStatusService.BlockingInterface, VersionedProtocol { - public static final long VERSION = 1L; -} +public interface RegionServerStatusProtocol +extends RegionServerStatusService.BlockingInterface, IpcProtocol {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java index 50f8b2a7e6a..cd7ce3c5665 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AdminProtocol.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.KerberosInfo; @@ -31,7 +31,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo; serverPrincipal = "hbase.regionserver.kerberos.principal") @TokenInfo("HBASE_AUTH_TOKEN") @InterfaceAudience.Private -public interface AdminProtocol extends - AdminService.BlockingInterface, VersionedProtocol { - public static final long VERSION = 1L; -} +public interface AdminProtocol +extends AdminService.BlockingInterface, IpcProtocol {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java index 57d84e5a4d8..53244416b73 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientProtocol.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.TokenInfo; import org.apache.hadoop.hbase.security.KerberosInfo; @@ -33,7 +33,5 @@ import org.apache.hadoop.hbase.security.KerberosInfo; @TokenInfo("HBASE_AUTH_TOKEN") @InterfaceAudience.Public @InterfaceStability.Evolving -public interface ClientProtocol extends - ClientService.BlockingInterface, VersionedProtocol { - public static final long VERSION = 1L; -} +public interface ClientProtocol +extends ClientService.BlockingInterface, IpcProtocol {} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 7d6f307b807..6edaa5bac3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.MasterAdminProtocol; import org.apache.hadoop.hbase.MasterMonitorProtocol; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterProtocol; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.RegionMovedException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; @@ -74,7 +75,6 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.HBaseClientRPC; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema; @@ -550,8 +550,8 @@ public class HConnectionManager { private final Configuration conf; // Known region ServerName.toString() -> RegionClient/Admin - private final ConcurrentHashMap> servers = - new ConcurrentHashMap>(); + private final ConcurrentHashMap> servers = + new ConcurrentHashMap>(); private final ConcurrentHashMap connectionLock = new ConcurrentHashMap(); @@ -681,12 +681,10 @@ public class HConnectionManager { public int userCount; public long keepAliveUntil = Long.MAX_VALUE; public final Class protocolClass; - public long version; public MasterProtocolState ( - final Class protocolClass, long version) { + final Class protocolClass) { this.protocolClass = protocolClass; - this.version = version; } } @@ -718,9 +716,8 @@ public class HConnectionManager { InetSocketAddress isa = new InetSocketAddress(sn.getHostname(), sn.getPort()); - MasterProtocol tryMaster = (MasterProtocol) HBaseClientRPC.getProxy( + MasterProtocol tryMaster = (MasterProtocol)HBaseClientRPC.getProxy( masterProtocolState.protocolClass, - masterProtocolState.version, isa, this.conf, this.rpcTimeout); if (tryMaster.isMasterRunning( @@ -1349,17 +1346,16 @@ public class HConnectionManager { } @Override - public ClientProtocol getClient( - final String hostname, final int port) throws IOException { - return (ClientProtocol)getProtocol(hostname, port, - clientClass, ClientProtocol.VERSION); + public ClientProtocol getClient(final String hostname, final int port) + throws IOException { + return (ClientProtocol)getProtocol(hostname, port, clientClass); } @Override - public AdminProtocol getAdmin(final String hostname, - final int port, final boolean master) throws IOException { - return (AdminProtocol)getProtocol(hostname, port, - adminClass, AdminProtocol.VERSION); + public AdminProtocol getAdmin(final String hostname, final int port, + final boolean master) + throws IOException { + return (AdminProtocol)getProtocol(hostname, port, adminClass); } /** @@ -1372,22 +1368,22 @@ public class HConnectionManager { * @return Proxy. * @throws IOException */ - VersionedProtocol getProtocol(final String hostname, - final int port, final Class protocolClass, - final long version) throws IOException { + IpcProtocol getProtocol(final String hostname, + final int port, final Class protocolClass) + throws IOException { String rsName = Addressing.createHostAndPortStr(hostname, port); // See if we already have a connection (common case) - Map protocols = this.servers.get(rsName); + Map protocols = this.servers.get(rsName); if (protocols == null) { - protocols = new HashMap(); - Map existingProtocols = + protocols = new HashMap(); + Map existingProtocols = this.servers.putIfAbsent(rsName, protocols); if (existingProtocols != null) { protocols = existingProtocols; } } String protocol = protocolClass.getName(); - VersionedProtocol server = protocols.get(protocol); + IpcProtocol server = protocols.get(protocol); if (server == null) { // create a unique lock for this RS + protocol (if necessary) String lockKey = protocol + "@" + rsName; @@ -1402,7 +1398,7 @@ public class HConnectionManager { InetSocketAddress address = new InetSocketAddress(hostname, port); // definitely a cache miss. establish an RPC for this RS server = HBaseClientRPC.waitForProxy( - protocolClass, version, address, this.conf, + protocolClass, address, this.conf, this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout); protocols.put(protocol, server); } catch (RemoteException e) { @@ -1599,9 +1595,9 @@ public class HConnectionManager { } MasterProtocolState masterAdminProtocol = - new MasterProtocolState(MasterAdminProtocol.class, MasterAdminProtocol.VERSION); + new MasterProtocolState(MasterAdminProtocol.class); MasterProtocolState masterMonitorProtocol = - new MasterProtocolState(MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION); + new MasterProtocolState(MasterMonitorProtocol.class); /** * This function allows HBaseAdmin and potentially others @@ -2273,8 +2269,8 @@ public class HConnectionManager { delayedClosing.stop("Closing connection"); if (stopProxy) { closeMaster(); - for (Map i : servers.values()) { - for (VersionedProtocol server: i.values()) { + for (Map i : servers.values()) { + for (IpcProtocol server: i.values()) { HBaseClientRPC.stopProxy(server); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java index bbb91d3ef72..c19a62dcc94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java @@ -55,6 +55,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; @@ -105,8 +106,8 @@ import com.google.protobuf.Message.Builder; @InterfaceAudience.Private public class HBaseClient { - public static final Log LOG = LogFactory - .getLog("org.apache.hadoop.ipc.HBaseClient"); + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient"); protected final PoolMap connections; private static final Map methodInstances = new ConcurrentHashMap(); @@ -190,12 +191,13 @@ public class HBaseClient { } public static class FailedServerException extends IOException { + private static final long serialVersionUID = -4744376109431464127L; + public FailedServerException(String s) { super(s); } } - /** * set the ping interval value in configuration * @@ -260,9 +262,9 @@ public class HBaseClient { /** A call waiting for a value. */ protected class Call { - final int id; // call id - final RpcRequestBody param; // rpc request object - Message value; // value, null if error + final int id; // call id + final RpcRequestBody param; // rpc request object + Message value; // value, null if error IOException error; // exception, null if value boolean done; // true when call is done long startTime; @@ -306,6 +308,7 @@ public class HBaseClient { return this.startTime; } } + protected static Map> tokenHandlers = new HashMap>(); static { @@ -339,9 +342,12 @@ public class HBaseClient { private int reloginMaxBackoff; // max pause before relogin on sasl failure // currently active calls - protected final ConcurrentSkipListMap calls = new ConcurrentSkipListMap(); - protected final AtomicLong lastActivity = new AtomicLong();// last I/O activity time - protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed + protected final ConcurrentSkipListMap calls = + new ConcurrentSkipListMap(); + protected final AtomicLong lastActivity = + new AtomicLong(); // last I/O activity time + protected final AtomicBoolean shouldCloseConnection = + new AtomicBoolean(); // indicate if the connection is closed protected IOException closeException; // close reason Connection(ConnectionId remoteId) throws IOException { @@ -968,8 +974,8 @@ public class HBaseClient { } - private Method getMethod(Class protocol, - String methodName) { + private Method getMethod(Class protocol, + String methodName) { Method method = methodInstances.get(methodName); if (method != null) { return method; @@ -1296,7 +1302,7 @@ public class HBaseClient { * Throws exceptions if there are network problems or if the remote code * threw an exception. */ public Message call(RpcRequestBody param, InetSocketAddress addr, - Class protocol, + Class protocol, User ticket, int rpcTimeout) throws InterruptedException, IOException { Call call = new Call(param); @@ -1362,28 +1368,12 @@ public class HBaseClient { } } - /** Makes a set of calls in parallel. Each parameter is sent to the - * corresponding address. When all values are available, or have timed out - * or errored, the collected results are returned in an array. The array - * contains nulls for calls that timed out or errored. - * @param params RpcRequestBody parameters - * @param addresses socket addresses - * @return RpcResponseBody[] - * @throws IOException e - * @deprecated Use {@code #call(RpcRequestBody[], InetSocketAddress[], Class, User)} instead - */ - @Deprecated - public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses) - throws IOException, InterruptedException { - return call(params, addresses, null, null); - } - /** Makes a set of calls in parallel. Each parameter is sent to the * corresponding address. When all values are available, or have timed out * or errored, the collected results are returned in an array. The array * contains nulls for calls that timed out or errored. */ public Message[] call(RpcRequestBody[] params, InetSocketAddress[] addresses, - Class protocol, + Class protocol, User ticket) throws IOException, InterruptedException { if (addresses.length == 0) return new RpcResponseBody[0]; @@ -1418,7 +1408,7 @@ public class HBaseClient { /* Get a connection from the pool, or create a new one and add it to the * pool. Connections to a given host/port are reused. */ protected Connection getConnection(InetSocketAddress addr, - Class protocol, + Class protocol, User ticket, int rpcTimeout, Call call) @@ -1461,11 +1451,10 @@ public class HBaseClient { final InetSocketAddress address; final User ticket; final int rpcTimeout; - Class protocol; + Class protocol; private static final int PRIME = 16777619; - ConnectionId(InetSocketAddress address, - Class protocol, + ConnectionId(InetSocketAddress address, Class protocol, User ticket, int rpcTimeout) { this.protocol = protocol; @@ -1478,7 +1467,7 @@ public class HBaseClient { return address; } - Class getProtocol() { + Class getProtocol() { return protocol; } @@ -1504,4 +1493,4 @@ public class HBaseClient { (ticket == null ? 0 : ticket.hashCode()) )) ^ rpcTimeout; } } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java index 1b4f20b192e..8c84558f89b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClientRPC.java @@ -19,19 +19,7 @@ package org.apache.hadoop.hbase.ipc; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.RetriesExhaustedException; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.ReflectionUtils; - -import javax.net.SocketFactory; import java.io.IOException; -import java.lang.reflect.Field; import java.lang.reflect.Proxy; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -39,27 +27,43 @@ import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.IpcProtocol; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.ReflectionUtils; + /** - * An RPC implementation. This class provides the client side implementation. + * An RPC implementation. This class provides the client side. */ @InterfaceAudience.Private public class HBaseClientRPC { - protected static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC"); + LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC"); - // cache of RpcEngines by protocol - private static final Map PROTOCOL_ENGINES - = new HashMap(); /** - * Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine} implementation - * to load to handle connection protocols. Handlers for individual protocols can be - * configured using {@code "hbase.rpc.client.engine." + protocol.class.name}. + * Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine} + * implementation to load to handle connection protocols. Handlers for individual + * protocols can be configured using {@code "hbase.rpc.client.engine." + + * protocol.class.name}. */ public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine"; - // track what RpcEngine is used by a proxy class, for stopProxy() - private static final Map PROXY_ENGINES - = new HashMap(); + + // cache of RpcEngines by protocol + private static final Map, RpcClientEngine> PROTOCOL_ENGINES = + new HashMap, RpcClientEngine>(); + + // Track what RpcEngine is used by a proxy class, for stopProxy() + private static final Map, RpcClientEngine> PROXY_ENGINES = + new HashMap, RpcClientEngine>(); + // thread-specific RPC timeout, which may override that of RpcEngine private static ThreadLocal rpcTimeout = new ThreadLocal() { @Override @@ -68,37 +72,31 @@ public class HBaseClientRPC { } }; - static long getProtocolVersion(Class protocol) - throws NoSuchFieldException, IllegalAccessException { - Field versionField = protocol.getField("VERSION"); - versionField.setAccessible(true); - return versionField.getLong(protocol); - } - // set a protocol to use a non-default RpcEngine static void setProtocolEngine(Configuration conf, - Class protocol, Class engine) { - conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcClientEngine.class); + Class protocol, Class engine) { + conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, + RpcClientEngine.class); } // return the RpcEngine configured to handle a protocol - static synchronized RpcClientEngine getProtocolEngine(Class protocol, - Configuration conf) { + static synchronized RpcClientEngine getProtocolEngine( + Class protocol, Configuration conf) { RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { // check for a configured default engine Class defaultEngine = - conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class); + conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class); // check for a per interface override Class impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(), - defaultEngine); + defaultEngine); LOG.debug("Using " + impl.getName() + " for " + protocol.getName()); engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf); - if (protocol.isInterface()) - PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), - protocol), - engine); + if (protocol.isInterface()) { + PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), protocol), + engine); + } PROTOCOL_ENGINES.put(protocol, engine); } return engine; @@ -111,7 +109,6 @@ public class HBaseClientRPC { /** * @param protocol protocol interface - * @param clientVersion which client version we expect * @param addr address of remote service * @param conf configuration * @param maxAttempts max attempts @@ -120,23 +117,21 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - @SuppressWarnings("unchecked") - public static VersionedProtocol waitForProxy(Class protocol, - long clientVersion, + public static IpcProtocol waitForProxy(Class protocol, InetSocketAddress addr, Configuration conf, int maxAttempts, int rpcTimeout, - long timeout - ) throws IOException { + long timeout) + throws IOException { // HBase does limited number of reconnects which is different from hadoop. long startTime = System.currentTimeMillis(); IOException ioe; int reconnectAttempts = 0; while (true) { try { - return getProxy(protocol, clientVersion, addr, conf, rpcTimeout); - } catch (SocketTimeoutException te) { // namenode is busy + return getProxy(protocol, addr, conf, rpcTimeout); + } catch (SocketTimeoutException te) { LOG.info("Problem connecting to server: " + addr); ioe = te; } catch (IOException ioex) { @@ -204,7 +199,6 @@ public class HBaseClientRPC { * talking to a server at the named address. * * @param protocol interface - * @param clientVersion version we are expecting * @param addr remote address * @param conf configuration * @param factory socket factory @@ -212,14 +206,12 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - public static VersionedProtocol getProxy(Class protocol, - long clientVersion, + public static IpcProtocol getProxy(Class protocol, InetSocketAddress addr, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - return getProxy(protocol, clientVersion, addr, - User.getCurrent(), conf, factory, rpcTimeout); + return getProxy(protocol, addr, User.getCurrent(), conf, factory, rpcTimeout); } /** @@ -227,7 +219,6 @@ public class HBaseClientRPC { * talking to a server at the named address. * * @param protocol interface - * @param clientVersion version we are expecting * @param addr remote address * @param ticket ticket * @param conf configuration @@ -236,15 +227,13 @@ public class HBaseClientRPC { * @return proxy * @throws java.io.IOException e */ - public static VersionedProtocol getProxy( - Class protocol, - long clientVersion, InetSocketAddress addr, User ticket, + public static IpcProtocol getProxy(Class protocol, + InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { RpcClientEngine engine = getProtocolEngine(protocol, conf); - VersionedProtocol proxy = engine - .getProxy(protocol, clientVersion, addr, ticket, conf, factory, - Math.min(rpcTimeout, getRpcTimeout())); + IpcProtocol proxy = engine.getProxy(protocol, addr, ticket, conf, factory, + Math.min(rpcTimeout, getRpcTimeout())); return proxy; } @@ -252,21 +241,17 @@ public class HBaseClientRPC { * Construct a client-side proxy object with the default SocketFactory * * @param protocol interface - * @param clientVersion version we are expecting * @param addr remote address * @param conf configuration * @param rpcTimeout timeout for each RPC * @return a proxy instance * @throws java.io.IOException e */ - public static VersionedProtocol getProxy( - Class protocol, - long clientVersion, InetSocketAddress addr, Configuration conf, - int rpcTimeout) - throws IOException { - - return getProxy(protocol, clientVersion, addr, conf, NetUtils - .getDefaultSocketFactory(conf), rpcTimeout); + public static IpcProtocol getProxy(Class protocol, + InetSocketAddress addr, Configuration conf, int rpcTimeout) + throws IOException { + return getProxy(protocol, addr, conf, + NetUtils.getDefaultSocketFactory(conf), rpcTimeout); } /** @@ -274,7 +259,7 @@ public class HBaseClientRPC { * * @param proxy the proxy to be stopped */ - public static void stopProxy(VersionedProtocol proxy) { + public static void stopProxy(IpcProtocol proxy) { if (proxy != null) { getProxyEngine(proxy).stopProxy(proxy); } @@ -291,4 +276,4 @@ public class HBaseClientRPC { public static void resetRpcTimeout() { rpcTimeout.remove(); } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java index 92f9c455ac1..8b075fafd82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java @@ -67,6 +67,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader; @@ -168,18 +169,18 @@ public abstract class HBaseServer implements RpcServer { new ThreadLocal(); private volatile boolean started = false; - private static final Map> - PROTOCOL_CACHE = - new ConcurrentHashMap>(); + private static final Map> PROTOCOL_CACHE = + new ConcurrentHashMap>(); - static Class getProtocolClass( + @SuppressWarnings("unchecked") + static Class getProtocolClass( String protocolName, Configuration conf) throws ClassNotFoundException { - Class protocol = + Class protocol = PROTOCOL_CACHE.get(protocolName); if (protocol == null) { - protocol = (Class) + protocol = (Class) conf.getClassByName(protocolName); PROTOCOL_CACHE.put(protocolName, protocol); } @@ -271,7 +272,7 @@ public abstract class HBaseServer implements RpcServer { protected BlockingQueue replicationQueue; private int numOfReplicationHandlers = 0; private Handler[] replicationHandlers = null; - + protected HBaseRPCErrorHandler errorHandler = null; /** @@ -351,7 +352,7 @@ public abstract class HBaseServer implements RpcServer { if (errorClass != null) { this.isError = true; } - + ByteBufferOutputStream buf = null; if (value != null) { buf = new ByteBufferOutputStream(((Message)value).getSerializedSize()); @@ -453,7 +454,7 @@ public abstract class HBaseServer implements RpcServer { public synchronized boolean isReturnValueDelayed() { return this.delayReturnValue; } - + @Override public void throwExceptionIfCallerDisconnected() throws CallerDisconnectedException { if (!connection.channel.isOpen()) { @@ -1111,7 +1112,7 @@ public abstract class HBaseServer implements RpcServer { protected String hostAddress; protected int remotePort; ConnectionHeader header; - Class protocol; + Class protocol; protected UserGroupInformation user = null; private AuthMethod authMethod; private boolean saslContextEstablished; @@ -1315,7 +1316,7 @@ public abstract class HBaseServer implements RpcServer { LOG.debug("SASL server context established. Authenticated client: " + user + ". Negotiated QoP is " + saslServer.getNegotiatedProperty(Sasl.QOP)); - } + } metrics.authenticationSuccess(); AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user); saslContextEstablished = true; @@ -1428,7 +1429,7 @@ public abstract class HBaseServer implements RpcServer { } } if (dataLength < 0) { - throw new IllegalArgumentException("Unexpected data length " + throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } data = ByteBuffer.allocate(dataLength); @@ -1749,7 +1750,7 @@ public abstract class HBaseServer implements RpcServer { status.pause("Waiting for a call"); Call call = myCallQueue.take(); // pop the queue; maybe blocked here status.setStatus("Setting up call"); - status.setConnection(call.connection.getHostAddress(), + status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); if (LOG.isDebugEnabled()) @@ -2010,11 +2011,12 @@ public abstract class HBaseServer implements RpcServer { } return handlers; } - + public SecretManager getSecretManager() { return this.secretManager; } + @SuppressWarnings("unchecked") public void setSecretManager(SecretManager secretManager) { this.secretManager = (SecretManager) secretManager; } @@ -2042,7 +2044,7 @@ public abstract class HBaseServer implements RpcServer { } } } - + /** Wait for the server to be stopped. * Does not wait for all subthreads to finish. * See {@link #stop()}. @@ -2101,7 +2103,7 @@ public abstract class HBaseServer implements RpcServer { connection.getProtocol()); } authManager.authorize(user != null ? user : null, - protocol, getConf(), addr); + protocol, getConf(), addr); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java index b5ee23dabde..d7d4566c983 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServerRPC.java @@ -19,17 +19,17 @@ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.util.ReflectionUtils; -import java.io.IOException; -import java.lang.reflect.Proxy; -import java.util.HashMap; -import java.util.Map; - /** * A simple RPC mechanism. * @@ -56,8 +56,8 @@ public class HBaseServerRPC { LogFactory.getLog("org.apache.hadoop.ipc.HBaseServerRPC"); // cache of RpcEngines by protocol - private static final Map PROTOCOL_ENGINES - = new HashMap(); + private static final Map, RpcServerEngine> PROTOCOL_ENGINES = + new HashMap, RpcServerEngine>(); /** * Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} implementation to @@ -65,24 +65,20 @@ public class HBaseServerRPC { * configured using {@code "hbase.rpc.server.engine." + protocol.class.name}. */ public static final String RPC_ENGINE_PROP = "hbase.rpc.server.engine"; - // track what RpcEngine is used by a proxy class, for stopProxy() - private static final Map PROXY_ENGINES - = new HashMap(); private HBaseServerRPC() { super(); } // no public ctor - // set a protocol to use a non-default RpcEngine static void setProtocolEngine(Configuration conf, - Class protocol, Class engine) { + Class protocol, Class engine) { conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcServerEngine.class); } // return the RpcEngine configured to handle a protocol - static synchronized RpcServerEngine getProtocolEngine(Class protocol, - Configuration conf) { + static synchronized RpcServerEngine getProtocolEngine(Class protocol, + Configuration conf) { RpcServerEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { // check for a configured default engine @@ -94,59 +90,15 @@ public class HBaseServerRPC { defaultEngine); LOG.debug("Using " + impl.getName() + " for " + protocol.getName()); engine = (RpcServerEngine) ReflectionUtils.newInstance(impl, conf); - if (protocol.isInterface()) - PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(), - protocol), - engine); PROTOCOL_ENGINES.put(protocol, engine); } return engine; } - // return the RpcEngine that handles a proxy object - private static synchronized RpcServerEngine getProxyEngine(Object proxy) { - return PROXY_ENGINES.get(proxy.getClass()); - } - - - /** - * Construct a server for a protocol implementation instance listening on a - * port and address. - * - * @param instance instance - * @param bindAddress bind address - * @param port port to bind to - * @param numHandlers number of handlers to start - * @param verbose verbose flag - * @param conf configuration - * @return Server - * @throws IOException e - */ - public static RpcServer getServer(final Object instance, - final Class[] ifaces, - final String bindAddress, final int port, - final int numHandlers, - int metaHandlerCount, - final boolean verbose, - Configuration conf, - int highPriorityLevel) - throws IOException { - return getServer(instance.getClass(), - instance, - ifaces, - bindAddress, - port, - numHandlers, - metaHandlerCount, - verbose, - conf, - highPriorityLevel); - } - /** * Construct a server for a protocol implementation instance. */ - public static RpcServer getServer(Class protocol, + public static RpcServer getServer(Class protocol, final Object instance, final Class[] ifaces, String bindAddress, @@ -157,9 +109,8 @@ public class HBaseServerRPC { Configuration conf, int highPriorityLevel) throws IOException { - return getProtocolEngine(protocol, conf) - .getServer(protocol, - instance, + return getProtocolEngine(protocol, conf). + getServer(instance, ifaces, bindAddress, port, @@ -169,5 +120,4 @@ public class HBaseServerRPC { conf, highPriorityLevel); } - -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java index 067118467bb..a32009593a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcClientEngine.java @@ -19,33 +19,27 @@ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.MasterAdminProtocol; -import org.apache.hadoop.hbase.MasterMonitorProtocol; -import org.apache.hadoop.hbase.RegionServerStatusProtocol; -import org.apache.hadoop.hbase.client.AdminProtocol; -import org.apache.hadoop.hbase.client.ClientProtocol; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.ipc.RemoteException; - -import javax.net.SocketFactory; import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.IpcProtocol; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; + public class ProtobufRpcClientEngine implements RpcClientEngine { private static final Log LOG = @@ -57,18 +51,18 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { protected final static ClientCache CLIENTS = new ClientCache(); @Override - public VersionedProtocol getProxy( - Class protocol, long clientVersion, + public IpcProtocol getProxy( + Class protocol, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout); - return (VersionedProtocol) Proxy.newProxyInstance( + return (IpcProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker); } @Override - public void stopProxy(VersionedProtocol proxy) { + public void stopProxy(IpcProtocol proxy) { if (proxy!=null) { ((Invoker)Proxy.getInvocationHandler(proxy)).close(); } @@ -77,30 +71,14 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { static class Invoker implements InvocationHandler { private static final Map returnTypes = new ConcurrentHashMap(); - private Class protocol; + private Class protocol; private InetSocketAddress address; private User ticket; private HBaseClient client; private boolean isClosed = false; final private int rpcTimeout; - private final long clientProtocolVersion; - // For generated protocol classes which don't have VERSION field, - // such as protobuf interfaces. - static final Map, Long> - PROTOCOL_VERSION = new HashMap, Long>(); - static { - PROTOCOL_VERSION.put(ClientService.BlockingInterface.class, - Long.valueOf(ClientProtocol.VERSION)); - PROTOCOL_VERSION.put(AdminService.BlockingInterface.class, - Long.valueOf(AdminProtocol.VERSION)); - PROTOCOL_VERSION.put(RegionServerStatusService.BlockingInterface.class, - Long.valueOf(RegionServerStatusProtocol.VERSION)); - PROTOCOL_VERSION.put(MasterMonitorProtocol.class,Long.valueOf(MasterMonitorProtocol.VERSION)); - PROTOCOL_VERSION.put(MasterAdminProtocol.class,Long.valueOf(MasterAdminProtocol.VERSION)); - } - - public Invoker(Class protocol, + public Invoker(Class protocol, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { this.protocol = protocol; @@ -108,20 +86,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { this.ticket = ticket; this.client = CLIENTS.getClient(conf, factory); this.rpcTimeout = rpcTimeout; - Long version = PROTOCOL_VERSION.get(protocol); - if (version != null) { - this.clientProtocolVersion = version; - } else { - try { - this.clientProtocolVersion = HBaseClientRPC.getProtocolVersion(protocol); - } catch (NoSuchFieldException e) { - throw new RuntimeException("Exception encountered during " + - protocol, e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Exception encountered during " + - protocol, e); - } - } } private RpcRequestBody constructRpcRequest(Method method, @@ -144,7 +108,6 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { } builder.setRequestClassName(param.getClass().getName()); builder.setRequest(param.toByteString()); - builder.setClientProtocolVersion(clientProtocolVersion); rpcRequest = builder.build(); return rpcRequest; } @@ -214,5 +177,4 @@ public class ProtobufRpcClientEngine implements RpcClientEngine { return protoType; } } - -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java index c3882a5e77d..cf684977b15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtobufRpcServerEngine.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -56,10 +57,10 @@ class ProtobufRpcServerEngine implements RpcServerEngine { } @Override - public Server getServer(Class protocol, - Object instance, Class[] ifaces, String bindAddress, int port, - int numHandlers, int metaHandlerCount, boolean verbose, - Configuration conf, int highPriorityLevel) throws IOException { + public Server getServer(Object instance, Class[] ifaces, + String bindAddress, int port, int numHandlers, int metaHandlerCount, + boolean verbose, Configuration conf, int highPriorityLevel) + throws IOException { return new Server(instance, ifaces, conf, bindAddress, port, numHandlers, metaHandlerCount, verbose, highPriorityLevel); } @@ -78,9 +79,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine { private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; - /** Names for suffixed metrics */ - private static final String ABOVE_ONE_SEC_METRIC = ".aboveOneSec."; - private final int warnResponseTime; private final int warnResponseSize; @@ -153,9 +151,9 @@ class ProtobufRpcServerEngine implements RpcServerEngine { * the return response has protobuf response payload. On failure, the * exception name and the stack trace are returned in the protobuf response. */ - public Message call(Class protocol, - RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status) - throws IOException { + public Message call(Class protocol, + RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status) + throws IOException { try { String methodName = rpcRequest.getMethodName(); Method method = getMethod(protocol, methodName); @@ -170,13 +168,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine { * The rpcProxy's has a declared protocol name that is * sent form client to server at connection time. */ - //TODO: use the clientVersion to do protocol compatibility checks, and - //this could be used here to handle complex use cases like deciding - //which implementation of the protocol should be used to service the - //current request, etc. Ideally, we shouldn't land up in a situation - //where we need to support such a use case. - //For now the clientVersion field is simply ignored - long clientVersion = rpcRequest.getClientProtocolVersion(); if (verbose) { LOG.info("Call: protocol name=" + protocol.getName() + @@ -243,7 +234,6 @@ class ProtobufRpcServerEngine implements RpcServerEngine { buffer.append("("); buffer.append(param.getClass().getName()); buffer.append(")"); - buffer.append(", client version="+clientVersion); logResponse(new Object[]{rpcRequest.getRequest()}, methodName, buffer.toString(), (tooLarge ? "TooLarge" : "TooSlow"), status.getClient(), startTime, processingTime, qTime, @@ -271,7 +261,7 @@ class ProtobufRpcServerEngine implements RpcServerEngine { } } - static Method getMethod(Class protocol, + static Method getMethod(Class protocol, String methodName) { Method method = methodInstances.get(methodName); if (method != null) { @@ -377,4 +367,4 @@ class ProtobufRpcServerEngine implements RpcServerEngine { LOG.info(v); } } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java deleted file mode 100644 index b3d1df581cd..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ProtocolSignature.java +++ /dev/null @@ -1,243 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.HashMap; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.io.WritableFactory; - -@InterfaceAudience.Private -public class ProtocolSignature implements Writable { - static { // register a ctor - WritableFactories.setFactory - (ProtocolSignature.class, - new WritableFactory() { - public Writable newInstance() { return new ProtocolSignature(); } - }); - } - - private long version; - private int[] methods = null; // an array of method hash codes - - /** - * default constructor - */ - public ProtocolSignature() { - } - - /** - * Constructor - * - * @param version server version - * @param methodHashcodes hash codes of the methods supported by server - */ - public ProtocolSignature(long version, int[] methodHashcodes) { - this.version = version; - this.methods = methodHashcodes; - } - - public long getVersion() { - return version; - } - - public int[] getMethods() { - return methods; - } - - @Override - public void readFields(DataInput in) throws IOException { - version = in.readLong(); - boolean hasMethods = in.readBoolean(); - if (hasMethods) { - int numMethods = in.readInt(); - methods = new int[numMethods]; - for (int i=0; i type : method.getParameterTypes()) { - hashcode = 31*hashcode ^ type.getName().hashCode(); - } - return hashcode; - } - - /** - * Convert an array of Method into an array of hash codes - * - * @param methods - * @return array of hash codes - */ - private static int[] getFingerprints(Method[] methods) { - if (methods == null) { - return null; - } - int[] hashCodes = new int[methods.length]; - for (int i = 0; i - PROTOCOL_FINGERPRINT_CACHE = - new HashMap(); - - /** - * Return a protocol's signature and finger print from cache - * - * @param protocol a protocol class - * @param serverVersion protocol version - * @return its signature and finger print - */ - private static ProtocolSigFingerprint getSigFingerprint( - Class protocol, long serverVersion) { - String protocolName = protocol.getName(); - synchronized (PROTOCOL_FINGERPRINT_CACHE) { - ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName); - if (sig == null) { - int[] serverMethodHashcodes = getFingerprints(protocol.getMethods()); - sig = new ProtocolSigFingerprint( - new ProtocolSignature(serverVersion, serverMethodHashcodes), - getFingerprint(serverMethodHashcodes)); - PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig); - } - return sig; - } - } - - /** - * Get a server protocol's signature - * - * @param clientMethodsHashCode client protocol methods hashcode - * @param serverVersion server protocol version - * @param protocol protocol - * @return the server's protocol signature - */ - static ProtocolSignature getProtocolSignature( - int clientMethodsHashCode, - long serverVersion, - Class protocol) { - // try to get the finger print & signature from the cache - ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion); - - // check if the client side protocol matches the one on the server side - if (clientMethodsHashCode == sig.fingerprint) { - return new ProtocolSignature(serverVersion, null); // null indicates a match - } - - return sig.signature; - } - - /** - * Get a server protocol's signature - * - * @param server server implementation - * @param protocol server protocol - * @param clientVersion client's version - * @param clientMethodsHash client's protocol's hash code - * @return the server protocol's signature - * @throws IOException if any error occurs - */ - @SuppressWarnings("unchecked") - public static ProtocolSignature getProtocolSignature(VersionedProtocol server, - String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Class inter; - try { - inter = (Class)Class.forName(protocol); - } catch (Exception e) { - throw new IOException(e); - } - long serverVersion = server.getProtocolVersion(protocol, clientVersion); - return ProtocolSignature.getProtocolSignature( - clientMethodsHash, serverVersion, inter); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java index 4353c5ad927..eefb6b88d64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RequestContext.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.security.User; import java.net.InetAddress; @@ -90,7 +91,7 @@ public class RequestContext { */ public static void set(User user, InetAddress remoteAddress, - Class protocol) { + Class protocol) { RequestContext ctx = instance.get(); ctx.user = user; ctx.remoteAddress = remoteAddress; @@ -111,12 +112,12 @@ public class RequestContext { private User user; private InetAddress remoteAddress; - private Class protocol; + private Class protocol; // indicates we're within a RPC request invocation private boolean inRequest; private RequestContext(User user, InetAddress remoteAddr, - Class protocol) { + Class protocol) { this.user = user; this.remoteAddress = remoteAddr; this.protocol = protocol; @@ -130,11 +131,11 @@ public class RequestContext { return remoteAddress; } - public Class getProtocol() { + public Class getProtocol() { return protocol; } public boolean isInRequest() { return inRequest; } -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java index f6dcbf964fe..2e24e764432 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientEngine.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.ipc; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.security.User; import javax.net.SocketFactory; @@ -31,12 +32,10 @@ import java.net.InetSocketAddress; @InterfaceAudience.Private public interface RpcClientEngine { /** Construct a client-side proxy object. */ - VersionedProtocol getProxy(Class protocol, - long clientVersion, InetSocketAddress addr, - User ticket, Configuration conf, - SocketFactory factory, int rpcTimeout) throws IOException; + IpcProtocol getProxy(Class protocol, + InetSocketAddress addr, User ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException; /** Stop this proxy. */ - void stopProxy(VersionedProtocol proxy); - -} + void stopProxy(IpcProtocol proxy); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 50883e941f6..6e244bdea07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -23,16 +23,18 @@ import com.google.common.base.Function; import com.google.protobuf.Message; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import java.io.IOException; import java.net.InetSocketAddress; -/** - */ @InterfaceAudience.Private public interface RpcServer { + // TODO: Needs cleanup. Why a 'start', and then a 'startThreads' and an 'openServer'? + // Also, the call takes a RpcRequestBody, an already composed combination of + // rpc Request and metadata. Should disentangle metadata and rpc Request Message. void setSocketSendBufSize(int size); @@ -45,12 +47,12 @@ public interface RpcServer { InetSocketAddress getListenerAddress(); /** Called for each call. - * @param param writable parameter + * @param param parameter * @param receiveTime time - * @return Message + * @return Message Protobuf response Message * @throws java.io.IOException e */ - Message call(Class protocol, + Message call(Class protocol, RpcRequestBody param, long receiveTime, MonitoredRPCHandler status) throws IOException; @@ -62,9 +64,8 @@ public interface RpcServer { void startThreads(); - /** * Returns the metrics instance for reporting RPC call statistics */ MetricsHBaseServer getMetrics(); -} +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java index 466efedd026..e6600538614 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerEngine.java @@ -26,11 +26,9 @@ import java.io.IOException; /** An RPC implementation for the server. */ @InterfaceAudience.Private interface RpcServerEngine { - /** Construct a server for a protocol implementation instance. */ - RpcServer getServer(Class protocol, Object instance, - Class[] ifaces, String bindAddress, - int port, int numHandlers, int metaHandlerCount, - boolean verbose, Configuration conf, int highPriorityLevel) - throws IOException; -} + RpcServer getServer(Object instance, Class[] protocols, + String bindAddress, int port, int numHandlers, int metaHandlerCount, + boolean verbose, Configuration conf, int highPriorityLevel) + throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java deleted file mode 100644 index 3667c6db5cc..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/VersionedProtocol.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.ipc; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Superclass of all protocols that use Hadoop RPC. - * Subclasses of this interface are also supposed to have - * a static final long versionID field. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface VersionedProtocol { - - /** - * Return protocol version corresponding to protocol interface. - * @param protocol The classname of the protocol interface - * @param clientVersion The version of the protocol that the client speaks - * @return the version that the server will speak - * @throws IOException if any IO error occurs - */ - @Deprecated - public long getProtocolVersion(String protocol, - long clientVersion) throws IOException; - - /** - * Return protocol version corresponding to protocol interface. - * @param protocol The classname of the protocol interface - * @param clientVersion The version of the protocol that the client speaks - * @param clientMethodsHash the hashcode of client protocol methods - * @return the server protocol signature containing its version and - * a list of its supported methods - * @see ProtocolSignature#getProtocolSignature(VersionedProtocol, String, - * long, int) for a default implementation - */ - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, - int clientMethodsHash) throws IOException; -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 834541412e5..9437d99a835 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -81,7 +81,6 @@ import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HBaseServerRPC; -import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.ipc.UnknownProtocolException; @@ -965,33 +964,6 @@ Server { serverManager.expireServer(sn); } - @Override - public ProtocolSignature getProtocolSignature( - String protocol, long version, int clientMethodsHashCode) - throws IOException { - if (MasterMonitorProtocol.class.getName().equals(protocol)) { - return new ProtocolSignature(MasterMonitorProtocol.VERSION, null); - } else if (MasterAdminProtocol.class.getName().equals(protocol)) { - return new ProtocolSignature(MasterAdminProtocol.VERSION, null); - } else if (RegionServerStatusProtocol.class.getName().equals(protocol)) { - return new ProtocolSignature(RegionServerStatusProtocol.VERSION, null); - } - throw new IOException("Unknown protocol: " + protocol); - } - - public long getProtocolVersion(String protocol, long clientVersion) { - if (MasterMonitorProtocol.class.getName().equals(protocol)) { - return MasterMonitorProtocol.VERSION; - } else if (MasterAdminProtocol.class.getName().equals(protocol)) { - return MasterAdminProtocol.VERSION; - } else if (RegionServerStatusProtocol.class.getName().equals(protocol)) { - return RegionServerStatusProtocol.VERSION; - } - // unknown protocol - LOG.warn("Version requested for unimplemented protocol: "+protocol); - return -1; - } - @Override public TableDescriptors getTableDescriptors() { return this.tableDescriptors; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d2cc20fdb35..7af0d52d175 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -106,7 +106,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseServerRPC; -import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -1794,7 +1793,7 @@ public class HRegionServer implements ClientProtocol, // Do initial RPC setup. The final argument indicates that the RPC // should retry indefinitely. master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy( - RegionServerStatusProtocol.class, RegionServerStatusProtocol.VERSION, + RegionServerStatusProtocol.class, isa, this.conf, -1, this.rpcTimeout, this.rpcTimeout); LOG.info("Connected to master at " + isa); @@ -2042,31 +2041,6 @@ public class HRegionServer implements ClientProtocol, return regions.toArray(new HRegionInfo[regions.size()]); } - @Override - @QosPriority(priority=HConstants.HIGH_QOS) - public ProtocolSignature getProtocolSignature( - String protocol, long version, int clientMethodsHashCode) - throws IOException { - if (protocol.equals(ClientProtocol.class.getName())) { - return new ProtocolSignature(ClientProtocol.VERSION, null); - } else if (protocol.equals(AdminProtocol.class.getName())) { - return new ProtocolSignature(AdminProtocol.VERSION, null); - } - throw new IOException("Unknown protocol: " + protocol); - } - - @Override - @QosPriority(priority=HConstants.HIGH_QOS) - public long getProtocolVersion(final String protocol, final long clientVersion) - throws IOException { - if (protocol.equals(ClientProtocol.class.getName())) { - return ClientProtocol.VERSION; - } else if (protocol.equals(AdminProtocol.class.getName())) { - return AdminProtocol.VERSION; - } - throw new IOException("Unknown protocol: " + protocol); - } - @Override public Leases getLeases() { return leases; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 023ca3cec93..0285818e1f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -383,7 +383,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { //file system, the tests should use getBaseTestDir, otherwise, we can use //the working directory, and create a unique sub dir there FileSystem fs = getTestFileSystem(); - if (fs.getUri().getScheme().equals(fs.getLocal(conf).getUri().getScheme())) { + if (fs.getUri().getScheme().equals(FileSystem.getLocal(conf).getUri().getScheme())) { File dataTestDir = new File(getDataTestDir().toString()); dataTestDirOnTestFS = new Path(dataTestDir.getAbsolutePath()); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java index ea915c53bdc..899b238057f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/RandomTimeoutRpcEngine.java @@ -29,9 +29,9 @@ import java.net.SocketTimeoutException; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.security.User; import com.google.protobuf.ServiceException; @@ -47,18 +47,18 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { public static double chanceOfTimeout = 0.3; private static AtomicInteger invokations = new AtomicInteger(); - public VersionedProtocol getProxy( - Class protocol, long clientVersion, + public IpcProtocol getProxy( + Class protocol, InetSocketAddress addr, User ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { // Start up the requested-for proxy so we can pass-through calls to the underlying // RpcEngine. Also instantiate and return our own proxy (RandomTimeoutInvocationHandler) // that will either throw exceptions or pass through to the underlying proxy. - VersionedProtocol actualProxy = super.getProxy(protocol, clientVersion, addr, + IpcProtocol actualProxy = super.getProxy(protocol, addr, ticket, conf, factory, rpcTimeout); RandomTimeoutInvocationHandler invoker = new RandomTimeoutInvocationHandler(actualProxy); - VersionedProtocol object = (VersionedProtocol)Proxy.newProxyInstance( + IpcProtocol object = (IpcProtocol)Proxy.newProxyInstance( protocol.getClassLoader(), new Class[]{protocol}, invoker); return object; } @@ -66,7 +66,8 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { /** * Call this in order to set this class to run as the RpcEngine for the given protocol */ - public static void setProtocolEngine(Configuration conf, Class protocol) { + public static void setProtocolEngine(Configuration conf, + Class protocol) { HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class); } @@ -78,9 +79,9 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { } static class RandomTimeoutInvocationHandler implements InvocationHandler { - private VersionedProtocol actual = null; + private IpcProtocol actual = null; - public RandomTimeoutInvocationHandler(VersionedProtocol actual) { + public RandomTimeoutInvocationHandler(IpcProtocol actual) { this.actual = actual; } @@ -96,4 +97,4 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine { return Proxy.getInvocationHandler(actual).invoke(proxy, method, args); } } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java index a4ec0616377..ebfe00fbafd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java @@ -24,22 +24,20 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; -import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IpcProtocol; +import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestArg; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos.TestResponse; -import org.apache.hadoop.hbase.MediumTests; import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.apache.log4j.Level; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mortbay.log.Log; @@ -69,13 +67,13 @@ public class TestDelayedRpc { private void testDelayedRpc(boolean delayReturnValue) throws Exception { Configuration conf = HBaseConfiguration.create(); InetSocketAddress isa = new InetSocketAddress("localhost", 0); - - rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(delayReturnValue), + TestRpcImpl instance = new TestRpcImpl(delayReturnValue); + rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance, new Class[]{ TestRpcImpl.class }, isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); List results = new ArrayList(); @@ -133,11 +131,12 @@ public class TestDelayedRpc { log.setLevel(Level.WARN); InetSocketAddress isa = new InetSocketAddress("localhost", 0); - rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(true), + TestRpcImpl instance = new TestRpcImpl(true); + rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance, new Class[]{ TestRpcImpl.class }, isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); Thread threads[] = new Thread[MAX_DELAYED_RPC + 1]; @@ -165,8 +164,7 @@ public class TestDelayedRpc { log.removeAppender(listAppender); } - public interface TestRpc extends VersionedProtocol { - public static final long VERSION = 1L; + public interface TestRpc extends IpcProtocol { TestResponse test(TestArg delay); } @@ -213,22 +211,6 @@ public class TestDelayedRpc { responseBuilder.setResponse(0xDEADBEEF); return responseBuilder.build(); } - - @Override - public long getProtocolVersion(String arg0, long arg1) throws IOException { - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - Method [] methods = this.getClass().getMethods(); - int [] hashes = new int [methods.length]; - for (int i = 0; i < methods.length; i++) { - hashes[i] = methods[i].hashCode(); - } - return new ProtocolSignature(clientVersion, hashes); - } } private static class TestThread extends Thread { @@ -263,13 +245,13 @@ public class TestDelayedRpc { public void testEndDelayThrowing() throws IOException { Configuration conf = HBaseConfiguration.create(); InetSocketAddress isa = new InetSocketAddress("localhost", 0); - - rpcServer = HBaseServerRPC.getServer(new FaultyTestRpc(), + FaultyTestRpc instance = new FaultyTestRpc(); + rpcServer = HBaseServerRPC.getServer(instance.getClass(), instance, new Class[]{ TestRpcImpl.class }, isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0); rpcServer.start(); - TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0, + TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, rpcServer.getListenerAddress(), conf, 1000); int result = 0xDEADBEEF; @@ -312,18 +294,5 @@ public class TestDelayedRpc { // Client will receive the Exception, not this value. return TestResponse.newBuilder().setResponse(DELAYED).build(); } - - @Override - public long getProtocolVersion(String arg0, long arg1) throws IOException { - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - return new ProtocolSignature(clientVersion, new int [] {}); - } } - -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index a1fa7b6a2e6..f54350d4e01 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -18,41 +18,40 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; -import java.net.Socket; -import java.net.InetSocketAddress; -import java.net.SocketTimeoutException; -import javax.net.SocketFactory; -import java.lang.reflect.Method; -import java.util.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; -import static org.junit.Assert.*; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + +import javax.net.SocketFactory; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IpcProtocol; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.StringUtils; import org.junit.Test; import org.junit.experimental.categories.Category; - -import static org.mockito.Mockito.*; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.SmallTests; - -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody; import com.google.protobuf.Message; -import org.apache.hadoop.hbase.security.User; - -import org.apache.commons.logging.*; -import org.apache.log4j.Logger; @Category(SmallTests.class) public class TestIPC { public static final Log LOG = LogFactory.getLog(TestIPC.class); - private static final Random RANDOM = new Random(); private static class TestRpcServer extends HBaseServer { TestRpcServer() throws IOException { @@ -60,11 +59,10 @@ public class TestIPC { } @Override - public Message call(Class protocol, - RpcRequestBody rpcRequest, - long receiveTime, - MonitoredRPCHandler status) throws IOException { - return rpcRequest; + public Message call(Class protocol, + RpcRequestBody param, long receiveTime, MonitoredRPCHandler status) + throws IOException { + return param; } } @@ -97,4 +95,4 @@ public class TestIPC { assertTrue(StringUtils.stringifyException(e).contains("Injected fault")); } } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 0d8684e8cb1..77ffcff67ef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; @@ -52,7 +53,7 @@ public class TestProtoBufRpc { private static RpcServer server; public interface TestRpcService - extends TestProtobufRpcProto.BlockingInterface, VersionedProtocol { + extends TestProtobufRpcProto.BlockingInterface, IpcProtocol { public long VERSION = 1; } @@ -76,20 +77,6 @@ public class TestProtoBufRpc { EmptyRequestProto request) throws ServiceException { throw new ServiceException("error", new IOException("error")); } - - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - // TODO Auto-generated method stub - return null; - } } @Before @@ -120,7 +107,7 @@ public class TestProtoBufRpc { HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class); HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class); - return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, 0, + return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, addr, conf, 10000); } @@ -149,4 +136,4 @@ public class TestProtoBufRpc { } catch (ServiceException e) { } } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index e6ae47ddae0..8889dd1d745 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.ClientProtocol; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest; @@ -186,20 +185,6 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer this.nexts.put(regionName, rs); } - @Override - public long getProtocolVersion(String protocol, long clientVersion) - throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, - long clientVersion, int clientMethodsHash) throws IOException { - // TODO Auto-generated method stub - return null; - } - @Override public boolean isStopped() { // TODO Auto-generated method stub diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java index 17c486119d1..6d68b0bcbdc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestHMasterRPCException.java @@ -56,7 +56,7 @@ public class TestHMasterRPCException { while (i < 20) { try { MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy( - MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100 * 10); + MasterMonitorProtocol.class, isa, conf, 100 * 10); inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance()); fail(); } catch (ServiceException ex) { @@ -77,6 +77,4 @@ public class TestHMasterRPCException { } fail(); } - -} - +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java index 317b3468242..76674378d59 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterId; +import org.apache.hadoop.hbase.IpcProtocol; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -45,11 +46,9 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.HBaseClientRPC; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HBaseServerRPC; -import org.apache.hadoop.hbase.ipc.ProtocolSignature; import org.apache.hadoop.hbase.ipc.RequestContext; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.ipc.VersionedProtocol; import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionServerServices; @@ -86,8 +85,7 @@ public class TestTokenAuthentication { serverPrincipal = "hbase.test.kerberos.principal") @TokenInfo("HBASE_AUTH_TOKEN") private static interface BlockingAuthenticationService - extends AuthenticationProtos.AuthenticationService.BlockingInterface, VersionedProtocol { - long VERSION = 1L; + extends AuthenticationProtos.AuthenticationService.BlockingInterface, IpcProtocol { } /** @@ -292,17 +290,6 @@ public class TestTokenAuthentication { throw new ServiceException(ioe); } } - - /* VersionedProtocol implementation */ - @Override - public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - return BlockingAuthenticationService.VERSION; - } - - @Override - public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException { - return new ProtocolSignature(BlockingAuthenticationService.VERSION, null); - } } @@ -365,7 +352,7 @@ public class TestTokenAuthentication { testuser.setAuthenticationMethod( UserGroupInformation.AuthenticationMethod.TOKEN); final Configuration conf = TEST_UTIL.getConfiguration(); - testuser.setConfiguration(conf); + UserGroupInformation.setConfiguration(conf); Token token = secretManager.generateToken("testuser"); LOG.debug("Got token: " + token.toString()); @@ -379,7 +366,6 @@ public class TestTokenAuthentication { AuthenticationProtos.AuthenticationService.BlockingInterface proxy = (AuthenticationProtos.AuthenticationService.BlockingInterface) HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class, - BlockingAuthenticationService.VERSION, server.getAddress(), c, HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS, HConstants.DEFAULT_HBASE_RPC_TIMEOUT, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java index 25569c5ee23..108e64895bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java @@ -170,4 +170,4 @@ public class MockRegionServerServices implements RegionServerServices { public Leases getLeases() { return null; } -} +} \ No newline at end of file