HBASE-6787 Convert RowProcessorProtocol to protocol buffer service (Devaraj)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1412356 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2012-11-21 23:11:54 +00:00
parent 4442800016
commit ed9218145f
14 changed files with 2362 additions and 120 deletions


@@ -0,0 +1,667 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: MultiRowMutationProcessorMessages.proto
package org.apache.hadoop.hbase.protobuf.generated;
public final class MultiRowMutationProcessorProtos {
private MultiRowMutationProcessorProtos() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface MultiRowMutationProcessorRequestOrBuilder
extends com.google.protobuf.MessageOrBuilder {
}
public static final class MultiRowMutationProcessorRequest extends
com.google.protobuf.GeneratedMessage
implements MultiRowMutationProcessorRequestOrBuilder {
// Use MultiRowMutationProcessorRequest.newBuilder() to construct.
private MultiRowMutationProcessorRequest(Builder builder) {
super(builder);
}
private MultiRowMutationProcessorRequest(boolean noInit) {}
private static final MultiRowMutationProcessorRequest defaultInstance;
public static MultiRowMutationProcessorRequest getDefaultInstance() {
return defaultInstance;
}
public MultiRowMutationProcessorRequest getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorRequest_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorRequest_fieldAccessorTable;
}
private void initFields() {
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest other = (org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest) obj;
boolean result = true;
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequestOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorRequest_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorRequest_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest build() {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest result = new org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest(this);
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.getDefaultInstance()) return this;
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
}
}
}
// @@protoc_insertion_point(builder_scope:MultiRowMutationProcessorRequest)
}
static {
defaultInstance = new MultiRowMutationProcessorRequest(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:MultiRowMutationProcessorRequest)
}
public interface MultiRowMutationProcessorResponseOrBuilder
extends com.google.protobuf.MessageOrBuilder {
}
public static final class MultiRowMutationProcessorResponse extends
com.google.protobuf.GeneratedMessage
implements MultiRowMutationProcessorResponseOrBuilder {
// Use MultiRowMutationProcessorResponse.newBuilder() to construct.
private MultiRowMutationProcessorResponse(Builder builder) {
super(builder);
}
private MultiRowMutationProcessorResponse(boolean noInit) {}
private static final MultiRowMutationProcessorResponse defaultInstance;
public static MultiRowMutationProcessorResponse getDefaultInstance() {
return defaultInstance;
}
public MultiRowMutationProcessorResponse getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorResponse_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorResponse_fieldAccessorTable;
}
private void initFields() {
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse other = (org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse) obj;
boolean result = true;
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponseOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorResponse_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.internal_static_MultiRowMutationProcessorResponse_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse build() {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse result = new org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse(this);
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.getDefaultInstance()) return this;
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
}
}
}
// @@protoc_insertion_point(builder_scope:MultiRowMutationProcessorResponse)
}
static {
defaultInstance = new MultiRowMutationProcessorResponse(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:MultiRowMutationProcessorResponse)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_MultiRowMutationProcessorRequest_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_MultiRowMutationProcessorRequest_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_MultiRowMutationProcessorResponse_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_MultiRowMutationProcessorResponse_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\'MultiRowMutationProcessorMessages.prot" +
"o\"\"\n MultiRowMutationProcessorRequest\"#\n" +
"!MultiRowMutationProcessorResponseBR\n*or" +
"g.apache.hadoop.hbase.protobuf.generated" +
"B\037MultiRowMutationProcessorProtosH\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_MultiRowMutationProcessorRequest_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_MultiRowMutationProcessorRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiRowMutationProcessorRequest_descriptor,
new java.lang.String[] { },
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.class,
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest.Builder.class);
internal_static_MultiRowMutationProcessorResponse_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_MultiRowMutationProcessorResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MultiRowMutationProcessorResponse_descriptor,
new java.lang.String[] { },
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.class,
org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}
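
The file above is protoc output and ships as-is. For orientation only, here is a minimal sketch (not part of the commit) of round-tripping the empty request message through the generated API; the demo class name is hypothetical.

import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest;

import com.google.protobuf.ByteString;

// Hypothetical demo class, not in the commit.
public class MultiRowMutationProcessorProtosDemo {
  public static void main(String[] args) throws Exception {
    MultiRowMutationProcessorRequest request =
        MultiRowMutationProcessorRequest.getDefaultInstance();
    // The message declares no fields, so it serializes to zero bytes.
    ByteString bytes = request.toByteString();
    MultiRowMutationProcessorRequest parsed =
        MultiRowMutationProcessorRequest.parseFrom(bytes);
    System.out.println(bytes.size() + " " + parsed.isInitialized()); // prints: 0 true
  }
}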


@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Defines a protocol to perform multi row transactions.
* See BaseRowProcessorEndpoint for the implementation.
* See HRegion#processRowsWithLocks() for details.
*/
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "MultiRowMutationProcessorProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message MultiRowMutationProcessorRequest{
}
message MultiRowMutationProcessorResponse{
}


@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Defines a protocol to perform multi row transactions.
* See BaseRowProcessorEndpoint for the implementation.
* See HRegion#processRowsWithLocks() for details.
*/
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "RowProcessorProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message RowProcessorRequest {
required string rowProcessorClassName = 1;
optional string rowProcessorInitializerMessageName = 2;
optional bytes rowProcessorInitializerMessage = 3;
}
message RowProcessorResult {
required bytes rowProcessorResult = 1;
}
service RowProcessorService {
rpc process (RowProcessorRequest) returns (RowProcessorResult);
}


@@ -678,9 +678,10 @@ public class AggregationClient {
     final AggregateArgument.Builder requestBuilder =
         AggregateArgument.newBuilder();
     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    if (ci.columnInterpreterSpecificData() != null) {
-      requestBuilder.setInterpreterSpecificBytes(
-        ci.columnInterpreterSpecificData());
+    ByteString columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData())
+        != null) {
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData);
     }
     requestBuilder.setScan(ProtobufUtil.toScan(scan));
     return requestBuilder.build();


@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,27 +15,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.coprocessor;
+package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
 import org.apache.hadoop.hbase.regionserver.RowProcessor;
 
+import com.google.protobuf.Message;
+
 /**
- * Defines a protocol to perform multi row transactions.
- * See {@link BaseRowProcessorEndpoint} for the implementation.
- * See {@link HRegion#processRowsWithLocks()} for details.
+ * Convenience class that is used to make RowProcessorEndpoint invocations.
+ * For example usage, refer TestRowProcessorEndpoint
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public interface RowProcessorProtocol extends CoprocessorProtocol {
-
-  /**
-   * @param processor The processor defines how to process the row
-   */
-  <T> T process(RowProcessor<T> processor) throws IOException;
-
+public class RowProcessorClient {
+  public static <S extends Message, T extends Message>
+  RowProcessorRequest getRowProcessorPB(RowProcessor<S,T> r)
+      throws IOException {
+    final RowProcessorRequest.Builder requestBuilder =
+        RowProcessorRequest.newBuilder();
+    requestBuilder.setRowProcessorClassName(r.getClass().getName());
+    S s = r.getRequestData();
+    if (s != null) {
+      requestBuilder.setRowProcessorInitializerMessageName(s.getClass().getName());
+      requestBuilder.setRowProcessorInitializerMessage(s.toByteString());
+    }
+    return requestBuilder.build();
+  }
 }
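
Taken out of diff form, the client-side flow this class enables is: open a coprocessor RPC channel on the row's region, wrap it in the generated blocking stub, and send the RowProcessorRequest built by getRowProcessorPB. A minimal sketch, assuming an open HTable and some RowProcessor implementation; the wrapper class name is hypothetical, and the calls mirror the test changes later in this commit.

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
import org.apache.hadoop.hbase.regionserver.RowProcessor;

import com.google.protobuf.ByteString;
import com.google.protobuf.Message;

// Hypothetical helper, not in the commit.
public class RowProcessorInvocationSketch {
  // Runs 'processor' against the region hosting 'row' and returns the raw
  // result bytes; the caller parses them with the processor's response type.
  public static <S extends Message, T extends Message> ByteString invoke(
      HTable table, byte[] row, RowProcessor<S, T> processor) throws Throwable {
    CoprocessorRpcChannel channel = table.coprocessorService(row);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
    RowProcessorResult result = service.process(null, request);
    return result.getRowProcessorResult();
  }
}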


@@ -18,21 +18,35 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RowProcessor;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 /**
  * This class demonstrates how to implement atomic read-modify-writes
  * using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
-  implements RowProcessorProtocol {
+public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
+extends RowProcessorService implements CoprocessorService, Coprocessor {
+  private RegionCoprocessorEnvironment env;
 
   /**
    * Pass a processor to HRegion to process multiple rows atomically.
    *
@@ -42,16 +56,93 @@ public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
    *
    * See {@link TestRowProcessorEndpoint} for example.
    *
-   * @param processor The object defines the read-modify-write procedure
-   * @return The processing result
+   * The request contains information for constructing processor
+   * (see {@link #constructRowProcessorFromRequest}). The processor object defines
+   * the read-modify-write procedure.
    */
   @Override
-  public <T> T process(RowProcessor<T> processor)
-      throws IOException {
-    HRegion region =
-      ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+  public void process(RpcController controller, RowProcessorRequest request,
+      RpcCallback<RowProcessorResult> done) {
+    RowProcessorResult resultProto = null;
+    try {
+      RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
+      HRegion region = env.getRegion();
       region.processRowsWithLocks(processor);
-    return processor.getResult();
+      T result = processor.getResult();
+      RowProcessorResult.Builder b = RowProcessorResult.newBuilder();
+      b.setRowProcessorResult(result.toByteString());
+      resultProto = b.build();
+    } catch (Exception e) {
+      ResponseConverter.setControllerException(controller, new IOException(e));
+    }
+    done.run(resultProto);
   }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Stores a reference to the coprocessor environment provided by the
+   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
+   * on a table region, so always expects this to be an instance of
+   * {@link RegionCoprocessorEnvironment}.
+   * @param env the environment provided by the coprocessor host
+   * @throws IOException if the provided environment is not an instance of
+   * {@code RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // nothing to do
+  }
+
+  @SuppressWarnings("unchecked")
+  RowProcessor<S,T> constructRowProcessorFromRequest(RowProcessorRequest request)
+      throws IOException {
+    String className = request.getRowProcessorClassName();
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+      RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
+      if (request.hasRowProcessorInitializerMessageName()) {
+        Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
+            .asSubclass(Message.class);
+        Method m;
+        try {
+          m = imn.getMethod("parseFrom", ByteString.class);
+        } catch (SecurityException e) {
+          throw new IOException(e);
+        } catch (NoSuchMethodException e) {
+          throw new IOException(e);
+        }
+        S s;
+        try {
+          s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
+        } catch (IllegalArgumentException e) {
+          throw new IOException(e);
+        } catch (InvocationTargetException e) {
+          throw new IOException(e);
+        }
+        ci.initialize(s);
+      }
+      return ci;
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
 }
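
Note that the endpoint is abstract; a concrete subclass has to be registered with the region server before clients can call it. A minimal sketch of doing so via configuration, the same mechanism the test further below uses; the subclass name here is hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;

public class EndpointRegistrationSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Load a concrete BaseRowProcessorEndpoint subclass on every region.
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        "com.example.MyRowProcessorEndpoint");
  }
}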


@@ -119,7 +119,7 @@ public interface ColumnInterpreter<T, S> {
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the ColumnInterpreter. The server
-   * will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)}
+   * will pass this to the {@link #initialize(ByteString)}
    * method. If there is no ColumnInterpreter specific data (for e.g.,
    * {@link LongColumnInterpreter}) then null should be returned.
    * @return the PB message


@@ -23,15 +23,13 @@ import java.util.UUID;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.protobuf.Message;
+
 /**
  * Base class for RowProcessor with some default implementations.
  */
-public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
-
-  @Override
-  public T getResult() {
-    return null;
-  }
+public abstract class BaseRowProcessor<S extends Message,T extends Message>
+implements RowProcessor<S,T> {
 
   @Override
   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {


@@ -4302,7 +4302,7 @@ public class HRegion implements HeapSize { // , Writable{
    *
    * @param processor The object defines the reads and writes to a row.
    */
-  public void processRowsWithLocks(RowProcessor<?> processor)
+  public void processRowsWithLocks(RowProcessor<?,?> processor)
       throws IOException {
     processRowsWithLocks(processor, rowProcessorTimeout);
   }
@@ -4314,7 +4314,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @param timeout The timeout of the processor.process() execution
    *                Use a negative number to switch off the time bound
    */
-  public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
+  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
       throws IOException {
 
     for (byte[] row : processor.getRowsToLock()) {
@@ -4453,7 +4453,7 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
-  private void doProcessRowWithTimeout(final RowProcessor<?> processor,
+  private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
                                        final long now,
                                        final HRegion region,
                                        final List<KeyValue> mutations,


@@ -27,13 +27,16 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
  */
-class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
+class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
+MultiRowMutationProcessorResponse> {
   Collection<byte[]> rowsToLock;
   Collection<Mutation> mutations;
@@ -53,6 +56,11 @@ class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
     return false;
   }
 
+  @Override
+  public MultiRowMutationProcessorResponse getResult() {
+    return MultiRowMutationProcessorResponse.getDefaultInstance();
+  }
+
   @Override
   public void process(long now,
                       HRegion region,
@@ -123,4 +131,13 @@ class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
     }
   }
 
+  @Override
+  public MultiRowMutationProcessorRequest getRequestData() {
+    return MultiRowMutationProcessorRequest.getDefaultInstance();
+  }
+
+  @Override
+  public void initialize(MultiRowMutationProcessorRequest msg) {
+    //nothing
+  }
 }


@@ -27,6 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 
@@ -38,10 +41,12 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
  * This class performs scans and generates mutations and WAL edits.
  * The locks and MVCC will be handled by HRegion.
  *
- * The generic type parameter T is the return type of
- * RowProcessor.getResult().
+ * The RowProcessor user code could have data that needs to be
+ * sent across for proper initialization at the server side. The generic type
+ * parameter S is the type of the request data sent to the server.
+ * The generic type parameter T is the return type of RowProcessor.getResult().
  */
-public interface RowProcessor<T> {
+public interface RowProcessor<S extends Message, T extends Message> {
 
   /**
    * Rows to lock while operation.
@@ -51,7 +56,9 @@ public interface RowProcessor<T> {
   Collection<byte[]> getRowsToLock();
 
   /**
-   * Obtain the processing result
+   * Obtain the processing result. All row processor implementations must
+   * implement this, even if the method is simply returning an empty
+   * Message.
    */
   T getResult();
 
@@ -108,4 +115,22 @@ public interface RowProcessor<T> {
    * @return The name of the processor
    */
   String getName();
+
+  /**
+   * This method should return any additional data that is needed on the
+   * server side to construct the RowProcessor. The server will pass this to
+   * the {@link #initialize(ByteString)} method. If there is no RowProcessor
+   * specific data then null should be returned.
+   * @return the PB message
+   * @throws IOException
+   */
+  S getRequestData() throws IOException;
+
+  /**
+   * This method should initialize any field(s) of the RowProcessor with
+   * a parsing of the passed message bytes (used on the server side).
+   * @param msg
+   * @throws IOException
+   */
+  void initialize(S msg) throws IOException;
 }
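
With the new contract, even a processor that carries no state must supply protobuf types for S and T. A sketch of the smallest conforming implementation, reusing the empty MultiRowMutationProcessor messages added by this commit; the class is hypothetical and assumes BaseRowProcessor's defaults cover the remaining callbacks.

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical read-only processor that locks one row and does nothing.
public class NoOpRowProcessor extends BaseRowProcessor<
    MultiRowMutationProcessorRequest, MultiRowMutationProcessorResponse> {
  private byte[] row = Bytes.toBytes("r1");

  @Override
  public Collection<byte[]> getRowsToLock() {
    return Collections.singleton(row);
  }

  @Override
  public boolean readOnly() {
    return true;
  }

  @Override
  public void process(long now, HRegion region,
      List<KeyValue> mutations, WALEdit walEdit) throws IOException {
    // A real processor would scan the row here and append to 'mutations'.
  }

  @Override
  public MultiRowMutationProcessorResponse getResult() {
    return MultiRowMutationProcessorResponse.getDefaultInstance();
  }

  @Override
  public String getName() {
    return "noop";
  }

  @Override
  public MultiRowMutationProcessorRequest getRequestData() throws IOException {
    return MultiRowMutationProcessorRequest.getDefaultInstance();
  }

  @Override
  public void initialize(MultiRowMutationProcessorRequest msg) throws IOException {
    // Empty request message: nothing to restore on the server side.
  }
}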


@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.coprocessor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,27 +36,39 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
@@ -100,7 +110,7 @@ public class TestRowProcessorEndpoint {
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     Configuration conf = util.getConfiguration();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         RowProcessorEndpoint.class.getName());
     conf.setInt("hbase.client.retries.number", 1);
     conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
@@ -138,12 +148,18 @@
   @Test
   public void testDoubleScan() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
         new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
-    Set<String> result = protocol.process(processor);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    FriendsOfFriendsProcessorResponse response =
+        FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+    Set<String> result = new HashSet<String>();
+    result.addAll(response.getResultList());
 
     Set<String> expected =
       new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
 
     Get get = new Get(ROW);
@@ -176,12 +192,17 @@
   }
 
   private int incrementCounter(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.IncrementCounterProcessor processor =
         new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
-    int counterValue = protocol.process(processor);
-    return counterValue;
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    IncCounterProcessorResponse response = IncCounterProcessorResponse
+        .parseFrom(protoResult.getRowProcessorResult());
+    Integer result = response.getResponse();
+    return result;
   }
 
   private void concurrentExec(
@@ -234,23 +255,27 @@
   }
 
   private void swapRows(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.RowSwapProcessor processor =
        new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
-    protocol.process(processor);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    service.process(null, request);
   }
 
   @Test
   public void testTimeout() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.TimeoutProcessor processor =
         new RowProcessorEndpoint.TimeoutProcessor(ROW);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
     boolean exceptionCaught = false;
     try {
-      protocol.process(processor);
+      service.process(null, request);
     } catch (Exception e) {
       exceptionCaught = true;
     }
@@ -264,11 +289,11 @@
    * We define the RowProcessors as the inner class of the endpoint.
    * So they can be loaded with the endpoint on the coprocessor.
    */
-  public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
-      implements RowProcessorProtocol {
+  public static class RowProcessorEndpoint<S extends Message,T extends Message>
+      extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
     public static class IncrementCounterProcessor extends
-        BaseRowProcessor<Integer> implements Writable {
+        BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
+        IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
       int counter = 0;
       byte[] row = new byte[0];
@@ -288,8 +313,10 @@
       }
 
       @Override
-      public Integer getResult() {
-        return counter;
+      public IncCounterProcessorResponse getResult() {
+        IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
+        i.setResponse(counter);
+        return i.build();
       }
 
       @Override
@@ -330,21 +357,22 @@
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row = Bytes.readByteArray(in);
-        this.counter = in.readInt();
+      public IncCounterProcessorRequest getRequestData() throws IOException {
+        IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
+        builder.setCounter(counter);
+        builder.setRow(ByteString.copyFrom(row));
+        return builder.build();
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row);
-        out.writeInt(counter);
+      public void initialize(IncCounterProcessorRequest msg) {
+        this.row = msg.getRow().toByteArray();
+        this.counter = msg.getCounter();
       }
     }
 
     public static class FriendsOfFriendsProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
       byte[] row = null;
       byte[] person = null;
       final Set<String> result = new HashSet<String>();
@@ -366,8 +394,11 @@
       }
 
       @Override
-      public Set<String> getResult() {
-        return result;
+      public FriendsOfFriendsProcessorResponse getResult() {
+        FriendsOfFriendsProcessorResponse.Builder builder =
+            FriendsOfFriendsProcessorResponse.newBuilder();
+        builder.addAllResult(result);
+        return builder.build();
       }
 
       @Override
@@ -405,29 +436,28 @@
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.person = Bytes.readByteArray(in);
-        this.row = Bytes.readByteArray(in);
-        int size = in.readInt();
-        result.clear();
-        for (int i = 0; i < size; ++i) {
-          result.add(Text.readString(in));
-        }
+      public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
+        FriendsOfFriendsProcessorRequest.Builder builder =
+            FriendsOfFriendsProcessorRequest.newBuilder();
+        builder.setPerson(ByteString.copyFrom(person));
+        builder.setRow(ByteString.copyFrom(row));
+        builder.addAllResult(result);
+        FriendsOfFriendsProcessorRequest f = builder.build();
+        return f;
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, person);
-        Bytes.writeByteArray(out, row);
-        out.writeInt(result.size());
-        for (String s : result) {
-          Text.writeString(out, s);
-        }
+      public void initialize(FriendsOfFriendsProcessorRequest request)
+          throws IOException {
+        this.person = request.getPerson().toByteArray();
+        this.row = request.getRow().toByteArray();
+        result.clear();
+        result.addAll(request.getResultList());
       }
     }
 
     public static class RowSwapProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
 
       byte[] row1 = new byte[0];
       byte[] row2 = new byte[0];
@@ -455,6 +485,11 @@
         return false;
       }
 
+      @Override
+      public RowSwapProcessorResponse getResult() {
+        return RowSwapProcessorResponse.getDefaultInstance();
+      }
+
       @Override
       public void process(long now, HRegion region,
           List<KeyValue> mutations, WALEdit walEdit) throws IOException {
@@ -501,26 +536,28 @@
         }
       }
 
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row1 = Bytes.readByteArray(in);
-        this.row2 = Bytes.readByteArray(in);
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row1);
-        Bytes.writeByteArray(out, row2);
-      }
-
       @Override
       public String getName() {
        return "swap";
      }
+
+      @Override
+      public RowSwapProcessorRequest getRequestData() throws IOException {
+        RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
+        builder.setRow1(ByteString.copyFrom(row1));
+        builder.setRow2(ByteString.copyFrom(row2));
+        return builder.build();
+      }
+
+      @Override
+      public void initialize(RowSwapProcessorRequest msg) {
+        this.row1 = msg.getRow1().toByteArray();
+        this.row2 = msg.getRow2().toByteArray();
+      }
     }
 
     public static class TimeoutProcessor extends
-        BaseRowProcessor<Void> implements Writable {
+        BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
 
       byte[] row = new byte[0];
@@ -538,6 +575,11 @@
         return Collections.singleton(row);
       }
 
+      @Override
+      public TimeoutProcessorResponse getResult() {
+        return TimeoutProcessorResponse.getDefaultInstance();
+      }
+
       @Override
       public void process(long now, HRegion region,
           List<KeyValue> mutations, WALEdit walEdit) throws IOException {
@@ -554,20 +596,22 @@
         return true;
       }
 
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row = Bytes.readByteArray(in);
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row);
-      }
-
       @Override
       public String getName() {
         return "timeout";
       }
+
+      @Override
+      public TimeoutProcessorRequest getRequestData() throws IOException {
+        TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
+        builder.setRow(ByteString.copyFrom(row));
+        return builder.build();
+      }
+
+      @Override
+      public void initialize(TimeoutProcessorRequest msg) throws IOException {
+        this.row = msg.getRow().toByteArray();
+      }
     }
 
     public static void doScan(


@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
option java_outer_classname = "IncrementCounterProcessorTestProtos";
option java_generate_equals_and_hash = true;
message IncCounterProcessorRequest {
required bytes row = 1;
required int32 counter = 2;
}
message IncCounterProcessorResponse {
required int32 response = 1;
}
message FriendsOfFriendsProcessorRequest {
required bytes person = 1;
required bytes row = 2;
repeated string result = 3;
}
message FriendsOfFriendsProcessorResponse {
repeated string result = 1;
}
message RowSwapProcessorRequest {
required bytes row1 = 1;
required bytes row2 = 2;
}
message RowSwapProcessorResponse {
}
message TimeoutProcessorRequest {
required bytes row = 1;
}
message TimeoutProcessorResponse {
}