HBASE-6786 Convert MultiRowMutationProtocol to protocol buffer service (Devaraj Das)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1399529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2012-10-18 06:38:33 +00:00
parent d6d73eb286
commit 6f3cce5220
6 changed files with 1364 additions and 87 deletions

View File

@ -18,55 +18,134 @@
package org.apache.hadoop.hbase.coprocessor; package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.WrongRegionException; import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/** /**
* This class demonstrates how to implement atomic multi row transactions using * This class demonstrates how to implement atomic multi row transactions using
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)} * {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
* and Coprocessor endpoints. * and Coprocessor endpoints.
*
* Defines a protocol to perform multi row transactions.
* See {@link MultiRowMutationEndpoint} for the implementation.
* </br>
* See
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
* for details and limitations.
* </br>
* Example:
* <code><pre>
* List<Mutation> mutations = ...;
* Put p1 = new Put(row1);
* Put p2 = new Put(row2);
* ...
* Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p1);
* Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p2);
* MultiMutateRequest.Builder mrmBuilder = MultiMutateRequest.newBuilder();
* mrmBuilder.addMutatationRequest(m1);
* mrmBuilder.addMutatationRequest(m2);
* CoprocessorRpcChannel channel = t.coprocessorService(ROW);
* MultiRowMutationService.BlockingInterface service =
* MultiRowMutationService.newBlockingStub(channel);
* MultiMutateRequest mrm = mrmBuilder.build();
* service.mutateRows(null, mrm);
* </pre></code>
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class MultiRowMutationEndpoint extends BaseEndpointCoprocessor implements public class MultiRowMutationEndpoint extends MultiRowMutationService implements
MultiRowMutationProtocol { CoprocessorService, Coprocessor {
private RegionCoprocessorEnvironment env;
@Override
public void mutateRows(RpcController controller, MultiMutateRequest request,
RpcCallback<MultiMutateResponse> done) {
MultiMutateResponse response = MultiMutateResponse.getDefaultInstance();
try {
// set of rows to lock, sorted to avoid deadlocks
SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
List<Mutate> mutateRequestList = request.getMutatationRequestList();
List<Mutation> mutations = new ArrayList<Mutation>(mutateRequestList.size());
for (Mutate m : mutateRequestList) {
mutations.add(ProtobufUtil.toMutation(m));
}
HRegionInfo regionInfo = env.getRegion().getRegionInfo();
for (Mutation m : mutations) {
// check whether rows are in range for this region
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) {
String msg = "Requested row out of range '"
+ Bytes.toStringBinary(m.getRow()) + "'";
if (rowsToLock.isEmpty()) {
// if this is the first row, region might have moved,
// allow client to retry
throw new WrongRegionException(msg);
} else {
// rows are split between regions, do not retry
throw new DoNotRetryIOException(msg);
}
}
rowsToLock.add(m.getRow());
}
// call utility method on region
env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
}
done.run(response);
}
@Override @Override
public void mutateRows(List<Mutation> mutations) throws IOException { public Service getService() {
// get the coprocessor environment return this;
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) getEnvironment(); }
// set of rows to lock, sorted to avoid deadlocks /**
SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); * Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
HRegionInfo regionInfo = env.getRegion().getRegionInfo(); * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
for (Mutation m : mutations) { * on a table region, so always expects this to be an instance of
// check whether rows are in range for this region * {@link RegionCoprocessorEnvironment}.
if (!HRegion.rowIsInRange(regionInfo, m.getRow())) { * @param env the environment provided by the coprocessor host
String msg = "Requested row out of range '" * @throws IOException if the provided environment is not an instance of
+ Bytes.toStringBinary(m.getRow()) + "'"; * {@code RegionCoprocessorEnvironment}
if (rowsToLock.isEmpty()) { */
// if this is the first row, region might have moved, @Override
// allow client to retry public void start(CoprocessorEnvironment env) throws IOException {
throw new WrongRegionException(msg); if (env instanceof RegionCoprocessorEnvironment) {
} else { this.env = (RegionCoprocessorEnvironment)env;
// rows are split between regions, do not retry } else {
throw new DoNotRetryIOException(msg); throw new CoprocessorException("Must be loaded on a table region!");
}
}
rowsToLock.add(m.getRow());
} }
// call utility method on region }
env.getRegion().mutateRowsWithLocks(mutations, rowsToLock);
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
} }
} }

