diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 4d0d16b3999..00c4cdd1fd0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -214,7 +214,7 @@ public abstract class AbstractRpcClient implements RpcClient { * new Connection each time. * @return A pair with the Message response and the Cell data (if any). */ - Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, + private Message callBlockingMethod(Descriptors.MethodDescriptor md, PayloadCarryingRpcController pcrc, Message param, Message returnType, final User ticket, final InetSocketAddress isa) throws ServiceException { if (pcrc == null) { diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java index 0a7ac8a042f..65d8d8badda 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java @@ -19,10 +19,17 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.RpcClient.SPECIFIC_WRITE_THREAD; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Lists; +import com.google.protobuf.BlockingService; +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; + import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -42,11 +49,9 @@ import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.IntegrationTests; @@ -55,12 +60,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import com.google.protobuf.Descriptors.MethodDescriptor; @Category(IntegrationTests.class) public class IntegrationTestRpcClient { @@ -95,29 +94,6 @@ public class IntegrationTestRpcClient { } } - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); - protected AbstractRpcClient createRpcClient(Configuration conf, boolean isSyncClient) { return isSyncClient ? new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) : @@ -301,14 +277,11 @@ public class IntegrationTestRpcClient { @Override public void run() { - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - while (running.get()) { boolean isBigPayload = random.nextBoolean(); String message = isBigPayload ? BIG_PAYLOAD : id + numCalls; EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - EchoResponseProto ret = EchoResponseProto.newBuilder().setMessage("foo").build(); - + EchoResponseProto ret; TestRpcServer server = cluster.getRandomServer(); try { User user = User.getCurrent(); @@ -317,8 +290,8 @@ public class IntegrationTestRpcClient { throw new IOException("Listener channel is closed"); } sending.set(true); - ret = (EchoResponseProto) - rpcClient.callBlockingMethod(md, null, param, ret, user, address); + BlockingInterface stub = newBlockingStub(rpcClient, address, user); + ret = stub.echo(null, param); } catch (Exception e) { LOG.warn(e); continue; // expected in case connection is closing or closed diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java index 3da0254b2d8..d28945c4019 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java @@ -1718,6 +1718,965 @@ public final class TestProtos { // @@protoc_insertion_point(class_scope:EchoResponseProto) } + public interface PauseRequestProtoOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required uint32 ms = 1; + /** + * required uint32 ms = 1; + */ + boolean hasMs(); + /** + * required uint32 ms = 1; + */ + int getMs(); + } + /** + * Protobuf type {@code PauseRequestProto} + */ + public static final class PauseRequestProto extends + com.google.protobuf.GeneratedMessage + implements PauseRequestProtoOrBuilder { + // Use PauseRequestProto.newBuilder() to construct. + private PauseRequestProto(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private PauseRequestProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final PauseRequestProto defaultInstance; + public static PauseRequestProto getDefaultInstance() { + return defaultInstance; + } + + public PauseRequestProto getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PauseRequestProto( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + ms_ = input.readUInt32(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_PauseRequestProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_PauseRequestProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.class, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public PauseRequestProto parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new PauseRequestProto(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required uint32 ms = 1; + public static final int MS_FIELD_NUMBER = 1; + private int ms_; + /** + * required uint32 ms = 1; + */ + public boolean hasMs() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint32 ms = 1; + */ + public int getMs() { + return ms_; + } + + private void initFields() { + ms_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasMs()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeUInt32(1, ms_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(1, ms_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto) obj; + + boolean result = true; + result = result && (hasMs() == other.hasMs()); + if (hasMs()) { + result = result && (getMs() + == other.getMs()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasMs()) { + hash = (37 * hash) + MS_FIELD_NUMBER; + hash = (53 * hash) + getMs(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code PauseRequestProto} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProtoOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_PauseRequestProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_PauseRequestProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.class, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + ms_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_PauseRequestProto_descriptor; + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto getDefaultInstanceForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto build() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto buildPartial() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.ms_ = ms_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto) { + return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto other) { + if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance()) return this; + if (other.hasMs()) { + setMs(other.getMs()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasMs()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required uint32 ms = 1; + private int ms_ ; + /** + * required uint32 ms = 1; + */ + public boolean hasMs() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required uint32 ms = 1; + */ + public int getMs() { + return ms_; + } + /** + * required uint32 ms = 1; + */ + public Builder setMs(int value) { + bitField0_ |= 0x00000001; + ms_ = value; + onChanged(); + return this; + } + /** + * required uint32 ms = 1; + */ + public Builder clearMs() { + bitField0_ = (bitField0_ & ~0x00000001); + ms_ = 0; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:PauseRequestProto) + } + + static { + defaultInstance = new PauseRequestProto(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:PauseRequestProto) + } + + public interface AddrResponseProtoOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string addr = 1; + /** + * required string addr = 1; + */ + boolean hasAddr(); + /** + * required string addr = 1; + */ + java.lang.String getAddr(); + /** + * required string addr = 1; + */ + com.google.protobuf.ByteString + getAddrBytes(); + } + /** + * Protobuf type {@code AddrResponseProto} + */ + public static final class AddrResponseProto extends + com.google.protobuf.GeneratedMessage + implements AddrResponseProtoOrBuilder { + // Use AddrResponseProto.newBuilder() to construct. + private AddrResponseProto(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private AddrResponseProto(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final AddrResponseProto defaultInstance; + public static AddrResponseProto getDefaultInstance() { + return defaultInstance; + } + + public AddrResponseProto getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private AddrResponseProto( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + addr_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_AddrResponseProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_AddrResponseProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.class, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public AddrResponseProto parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new AddrResponseProto(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string addr = 1; + public static final int ADDR_FIELD_NUMBER = 1; + private java.lang.Object addr_; + /** + * required string addr = 1; + */ + public boolean hasAddr() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string addr = 1; + */ + public java.lang.String getAddr() { + java.lang.Object ref = addr_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + addr_ = s; + } + return s; + } + } + /** + * required string addr = 1; + */ + public com.google.protobuf.ByteString + getAddrBytes() { + java.lang.Object ref = addr_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + addr_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + private void initFields() { + addr_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasAddr()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getAddrBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getAddrBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto)) { + return super.equals(obj); + } + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto other = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto) obj; + + boolean result = true; + result = result && (hasAddr() == other.hasAddr()); + if (hasAddr()) { + result = result && getAddr() + .equals(other.getAddr()); + } + result = result && + getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasAddr()) { + hash = (37 * hash) + ADDR_FIELD_NUMBER; + hash = (53 * hash) + getAddr().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code AddrResponseProto} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProtoOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_AddrResponseProto_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_AddrResponseProto_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.class, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.Builder.class); + } + + // Construct using org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + addr_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.internal_static_AddrResponseProto_descriptor; + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto getDefaultInstanceForType() { + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance(); + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto build() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto buildPartial() { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto result = new org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.addr_ = addr_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto) { + return mergeFrom((org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto other) { + if (other == org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance()) return this; + if (other.hasAddr()) { + bitField0_ |= 0x00000001; + addr_ = other.addr_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasAddr()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string addr = 1; + private java.lang.Object addr_ = ""; + /** + * required string addr = 1; + */ + public boolean hasAddr() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string addr = 1; + */ + public java.lang.String getAddr() { + java.lang.Object ref = addr_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + addr_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string addr = 1; + */ + public com.google.protobuf.ByteString + getAddrBytes() { + java.lang.Object ref = addr_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + addr_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string addr = 1; + */ + public Builder setAddr( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + addr_ = value; + onChanged(); + return this; + } + /** + * required string addr = 1; + */ + public Builder clearAddr() { + bitField0_ = (bitField0_ & ~0x00000001); + addr_ = getDefaultInstance().getAddr(); + onChanged(); + return this; + } + /** + * required string addr = 1; + */ + public Builder setAddrBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + addr_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:AddrResponseProto) + } + + static { + defaultInstance = new AddrResponseProto(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:AddrResponseProto) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_EmptyRequestProto_descriptor; private static @@ -1738,6 +2697,16 @@ public final class TestProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_EchoResponseProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_PauseRequestProto_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_PauseRequestProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_AddrResponseProto_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_AddrResponseProto_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1750,8 +2719,10 @@ public final class TestProtos { "\n\ntest.proto\"\023\n\021EmptyRequestProto\"\024\n\022Emp" + "tyResponseProto\"#\n\020EchoRequestProto\022\017\n\007m" + "essage\030\001 \002(\t\"$\n\021EchoResponseProto\022\017\n\007mes" + - "sage\030\001 \002(\tB?\n.org.apache.hadoop.hbase.ip" + - "c.protobuf.generatedB\nTestProtos\240\001\001" + "sage\030\001 \002(\t\"\037\n\021PauseRequestProto\022\n\n\002ms\030\001 " + + "\002(\r\"!\n\021AddrResponseProto\022\014\n\004addr\030\001 \002(\tB?" + + "\n.org.apache.hadoop.hbase.ipc.protobuf.g" + + "eneratedB\nTestProtos\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1782,6 +2753,18 @@ public final class TestProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_EchoResponseProto_descriptor, new java.lang.String[] { "Message", }); + internal_static_PauseRequestProto_descriptor = + getDescriptor().getMessageTypes().get(4); + internal_static_PauseRequestProto_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_PauseRequestProto_descriptor, + new java.lang.String[] { "Ms", }); + internal_static_AddrResponseProto_descriptor = + getDescriptor().getMessageTypes().get(5); + internal_static_AddrResponseProto_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_AddrResponseProto_descriptor, + new java.lang.String[] { "Addr", }); return null; } }; diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java index 3fd34e96a6e..a5b1777bc7a 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestRpcServiceProtos.java @@ -45,6 +45,22 @@ public final class TestRpcServiceProtos { org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, com.google.protobuf.RpcCallback done); + /** + * rpc pause(.PauseRequestProto) returns (.EmptyResponseProto); + */ + public abstract void pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request, + com.google.protobuf.RpcCallback done); + + /** + * rpc addr(.EmptyRequestProto) returns (.AddrResponseProto); + */ + public abstract void addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, + com.google.protobuf.RpcCallback done); + } public static com.google.protobuf.Service newReflectiveService( @@ -74,6 +90,22 @@ public final class TestRpcServiceProtos { impl.error(controller, request, done); } + @java.lang.Override + public void pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request, + com.google.protobuf.RpcCallback done) { + impl.pause(controller, request, done); + } + + @java.lang.Override + public void addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, + com.google.protobuf.RpcCallback done) { + impl.addr(controller, request, done); + } + }; } @@ -102,6 +134,10 @@ public final class TestRpcServiceProtos { return impl.echo(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto)request); case 2: return impl.error(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request); + case 3: + return impl.pause(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)request); + case 4: + return impl.addr(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request); default: throw new java.lang.AssertionError("Can't get here."); } @@ -122,6 +158,10 @@ public final class TestRpcServiceProtos { return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance(); case 2: return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance(); + case 3: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance(); + case 4: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -142,6 +182,10 @@ public final class TestRpcServiceProtos { return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance(); case 2: return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(); + case 3: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(); + case 4: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -174,6 +218,22 @@ public final class TestRpcServiceProtos { org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, com.google.protobuf.RpcCallback done); + /** + * rpc pause(.PauseRequestProto) returns (.EmptyResponseProto); + */ + public abstract void pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request, + com.google.protobuf.RpcCallback done); + + /** + * rpc addr(.EmptyRequestProto) returns (.AddrResponseProto); + */ + public abstract void addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, + com.google.protobuf.RpcCallback done); + public static final com.google.protobuf.Descriptors.ServiceDescriptor getDescriptor() { @@ -211,6 +271,16 @@ public final class TestRpcServiceProtos { com.google.protobuf.RpcUtil.specializeCallback( done)); return; + case 3: + this.pause(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; + case 4: + this.addr(controller, (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto)request, + com.google.protobuf.RpcUtil.specializeCallback( + done)); + return; default: throw new java.lang.AssertionError("Can't get here."); } @@ -231,6 +301,10 @@ public final class TestRpcServiceProtos { return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto.getDefaultInstance(); case 2: return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance(); + case 3: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto.getDefaultInstance(); + case 4: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -251,6 +325,10 @@ public final class TestRpcServiceProtos { return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto.getDefaultInstance(); case 2: return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(); + case 3: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(); + case 4: + return org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance(); default: throw new java.lang.AssertionError("Can't get here."); } @@ -316,6 +394,36 @@ public final class TestRpcServiceProtos { org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance())); } + + public void pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(3), + controller, + request, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.class, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance())); + } + + public void addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request, + com.google.protobuf.RpcCallback done) { + channel.callMethod( + getDescriptor().getMethods().get(4), + controller, + request, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance(), + com.google.protobuf.RpcUtil.generalizeCallback( + done, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.class, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance())); + } } public static BlockingInterface newBlockingStub( @@ -338,6 +446,16 @@ public final class TestRpcServiceProtos { com.google.protobuf.RpcController controller, org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request) throws com.google.protobuf.ServiceException; + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request) + throws com.google.protobuf.ServiceException; + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request) + throws com.google.protobuf.ServiceException; } private static final class BlockingStub implements BlockingInterface { @@ -382,6 +500,30 @@ public final class TestRpcServiceProtos { org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()); } + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto pause( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto) channel.callBlockingMethod( + getDescriptor().getMethods().get(3), + controller, + request, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto.getDefaultInstance()); + } + + + public org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto addr( + com.google.protobuf.RpcController controller, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto request) + throws com.google.protobuf.ServiceException { + return (org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto) channel.callBlockingMethod( + getDescriptor().getMethods().get(4), + controller, + request, + org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto.getDefaultInstance()); + } + } // @@protoc_insertion_point(class_scope:TestProtobufRpcProto) @@ -396,14 +538,16 @@ public final class TestRpcServiceProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\026test_rpc_service.proto\032\ntest.proto2\250\001\n" + + "\n\026test_rpc_service.proto\032\ntest.proto2\212\002\n" + "\024TestProtobufRpcProto\022/\n\004ping\022\022.EmptyReq" + "uestProto\032\023.EmptyResponseProto\022-\n\004echo\022\021" + ".EchoRequestProto\032\022.EchoResponseProto\0220\n" + "\005error\022\022.EmptyRequestProto\032\023.EmptyRespon" + - "seProtoBL\n.org.apache.hadoop.hbase.ipc.p" + - "rotobuf.generatedB\024TestRpcServiceProtos\210" + - "\001\001\240\001\001" + "seProto\0220\n\005pause\022\022.PauseRequestProto\032\023.E" + + "mptyResponseProto\022.\n\004addr\022\022.EmptyRequest" + + "Proto\032\022.AddrResponseProtoBL\n.org.apache." + + "hadoop.hbase.ipc.protobuf.generatedB\024Tes" + + "tRpcServiceProtos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/hbase-protocol/src/main/protobuf/test.proto b/hbase-protocol/src/main/protobuf/test.proto index 566b04b5a09..72b68e9aae6 100644 --- a/hbase-protocol/src/main/protobuf/test.proto +++ b/hbase-protocol/src/main/protobuf/test.proto @@ -33,3 +33,11 @@ message EchoRequestProto { message EchoResponseProto { required string message = 1; } + +message PauseRequestProto { + required uint32 ms = 1; +} + +message AddrResponseProto { + required string addr = 1; +} diff --git a/hbase-protocol/src/main/protobuf/test_rpc_service.proto b/hbase-protocol/src/main/protobuf/test_rpc_service.proto index 4ed0380224b..5f91dc4df4c 100644 --- a/hbase-protocol/src/main/protobuf/test_rpc_service.proto +++ b/hbase-protocol/src/main/protobuf/test_rpc_service.proto @@ -30,4 +30,6 @@ service TestProtobufRpcProto { rpc ping(EmptyRequestProto) returns (EmptyResponseProto); rpc echo(EchoRequestProto) returns (EchoResponseProto); rpc error(EmptyRequestProto) returns (EmptyResponseProto); + rpc pause(PauseRequestProto) returns (EmptyResponseProto); + rpc addr(EmptyRequestProto) returns (AddrResponseProto); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java index cdda28ab538..9e4e85dc5fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/ProtobufCoprocessorService.java @@ -23,19 +23,24 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.util.Threads; import java.io.IOException; /** * Test implementation of a coprocessor endpoint exposing the - * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by - * unit tests only. + * {@link TestRpcServiceProtos.TestProtobufRpcProto} service methods. For internal use by unit tests + * only. */ -public class ProtobufCoprocessorService - extends TestRpcServiceProtos.TestProtobufRpcProto +public class ProtobufCoprocessorService extends TestRpcServiceProtos.TestProtobufRpcProto implements CoprocessorService, Coprocessor { public ProtobufCoprocessorService() { } @@ -47,31 +52,46 @@ public class ProtobufCoprocessorService @Override public void ping(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback done) { + RpcCallback done) { done.run(TestProtos.EmptyResponseProto.getDefaultInstance()); } @Override public void echo(RpcController controller, TestProtos.EchoRequestProto request, - RpcCallback done) { + RpcCallback done) { String message = request.getMessage(); done.run(TestProtos.EchoResponseProto.newBuilder().setMessage(message).build()); } @Override public void error(RpcController controller, TestProtos.EmptyRequestProto request, - RpcCallback done) { + RpcCallback done) { ResponseConverter.setControllerException(controller, new IOException("Test exception")); done.run(null); } + @Override + public void pause(RpcController controller, PauseRequestProto request, + RpcCallback done) { + Threads.sleepWithoutInterrupt(request.getMs()); + done.run(EmptyResponseProto.getDefaultInstance()); + } + + @Override + public void addr(RpcController controller, EmptyRequestProto request, + RpcCallback done) { + done.run(AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build()); + } + @Override public void start(CoprocessorEnvironment env) throws IOException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } @Override public void stop(CoprocessorEnvironment env) throws IOException { - //To change body of implemented methods use File | Settings | File Templates. + // To change body of implemented methods use File | Settings | File Templates. } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java index be5ad56e61e..771ef93b7d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.anyObject; @@ -27,15 +31,9 @@ import static org.mockito.internal.verification.VerificationModeFactory.times; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; @@ -46,25 +44,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.util.StringUtils; -import org.junit.Assert; import org.junit.Test; /** @@ -74,61 +64,11 @@ public abstract class AbstractTestIPC { private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] CELL_BYTES = Bytes.toBytes("xyz"); private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); static byte[] BIG_CELL_BYTES = new byte[10 * 1024]; static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES); static final Configuration CONF = HBaseConfiguration.create(); - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List list = null; - if (cellScanner != null) { - list = new ArrayList<>(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); /** * Instance of server. We actually don't do anything speical in here so could just use @@ -145,16 +85,9 @@ public abstract class AbstractTestIPC { } TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException { - super(null, "testRpcServer", Lists - .newArrayList(new BlockingServiceAndInterface(SERVICE, null)), new InetSocketAddress( - "localhost", 0), conf, scheduler); - } - - @Override - public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); + super(null, "testRpcServer", + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), conf, scheduler); } } @@ -162,28 +95,19 @@ public abstract class AbstractTestIPC { /** * Ensure we do not HAVE TO HAVE a codec. - * @throws InterruptedException - * @throws IOException */ @Test - public void testNoCodec() throws InterruptedException, IOException { + public void testNoCodec() throws IOException, ServiceException { Configuration conf = HBaseConfiguration.create(); TestRpcServer rpcServer = new TestRpcServer(); try (AbstractRpcClient client = createRpcClientNoCodec(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - final String message = "hello"; - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - Pair r = - client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); - assertTrue(r.getSecond() == null); - // Silly assertion that the message is in the returned pb. - assertTrue(r.getFirst().toString().contains(message)); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); + assertNull(pcrc.cellScanner()); } finally { rpcServer.stop(); } @@ -195,14 +119,9 @@ public abstract class AbstractTestIPC { * It is hard to verify the compression is actually happening under the wraps. Hope that if * unsupported, we'll get an exception out of some time (meantime, have to trace it manually to * confirm that compression is happening down in the client and server). - * @throws IOException - * @throws InterruptedException - * @throws SecurityException - * @throws NoSuchMethodException */ @Test - public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, - NoSuchMethodException, ServiceException { + public void testCompressCellBlock() throws IOException, ServiceException { Configuration conf = new Configuration(HBaseConfiguration.create()); conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName()); List cells = new ArrayList<>(); @@ -213,20 +132,17 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - Pair r = - client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController( + CellUtil.createCellScanner(cells)); + String message = "hello"; + assertEquals(message, + stub.echo(pcrc, EchoRequestProto.newBuilder().setMessage(message).build()).getMessage()); int index = 0; - while (r.getSecond().advance()) { - assertTrue(CELL.equals(r.getSecond().current())); + CellScanner cellScanner = pcrc.cellScanner(); + assertNotNull(cellScanner); + while (cellScanner.advance()) { + assertEquals(CELL, cellScanner.current()); index++; } assertEquals(count, index); @@ -244,14 +160,8 @@ public abstract class AbstractTestIPC { TestRpcServer rpcServer = new TestRpcServer(); try (AbstractRpcClient client = createRpcClientRTEDuringConnectionSetup(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - client.call(null, md, param, null, User.getCurrent(), address, - new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.ping(null, EmptyRequestProto.getDefaultInstance()); fail("Expected an exception to have been thrown!"); } catch (Exception e) { LOG.info("Caught expected exception: " + e.toString()); @@ -261,27 +171,21 @@ public abstract class AbstractTestIPC { } } - /** Tests that the rpc scheduler is called when requests arrive. */ + /** + * Tests that the rpc scheduler is called when requests arrive. + */ @Test - public void testRpcScheduler() throws IOException, InterruptedException { + public void testRpcScheduler() throws IOException, ServiceException, InterruptedException { RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1)); RpcServer rpcServer = new TestRpcServer(scheduler, CONF); verify(scheduler).init((RpcScheduler.Context) anyObject()); - AbstractRpcClient client = createRpcClient(CONF); - try { + try (AbstractRpcClient client = createRpcClient(CONF)) { rpcServer.start(); verify(scheduler).start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } for (int i = 0; i < 10; i++) { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); + stub.echo(null, param); } verify(scheduler, times(10)).dispatch((CallRunner) anyObject()); } finally { @@ -292,101 +196,93 @@ public abstract class AbstractTestIPC { /** Tests that the rpc scheduler is called when requests arrive. */ @Test - public void testRpcMaxRequestSize() throws IOException, InterruptedException { + public void testRpcMaxRequestSize() throws IOException, ServiceException { Configuration conf = new Configuration(CONF); conf.setInt(RpcServer.MAX_REQUEST_SIZE, 100); RpcServer rpcServer = new TestRpcServer(conf); - AbstractRpcClient client = createRpcClient(conf); - try { + try (AbstractRpcClient client = createRpcClient(conf)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + StringBuilder message = new StringBuilder(120); + for (int i = 0; i < 20; i++) { + message.append("hello."); + } // set total RPC size bigger than 100 bytes - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello.hello.hello.hello." - + "hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello.hello").build(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - try { - client.call(new PayloadCarryingRpcController( - CellUtil.createCellScanner(ImmutableList. of(CELL))), md, param, - md.getOutputType().toProto(), User.getCurrent(), address, - new MetricsConnection.CallStats()); - fail("RPC should have failed because it exceeds max request size"); - } catch(IOException ex) { - // pass - } + EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message.toString()).build(); + stub.echo( + new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList. of(CELL))), + param); + fail("RPC should have failed because it exceeds max request size"); + } catch (ServiceException e) { + LOG.info("Caught expected exception: " + e.toString()); + // the rpc server just close the connection so we can not get the detail message. } finally { rpcServer.stop(); } } - /** - * Instance of RpcServer that echoes client hostAddress back to client - */ - static class TestRpcServer1 extends RpcServer { - - private static final BlockingInterface SERVICE1 = - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - @Override - public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) - throws ServiceException { - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - final InetAddress remoteAddr = TestRpcServer1.getRemoteAddress(); - final String message = remoteAddr == null ? "NULL" : remoteAddr.getHostAddress(); - return EchoResponseProto.newBuilder().setMessage(message).build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, EmptyRequestProto request) - throws ServiceException { - throw new ServiceException("error", new IOException("error")); - } - }; - - TestRpcServer1() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); - } - - TestRpcServer1(RpcScheduler scheduler) throws IOException { - super(null, "testRemoteAddressInCallObject", Lists - .newArrayList(new BlockingServiceAndInterface(TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(SERVICE1), null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } - } - /** * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null * remoteAddress set to its Call Object * @throws ServiceException */ @Test - public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, - ServiceException { - final RpcScheduler scheduler = new FifoRpcScheduler(CONF, 1); - final TestRpcServer1 rpcServer = new TestRpcServer1(scheduler); - final InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); - try (AbstractRpcClient client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT, - localAddr, null)) { + public void testRpcServerForNotNullRemoteAddressInCallObject() + throws IOException, ServiceException { + TestRpcServer rpcServer = new TestRpcServer(); + InetSocketAddress localAddr = new InetSocketAddress("localhost", 0); + try (AbstractRpcClient client = createRpcClient(CONF)) { rpcServer.start(); - final InetSocketAddress isa = rpcServer.getListenerAddress(); - if (isa == null) { - throw new IOException("Listener channel is closed"); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + assertEquals(localAddr.getAddress().getHostAddress(), + stub.addr(null, EmptyRequestProto.getDefaultInstance()).getAddr()); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testRemoteError() throws IOException, ServiceException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient client = createRpcClient(CONF)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.error(null, EmptyRequestProto.getDefaultInstance()); + } catch (ServiceException e) { + LOG.info("Caught expected exception: " + e.getMessage()); + IOException ioe = ProtobufUtil.handleRemoteException(e); + assertTrue(ioe instanceof DoNotRetryIOException); + assertTrue(ioe.getMessage().contains("server error!")); + } finally { + rpcServer.stop(); + } + } + + @Test + public void testTimeout() throws IOException { + TestRpcServer rpcServer = new TestRpcServer(); + try (AbstractRpcClient client = createRpcClient(CONF)) { + rpcServer.start(); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(); + int ms = 1000; + int timeout = 100; + for (int i = 0; i < 10; i++) { + pcrc.reset(); + pcrc.setCallTimeout(timeout); + long startTime = System.nanoTime(); + try { + stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(ms).build()); + } catch (ServiceException e) { + long waitTime = (System.nanoTime() - startTime) / 1000000; + // expected + LOG.info("Caught expected exception: " + e.getMessage()); + IOException ioe = ProtobufUtil.handleRemoteException(e); + assertTrue(ioe.getCause() instanceof CallTimeoutException); + // confirm that we got exception before the actual pause. + assertTrue(waitTime < ms); + } } - final BlockingRpcChannel channel = client.createBlockingRpcChannel( - ServerName.valueOf(isa.getHostName(), isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - BlockingInterface stub = TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); - final EchoRequestProto echoRequest = - EchoRequestProto.newBuilder().setMessage("GetRemoteAddress").build(); - final EchoResponseProto echoResponse = stub.echo(null, echoRequest); - Assert.assertEquals(localAddr.getAddress().getHostAddress(), echoResponse.getMessage()); } finally { rpcServer.stop(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java index 7efe1985469..565f5bf1e27 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; @@ -27,33 +25,12 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; -import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.experimental.categories.Category; @@ -65,8 +42,6 @@ import org.junit.runners.Parameterized.Parameters; @Category({ RPCTests.class, SmallTests.class }) public class TestAsyncIPC extends AbstractTestIPC { - private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class); - @Parameters public static Collection parameters() { List paramList = new ArrayList<>(); @@ -92,8 +67,8 @@ public class TestAsyncIPC extends AbstractTestIPC { if (useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null) { if (useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup) - || (!useNativeTransport - && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) { + || (!useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP + .getFirst() instanceof NioEventLoopGroup))) { AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully(); AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null; } @@ -123,80 +98,16 @@ public class TestAsyncIPC extends AbstractTestIPC { protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) { setConf(conf); return new AsyncRpcClient(conf, new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() { - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - promise.setFailure(new RuntimeException("Injected fault")); - } - }); + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + promise.setFailure(new RuntimeException("Injected fault")); } }); - } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestAsyncIPC "); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try (AsyncRpcClient client = new AsyncRpcClient(conf)) { - rpcServer.start(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); } - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - for (int i = 0; i < cycles; i++) { - List cells = new ArrayList<>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - rpcServer.stop(); - } + }); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java index 56de07d2cd3..b88cb7a2075 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java @@ -22,37 +22,14 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors.MethodDescriptor; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.Socket; -import java.util.ArrayList; -import java.util.List; import javax.net.SocketFactory; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScannable; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.codec.Codec; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.net.NetUtils; @@ -64,8 +41,6 @@ import org.mockito.stubbing.Answer; @Category({ RPCTests.class, SmallTests.class }) public class TestIPC extends AbstractTestIPC { - private static final Log LOG = LogFactory.getLog(TestIPC.class); - @Override protected RpcClientImpl createRpcClientNoCodec(Configuration conf) { return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) { @@ -96,71 +71,4 @@ public class TestIPC extends AbstractTestIPC { return new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory); } - - public static void main(String[] args) throws IOException, SecurityException, - NoSuchMethodException, InterruptedException { - if (args.length != 2) { - System.out.println("Usage: TestIPC "); - return; - } - // ((Log4JLogger)HBaseServer.LOG).getLogger().setLevel(Level.INFO); - // ((Log4JLogger)HBaseClient.LOG).getLogger().setLevel(Level.INFO); - int cycles = Integer.parseInt(args[0]); - int cellcount = Integer.parseInt(args[1]); - Configuration conf = HBaseConfiguration.create(); - TestRpcServer rpcServer = new TestRpcServer(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT); - KeyValue kv = BIG_CELL; - Put p = new Put(CellUtil.cloneRow(kv)); - for (int i = 0; i < cellcount; i++) { - p.add(kv); - } - RowMutations rm = new RowMutations(CellUtil.cloneRow(kv)); - rm.add(p); - try { - rpcServer.start(); - long startTime = System.currentTimeMillis(); - User user = User.getCurrent(); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - for (int i = 0; i < cycles; i++) { - List cells = new ArrayList<>(); - // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm); - ClientProtos.RegionAction.Builder builder = - RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, - RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), - MutationProto.newBuilder()); - builder.setRegion(RegionSpecifier - .newBuilder() - .setType(RegionSpecifierType.REGION_NAME) - .setValue( - ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()))); - if (i % 100000 == 0) { - LOG.info("" + i); - // Uncomment this for a thread dump every so often. - // ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - // "Thread dump " + Thread.currentThread().getName()); - } - PayloadCarryingRpcController pcrc = - new PayloadCarryingRpcController(CellUtil.createCellScanner(cells)); - // Pair response = - client.call(pcrc, md, builder.build(), param, user, address, - new MetricsConnection.CallStats()); - /* - * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(), - * count); - */ - } - LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " - + (System.currentTimeMillis() - startTime) + "ms"); - } finally { - client.close(); - rpcServer.stop(); - } - } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java index 81869b4a053..dcde844e7db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java @@ -17,42 +17,39 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.net.InetSocketAddress; -import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.junit.Assert; -import org.junit.Test; -import org.junit.Before; import org.junit.After; +import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; - /** - * Test for testing protocol buffer based RPC mechanism. - * This test depends on test.proto definition of types in src/test/protobuf/test.proto - * and protobuf service definition from src/test/protobuf/test_rpc_service.proto + * Test for testing protocol buffer based RPC mechanism. This test depends on test.proto definition + * of types in src/test/protobuf/test.proto and protobuf service definition from + * src/test/protobuf/test_rpc_service.proto */ -@Category({RPCTests.class, MediumTests.class}) +@Category({ RPCTests.class, MediumTests.class }) public class TestProtoBufRpc { public final static String ADDRESS = "localhost"; public static int PORT = 0; @@ -60,47 +57,18 @@ public class TestProtoBufRpc { private Configuration conf; private RpcServerInterface server; - /** - * Implementation of the test service defined out in TestRpcServiceProtos - */ - static class PBServerImpl - implements TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface { - @Override - public EmptyResponseProto ping(RpcController unused, - EmptyRequestProto request) throws ServiceException { - return EmptyResponseProto.newBuilder().build(); - } - - @Override - public EchoResponseProto echo(RpcController unused, EchoRequestProto request) - throws ServiceException { - return EchoResponseProto.newBuilder().setMessage(request.getMessage()) - .build(); - } - - @Override - public EmptyResponseProto error(RpcController unused, - EmptyRequestProto request) throws ServiceException { - throw new ServiceException("error", new IOException("error")); - } - } - @Before - public void setUp() throws IOException { // Setup server for both protocols + public void setUp() throws IOException { // Setup server for both protocols this.conf = HBaseConfiguration.create(); Logger log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer"); log.setLevel(Level.DEBUG); log = Logger.getLogger("org.apache.hadoop.ipc.HBaseServer.trace"); log.setLevel(Level.TRACE); // Create server side implementation - PBServerImpl serverImpl = new PBServerImpl(); - BlockingService service = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(serverImpl); // Get RPC server for server side implementation this.server = new RpcServer(null, "testrpc", - Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), - new InetSocketAddress(ADDRESS, PORT), conf, - new FifoRpcScheduler(conf, 10)); + Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress(ADDRESS, PORT), conf, new FifoRpcScheduler(conf, 10)); InetSocketAddress address = server.getListenerAddress(); if (address == null) { throw new IOException("Listener channel is closed"); @@ -118,25 +86,20 @@ public class TestProtoBufRpc { public void testProtoBufRpc() throws Exception { RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT); try { - BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel( - ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()), - User.getCurrent(), 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + BlockingInterface stub = newBlockingStub(rpcClient, this.isa); // Test ping method - TestProtos.EmptyRequestProto emptyRequest = - TestProtos.EmptyRequestProto.newBuilder().build(); + TestProtos.EmptyRequestProto emptyRequest = TestProtos.EmptyRequestProto.newBuilder().build(); stub.ping(null, emptyRequest); // Test echo method EchoRequestProto echoRequest = EchoRequestProto.newBuilder().setMessage("hello").build(); EchoResponseProto echoResponse = stub.echo(null, echoRequest); - Assert.assertEquals(echoResponse.getMessage(), "hello"); + assertEquals(echoResponse.getMessage(), "hello"); // Test error method - error should be thrown as RemoteException try { stub.error(null, emptyRequest); - Assert.fail("Expected exception is not thrown"); + fail("Expected exception is not thrown"); } catch (ServiceException e) { } } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java new file mode 100644 index 00000000000..ce7521e9dfb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import com.google.protobuf.BlockingService; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.Threads; + +@InterfaceAudience.Private +public class TestProtobufRpcServiceImpl implements BlockingInterface { + + public static final BlockingService SERVICE = TestProtobufRpcProto + .newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) + throws IOException { + return newBlockingStub(client, addr, User.getCurrent()); + } + + public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, + User user) throws IOException { + return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0)); + } + + public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { + return TestProtobufRpcProto.newStub(client.createProtobufRpcChannel( + ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), + User.getCurrent(), 0)); + } + + @Override + public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public EchoResponseProto echo(RpcController controller, EchoRequestProto request) + throws ServiceException { + if (controller instanceof PayloadCarryingRpcController) { + PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; + // If cells, scan them to check we are able to iterate what we were given and since this is an + // echo, just put them back on the controller creating a new block. Tests our block building. + CellScanner cellScanner = pcrc.cellScanner(); + List list = null; + if (cellScanner != null) { + list = new ArrayList<>(); + try { + while (cellScanner.advance()) { + list.add(cellScanner.current()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + } + cellScanner = CellUtil.createCellScanner(list); + ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); + } + return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); + } + + @Override + public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) + throws ServiceException { + throw new ServiceException(new DoNotRetryIOException("server error!")); + } + + @Override + public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) + throws ServiceException { + Threads.sleepWithoutInterrupt(request.getMs()); + return EmptyResponseProto.getDefaultInstance(); + } + + @Override + public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) + throws ServiceException { + return AddrResponseProto.newBuilder().setAddr(RpcServer.getRemoteAddress().getHostAddress()) + .build(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java index a37ba118882..749009f2d47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java @@ -17,108 +17,31 @@ */ package org.apache.hadoop.hbase.ipc; -import com.google.common.collect.ImmutableList; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; +import static org.mockito.Mockito.mock; + import com.google.common.collect.Lists; -import com.google.protobuf.BlockingService; -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.MetricsConnection; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mockito; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import static org.mockito.Mockito.mock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; -@Category({RPCTests.class, SmallTests.class}) +@Category({ RPCTests.class, SmallTests.class }) public class TestRpcHandlerException { - private static final Log LOG = LogFactory.getLog(TestRpcHandlerException.class); - static String example = "xyz"; - static byte[] CELL_BYTES = example.getBytes(); - static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES); private final static Configuration CONF = HBaseConfiguration.create(); - RpcExecutor rpcExecutor = Mockito.mock(RpcExecutor.class); - - // We are using the test TestRpcServiceProtos generated classes and Service because they are - // available and basic with methods like 'echo', and ping. Below we make a blocking service - // by passing in implementation of blocking interface. We use this service in all tests that - // follow. - private static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto - .newReflectiveBlockingService(new TestRpcServiceProtos - .TestProtobufRpcProto.BlockingInterface() { - - @Override - public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public EchoResponseProto echo(RpcController controller, EchoRequestProto request) - throws Error, RuntimeException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List list = null; - if (cellScanner != null) { - list = new ArrayList(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - throw new StackOverflowError(); - } - } catch (StackOverflowError e) { - throw e; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); - } - }); /** * Instance of server. We actually don't do anything speical in here so could just use @@ -126,29 +49,18 @@ public class TestRpcHandlerException { */ private static class TestRpcServer extends RpcServer { - TestRpcServer() throws IOException { - this(new FifoRpcScheduler(CONF, 1)); - } - TestRpcServer(RpcScheduler scheduler) throws IOException { super(null, "testRpcServer", - Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), - new InetSocketAddress("localhost", 0), CONF, scheduler); - } - - @Override - public Pair call(BlockingService service, MethodDescriptor md, - Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) - throws IOException { - return super.call(service, md, param, cellScanner, receiveTime, status); + Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)), + new InetSocketAddress("localhost", 0), CONF, scheduler); } } - /** Tests that the rpc scheduler is called when requests arrive. - * When Rpc handler thread dies, the client will hang and the test will fail. - * The test is meant to be a unit test to test the behavior. - * - * */ + /** + * Tests that the rpc scheduler is called when requests arrive. When Rpc handler thread dies, the + * client will hang and the test will fail. The test is meant to be a unit test to test the + * behavior. + */ private class AbortServer implements Abortable { private boolean aborted = false; @@ -163,7 +75,8 @@ public class TestRpcHandlerException { } } - /* This is a unit test to make sure to abort region server when the number of Rpc handler thread + /* + * This is a unit test to make sure to abort region server when the number of Rpc handler thread * caught errors exceeds the threshold. Client will hang when RS aborts. */ @Ignore @@ -173,21 +86,12 @@ public class TestRpcHandlerException { Abortable abortable = new AbortServer(); RpcScheduler scheduler = new SimpleRpcScheduler(CONF, 2, 0, 0, qosFunction, abortable, 0); RpcServer rpcServer = new TestRpcServer(scheduler); - RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT); - try { + try (RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT)) { rpcServer.start(); - MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo"); - EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build(); - PayloadCarryingRpcController controller = - new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL))); - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(), - address, new MetricsConnection.CallStats()); + BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress()); + stub.echo(null, EchoRequestProto.newBuilder().setMessage("hello").build()); } catch (Throwable e) { - assert(abortable.isAborted() == true); + assert (abortable.isAborted() == true); } finally { rpcServer.stop(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java index 385b7b08ae9..c1b8de75ecc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/AbstractTestSecureIPC.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.security; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; +import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getKeytabFileForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getPrincipalForTesting; import static org.apache.hadoop.hbase.security.HBaseKerberosUtils.getSecuredConfiguration; @@ -25,6 +27,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import com.google.common.collect.Lists; +import com.google.protobuf.ServiceException; + import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -32,27 +37,21 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.concurrent.ThreadLocalRandom; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; +import javax.security.sasl.SaslException; + import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.FifoRpcScheduler; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos; -import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos; +import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; @@ -64,12 +63,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.mockito.Mockito; -import com.google.common.collect.Lists; -import com.google.protobuf.BlockingRpcChannel; -import com.google.protobuf.BlockingService; - -import javax.security.sasl.SaslException; - public abstract class AbstractTestSecureIPC { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -77,55 +70,6 @@ public abstract class AbstractTestSecureIPC { private static final File KEYTAB_FILE = new File(TEST_UTIL.getDataTestDir("keytab").toUri() .getPath()); - static final BlockingService SERVICE = - TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService( - new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface() { - - @Override - public TestProtos.EmptyResponseProto ping(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EmptyResponseProto error(RpcController controller, - TestProtos.EmptyRequestProto request) - throws ServiceException { - return null; - } - - @Override - public TestProtos.EchoResponseProto echo(RpcController controller, - TestProtos.EchoRequestProto request) - throws ServiceException { - if (controller instanceof PayloadCarryingRpcController) { - PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController) controller; - // If cells, scan them to check we are able to iterate what we were given and since - // this is - // an echo, just put them back on the controller creating a new block. Tests our - // block - // building. - CellScanner cellScanner = pcrc.cellScanner(); - List list = null; - if (cellScanner != null) { - list = new ArrayList(); - try { - while (cellScanner.advance()) { - list.add(cellScanner.current()); - } - } catch (IOException e) { - throw new ServiceException(e); - } - } - cellScanner = CellUtil.createCellScanner(list); - ((PayloadCarryingRpcController) controller).setCellScanner(cellScanner); - } - return TestProtos.EchoResponseProto.newBuilder() - .setMessage(request.getMessage()).build(); - } - }); - private static MiniKdc KDC; private static String HOST = "localhost"; private static String PRINCIPAL; @@ -262,16 +206,8 @@ public abstract class AbstractTestSecureIPC { rpcServer.start(); try (RpcClient rpcClient = RpcClientFactory.createClient(clientConf, HConstants.DEFAULT_CLUSTER_ID.toString())) { - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - BlockingRpcChannel channel = - rpcClient.createBlockingRpcChannel( - ServerName.valueOf(address.getHostName(), address.getPort(), - System.currentTimeMillis()), clientUser, 0); - TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub = - TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(channel); + BlockingInterface stub = newBlockingStub(rpcClient, rpcServer.getListenerAddress(), + clientUser); List results = new ArrayList<>(); TestThread th1 = new TestThread(stub, results); final Throwable exception[] = new Throwable[1]; @@ -298,11 +234,11 @@ public abstract class AbstractTestSecureIPC { } public static class TestThread extends Thread { - private final TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub; + private final BlockingInterface stub; private final List results; - public TestThread(TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub, List results) { + public TestThread(BlockingInterface stub, List results) { this.stub = stub; this.results = results; }