View File

@ -1,54 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Defines a protocol to perform multi row transactions.
* See {@link MultiRowMutationEndpoint} for the implementation.
* </br>
* See
* {@link HRegion#mutateRowsWithLocks(java.util.Collection, java.util.Collection)}
* for details and limitations.
* </br>
* Example:
* <code><pre>
* List<Mutation> mutations = ...;
* Put p1 = new Put(row1);
* Put p2 = new Put(row2);
* ...
* mutations.add(p1);
* mutations.add(p2);
* MultiRowMutationProtocol mrOp = t.coprocessorProxy(
* MultiRowMutationProtocol.class, row1);
* mrOp.mutateRows(mutations);
* </pre></code>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface MultiRowMutationProtocol extends CoprocessorProtocol {
public void mutateRows(List<Mutation> mutations) throws IOException;
}

View File

@ -530,6 +530,27 @@ public final class ProtobufUtil {
return append; return append;
} }
/**
* Convert a MutateRequest to Mutation
*
* @param proto the protocol buffer Mutate to convert
* @return the converted Mutation
* @throws IOException
*/
public static Mutation toMutation(final Mutate proto) throws IOException {
MutateType type = proto.getMutateType();
if (type == MutateType.APPEND) {
return toAppend(proto);
}
if (type == MutateType.DELETE) {
return toDelete(proto);
}
if (type == MutateType.PUT) {
return toPut(proto);
}
throw new IOException("Not an understood mutate type " + type);
}
/** /**
* Convert a protocol buffer Mutate to an Increment * Convert a protocol buffer Mutate to an Increment
* *

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import "Client.proto";
option java_package = "org.apache.hadoop.hbase.protobuf.generated";
option java_outer_classname = "MultiRowMutation";
option java_generate_equals_and_hash = true;
option java_generic_services = true;
option optimize_for = SPEED;
message MultiMutateRequest {
repeated Mutate mutatationRequest = 1;
}
message MultiMutateResponse {
}
service MultiRowMutationService {
rpc mutateRows(MultiMutateRequest)
returns(MultiMutateResponse);
}

View File

@ -46,10 +46,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationProtocol;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -64,6 +64,12 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiMutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutation.MultiRowMutationService;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
@ -4175,16 +4181,22 @@ public class TestFromClientSide {
final byte [] ROW1 = Bytes.toBytes("testRow1"); final byte [] ROW1 = Bytes.toBytes("testRow1");
HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY); HTable t = TEST_UTIL.createTable(TABLENAME, FAMILY);
List<Mutation> mrm = new ArrayList<Mutation>();
Put p = new Put(ROW); Put p = new Put(ROW);
p.add(FAMILY, QUALIFIER, VALUE); p.add(FAMILY, QUALIFIER, VALUE);
mrm.add(p); Mutate m1 = ProtobufUtil.toMutate(MutateType.PUT, p);
p = new Put(ROW1); p = new Put(ROW1);
p.add(FAMILY, QUALIFIER, VALUE); p.add(FAMILY, QUALIFIER, VALUE);
mrm.add(p); Mutate m2 = ProtobufUtil.toMutate(MutateType.PUT, p);
MultiRowMutationProtocol mr = t.coprocessorProxy(
MultiRowMutationProtocol.class, ROW); MultiMutateRequest.Builder mrmBuilder = MultiMutateRequest.newBuilder();
mr.mutateRows(mrm); mrmBuilder.addMutatationRequest(m1);
mrmBuilder.addMutatationRequest(m2);
MultiMutateRequest mrm = mrmBuilder.build();
CoprocessorRpcChannel channel = t.coprocessorService(ROW);
MultiRowMutationService.BlockingInterface service =
MultiRowMutationService.newBlockingStub(channel);
service.mutateRows(null, mrm);
Get g = new Get(ROW); Get g = new Get(ROW);
Result r = t.get(g); Result r = t.get(g);
assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER))); assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIER)));