HBASE-25702 Remove RowProcessor (#3097)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent b0625984f2
commit 93b1163a8b

@@ -1,679 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.RowProcessorService;

/**
 * Verifies ProcessEndpoint works.
 * The tested RowProcessor performs two scans and a read-modify-write.
 */
@Category({CoprocessorTests.class, MediumTests.class})
public class TestRowProcessorEndpoint {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRowProcessorEndpoint.class);

  private static final TableName TABLE = TableName.valueOf("testtable");
  private final static byte[] ROW = Bytes.toBytes("testrow");
  private final static byte[] ROW2 = Bytes.toBytes("testrow2");
  private final static byte[] FAM = Bytes.toBytes("friendlist");

  // Column names
  private final static byte[] A = Bytes.toBytes("a");
  private final static byte[] B = Bytes.toBytes("b");
  private final static byte[] C = Bytes.toBytes("c");
  private final static byte[] D = Bytes.toBytes("d");
  private final static byte[] E = Bytes.toBytes("e");
  private final static byte[] F = Bytes.toBytes("f");
  private final static byte[] G = Bytes.toBytes("g");
  private final static byte[] COUNTER = Bytes.toBytes("counter");
  private final static AtomicLong myTimer = new AtomicLong(0);
  private final AtomicInteger failures = new AtomicInteger(0);

  private static HBaseTestingUtility util = new HBaseTestingUtility();
  private static volatile int expectedCounter = 0;
  private static int rowSize, row2Size;

  private volatile static Table table = null;
  private volatile static boolean swapped = false;
  private volatile CountDownLatch startSignal;
  private volatile CountDownLatch doneSignal;

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        RowProcessorEndpoint.class.getName());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
    conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
    util.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  public void prepareTestData() throws Exception {
    try {
      util.getAdmin().disableTable(TABLE);
      util.getAdmin().deleteTable(TABLE);
    } catch (Exception e) {
      // ignore table not found
    }
    table = util.createTable(TABLE, FAM);
    {
      Put put = new Put(ROW);
      put.addColumn(FAM, A, Bytes.add(B, C));    // B, C are friends of A
      put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
      put.addColumn(FAM, C, G);                  // G is a friend of C
      table.put(put);
      rowSize = put.size();
    }
    Put put = new Put(ROW2);
    put.addColumn(FAM, D, E);
    put.addColumn(FAM, F, G);
    table.put(put);
    row2Size = put.size();
  }

  @Test
  public void testDoubleScan() throws Throwable {
    prepareTestData();

    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
    RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
        new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
    ProcessResponse protoResult = service.process(null, request);
    FriendsOfFriendsProcessorResponse response =
        FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
    Set<String> result = new HashSet<>();
    result.addAll(response.getResultList());
    Set<String> expected = new HashSet<>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
    Get get = new Get(ROW);
    LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
    assertEquals(expected, result);
  }

  @Test
  public void testReadModifyWrite() throws Throwable {
    prepareTestData();
    failures.set(0);
    int numThreads = 100;
    concurrentExec(new IncrementRunner(), numThreads);
    Get get = new Get(ROW);
    LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
    int finalCounter = incrementCounter(table);
    int failureNumber = failures.get();
    if (failureNumber > 0) {
      LOG.debug("We failed " + failureNumber + " times during test");
    }
    assertEquals(numThreads + 1 - failureNumber, finalCounter);
  }

  class IncrementRunner implements Runnable {
    @Override
    public void run() {
      try {
        incrementCounter(table);
      } catch (Throwable e) {
        failures.incrementAndGet();
        e.printStackTrace();
      }
    }
  }

  private int incrementCounter(Table table) throws Throwable {
    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
    RowProcessorEndpoint.IncrementCounterProcessor processor =
        new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
    ProcessResponse protoResult = service.process(null, request);
    IncCounterProcessorResponse response = IncCounterProcessorResponse
        .parseFrom(protoResult.getRowProcessorResult());
    Integer result = response.getResponse();
    return result;
  }

  private void concurrentExec(final Runnable task, final int numThreads) throws Throwable {
    startSignal = new CountDownLatch(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    for (int i = 0; i < numThreads; ++i) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            startSignal.countDown();
            startSignal.await();
            task.run();
          } catch (Throwable e) {
            failures.incrementAndGet();
            e.printStackTrace();
          }
          doneSignal.countDown();
        }
      }).start();
    }
    doneSignal.await();
  }

  @Test
  public void testMultipleRows() throws Throwable {
    prepareTestData();
    failures.set(0);
    int numThreads = 100;
    concurrentExec(new SwapRowsRunner(), numThreads);
    LOG.debug("row keyvalues:" +
        stringifyKvs(table.get(new Get(ROW)).listCells()));
    LOG.debug("row2 keyvalues:" +
        stringifyKvs(table.get(new Get(ROW2)).listCells()));
    int failureNumber = failures.get();
    if (failureNumber > 0) {
      LOG.debug("We failed " + failureNumber + " times during test");
    }
    if (!swapped) {
      assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
      assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
    } else {
      assertEquals(rowSize, table.get(new Get(ROW2)).listCells().size());
      assertEquals(row2Size, table.get(new Get(ROW)).listCells().size());
    }
  }

  class SwapRowsRunner implements Runnable {
    @Override
    public void run() {
      try {
        swapRows(table);
      } catch (Throwable e) {
        failures.incrementAndGet();
        e.printStackTrace();
      }
    }
  }

  private void swapRows(Table table) throws Throwable {
    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
    RowProcessorEndpoint.RowSwapProcessor processor =
        new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
    service.process(null, request);
  }

  @Test
  public void testTimeout() throws Throwable {
    prepareTestData();
    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
    RowProcessorEndpoint.TimeoutProcessor processor =
        new RowProcessorEndpoint.TimeoutProcessor(ROW);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
    boolean exceptionCaught = false;
    try {
      service.process(null, request);
    } catch (Exception e) {
      exceptionCaught = true;
    }
    assertTrue(exceptionCaught);
  }

  /**
   * This class defines two RowProcessors:
   * IncrementCounterProcessor and FriendsOfFriendsProcessor.
   *
   * We define the RowProcessors as the inner class of the endpoint.
   * So they can be loaded with the endpoint on the coprocessor.
   */
  public static class RowProcessorEndpoint<S extends Message, T extends Message>
      extends BaseRowProcessorEndpoint<S, T> {
    public static class IncrementCounterProcessor extends
        BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
            IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
      int counter = 0;
      byte[] row = new byte[0];

      /**
       * Empty constructor for Writable
       */
      IncrementCounterProcessor() {
      }

      IncrementCounterProcessor(byte[] row) {
        this.row = row;
      }

      @Override
      public Collection<byte[]> getRowsToLock() {
        return Collections.singleton(row);
      }

      @Override
      public IncCounterProcessorResponse getResult() {
        IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
        i.setResponse(counter);
        return i.build();
      }

      @Override
      public boolean readOnly() {
        return false;
      }

      @Override
      public void process(long now, HRegion region,
          List<Mutation> mutations, WALEdit walEdit) throws IOException {
        // Scan current counter
        List<Cell> kvs = new ArrayList<>();
        Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
        scan.addColumn(FAM, COUNTER);
        doScan(region, scan, kvs);
        counter = kvs.isEmpty() ? 0 :
            Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));

        // Assert counter value
        assertEquals(expectedCounter, counter);

        // Increment counter and send it to both memstore and wal edit
        counter += 1;
        expectedCounter += 1;

        Put p = new Put(row);
        KeyValue kv =
            new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
        p.add(kv);
        mutations.add(p);
        walEdit.add(kv);

        // We can also inject some meta data to the walEdit
        KeyValue metaKv = new KeyValue(
            row, WALEdit.METAFAMILY,
            Bytes.toBytes("I just increment counter"),
            Bytes.toBytes(counter));
        walEdit.add(metaKv);
      }

      @Override
      public IncCounterProcessorRequest getRequestData() throws IOException {
        IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
        builder.setCounter(counter);
        builder.setRow(UnsafeByteOperations.unsafeWrap(row));
        return builder.build();
      }

      @Override
      public void initialize(IncCounterProcessorRequest msg) {
        this.row = msg.getRow().toByteArray();
        this.counter = msg.getCounter();
      }
    }

    public static class FriendsOfFriendsProcessor extends
        BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
      byte[] row = null;
      byte[] person = null;
      final Set<String> result = new HashSet<>();

      /**
       * Empty constructor for Writable
       */
      FriendsOfFriendsProcessor() {
      }

      FriendsOfFriendsProcessor(byte[] row, byte[] person) {
        this.row = row;
        this.person = person;
      }

      @Override
      public Collection<byte[]> getRowsToLock() {
        return Collections.singleton(row);
      }

      @Override
      public FriendsOfFriendsProcessorResponse getResult() {
        FriendsOfFriendsProcessorResponse.Builder builder =
            FriendsOfFriendsProcessorResponse.newBuilder();
        builder.addAllResult(result);
        return builder.build();
      }

      @Override
      public boolean readOnly() {
        return true;
      }

      @Override
      public void process(long now, HRegion region,
          List<Mutation> mutations, WALEdit walEdit) throws IOException {
        List<Cell> kvs = new ArrayList<>();
        { // First scan to get friends of the person
          Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
          scan.addColumn(FAM, person);
          doScan(region, scan, kvs);
        }

        // Second scan to get friends of friends
        Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
        for (Cell kv : kvs) {
          byte[] friends = CellUtil.cloneValue(kv);
          for (byte f : friends) {
            scan.addColumn(FAM, new byte[]{f});
          }
        }
        doScan(region, scan, kvs);

        // Collect result
        result.clear();
        for (Cell kv : kvs) {
          for (byte b : CellUtil.cloneValue(kv)) {
            result.add((char) b + "");
          }
        }
      }

      @Override
      public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
        FriendsOfFriendsProcessorRequest.Builder builder =
            FriendsOfFriendsProcessorRequest.newBuilder();
        builder.setPerson(UnsafeByteOperations.unsafeWrap(person));
        builder.setRow(UnsafeByteOperations.unsafeWrap(row));
        builder.addAllResult(result);
        FriendsOfFriendsProcessorRequest f = builder.build();
        return f;
      }

      @Override
      public void initialize(FriendsOfFriendsProcessorRequest request)
          throws IOException {
        this.person = request.getPerson().toByteArray();
        this.row = request.getRow().toByteArray();
        result.clear();
        result.addAll(request.getResultList());
      }
    }

    public static class RowSwapProcessor extends
        BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
      byte[] row1 = new byte[0];
      byte[] row2 = new byte[0];

      /**
       * Empty constructor for Writable
       */
      RowSwapProcessor() {
      }

      RowSwapProcessor(byte[] row1, byte[] row2) {
        this.row1 = row1;
        this.row2 = row2;
      }

      @Override
      public Collection<byte[]> getRowsToLock() {
        List<byte[]> rows = new ArrayList<>(2);
        rows.add(row1);
        rows.add(row2);
        return rows;
      }

      @Override
      public boolean readOnly() {
        return false;
      }

      @Override
      public RowSwapProcessorResponse getResult() {
        return RowSwapProcessorResponse.getDefaultInstance();
      }

      @Override
      public void process(long now, HRegion region,
          List<Mutation> mutations, WALEdit walEdit) throws IOException {

        // Override the time to avoid race-condition in the unit test caused by
        // inaccurate timer on some machines
        now = myTimer.getAndIncrement();

        // Scan both rows
        List<Cell> kvs1 = new ArrayList<>();
        List<Cell> kvs2 = new ArrayList<>();
        doScan(region, new Scan().withStartRow(row1).withStopRow(row1), kvs1);
        doScan(region, new Scan().withStartRow(row2).withStopRow(row2), kvs2);

        // Assert swapped
        if (swapped) {
          assertEquals(rowSize, kvs2.size());
          assertEquals(row2Size, kvs1.size());
        } else {
          assertEquals(rowSize, kvs1.size());
          assertEquals(row2Size, kvs2.size());
        }
        swapped = !swapped;

        // Add and delete keyvalues
        List<List<Cell>> kvs = new ArrayList<>(2);
        kvs.add(kvs1);
        kvs.add(kvs2);
        byte[][] rows = new byte[][]{row1, row2};
        for (int i = 0; i < kvs.size(); ++i) {
          for (Cell kv : kvs.get(i)) {
            // Delete from the current row and add to the other row
            Delete d = new Delete(rows[i]);
            KeyValue kvDelete =
                new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
                    kv.getTimestamp(), KeyValue.Type.Delete);
            d.add(kvDelete);
            Put p = new Put(rows[1 - i]);
            KeyValue kvAdd =
                new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
                    now, CellUtil.cloneValue(kv));
            p.add(kvAdd);
            mutations.add(d);
            walEdit.add(kvDelete);
            mutations.add(p);
            walEdit.add(kvAdd);
          }
        }
      }

      @Override
      public String getName() {
        return "swap";
      }

      @Override
      public RowSwapProcessorRequest getRequestData() throws IOException {
        RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
        builder.setRow1(UnsafeByteOperations.unsafeWrap(row1));
        builder.setRow2(UnsafeByteOperations.unsafeWrap(row2));
        return builder.build();
      }

      @Override
      public void initialize(RowSwapProcessorRequest msg) {
        this.row1 = msg.getRow1().toByteArray();
        this.row2 = msg.getRow2().toByteArray();
      }
    }

    public static class TimeoutProcessor extends
        BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
      byte[] row = new byte[0];

      /**
       * Empty constructor for Writable
       */
      public TimeoutProcessor() {
      }

      public TimeoutProcessor(byte[] row) {
        this.row = row;
      }

      public Collection<byte[]> getRowsToLock() {
        return Collections.singleton(row);
      }

      @Override
      public TimeoutProcessorResponse getResult() {
        return TimeoutProcessorResponse.getDefaultInstance();
      }

      @Override
      public void process(long now, HRegion region,
          List<Mutation> mutations, WALEdit walEdit) throws IOException {
        try {
          // Sleep for a long time so it times out
          Thread.sleep(100 * 1000L);
        } catch (Exception e) {
          throw new IOException(e);
        }
      }

      @Override
      public boolean readOnly() {
        return true;
      }

      @Override
      public String getName() {
        return "timeout";
      }

      @Override
      public TimeoutProcessorRequest getRequestData() throws IOException {
        TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
        builder.setRow(UnsafeByteOperations.unsafeWrap(row));
        return builder.build();
      }

      @Override
      public void initialize(TimeoutProcessorRequest msg) throws IOException {
        this.row = msg.getRow().toByteArray();
      }
    }

    public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
      InternalScanner scanner = null;
      try {
        scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
        scanner = region.getScanner(scan);
        result.clear();
        scanner.next(result);
      } finally {
        if (scanner != null) {
          scanner.close();
        }
      }
    }
  }

  static String stringifyKvs(Collection<Cell> kvs) {
    StringBuilder out = new StringBuilder();
    out.append("[");
    if (kvs != null) {
      for (Cell kv : kvs) {
        byte[] col = CellUtil.cloneQualifier(kv);
        byte[] val = CellUtil.cloneValue(kv);
        if (Bytes.equals(col, COUNTER)) {
          out.append(Bytes.toStringBinary(col) + ":" +
              Bytes.toInt(val) + " ");
        } else {
          out.append(Bytes.toStringBinary(col) + ":" +
              Bytes.toStringBinary(val) + " ");
        }
      }
    }
    out.append("]");
    return out.toString();
  }
}
@@ -1,46 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * Defines a protocol to perform multi row transactions.
 * See BaseRowProcessorEndpoint for the implementation.
 * See HRegion#processRowsWithLocks() for details.
 */
syntax = "proto2";
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "RowProcessorProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message ProcessRequest {
  required string row_processor_class_name = 1;
  optional string row_processor_initializer_message_name = 2;
  optional bytes row_processor_initializer_message = 3;
  optional uint64 nonce_group = 4;
  optional uint64 nonce = 5;
}

message ProcessResponse {
  required bytes row_processor_result = 1;
}

service RowProcessorService {
  rpc Process(ProcessRequest) returns (ProcessResponse);
}
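For orientation only (not part of the commit): a minimal sketch of how these two messages were driven from the client side, following the pattern exercised in TestRowProcessorEndpoint above. The helper class name RowProcessorInvocationSketch is made up for illustration; everything else comes from the APIs removed in this commit.

import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.RowProcessorService;

public final class RowProcessorInvocationSketch {
  /**
   * Runs one RowProcessor round trip and returns the serialized result message;
   * the caller parses it with the processor-specific parseFrom().
   */
  public static <S extends Message, T extends Message> ByteString invoke(
      Table table, byte[] row, RowProcessor<S, T> processor) throws Exception {
    // The channel routes the call to the region hosting `row`.
    CoprocessorRpcChannel channel = table.coprocessorService(row);
    RowProcessorService.BlockingInterface service =
        RowProcessorService.newBlockingStub(channel);
    // ProcessRequest carries the processor class name plus its serialized request data.
    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
    // Server side, BaseRowProcessorEndpoint rebuilds the processor and calls
    // Region#processRowsWithLocks before returning the result bytes.
    ProcessResponse response = service.process(null, request);
    return response.getRowProcessorResult();
  }

  private RowProcessorInvocationSketch() {
  }
}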
@@ -1,53 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.coprocessor;

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;

/**
 * Convenience class that is used to make RowProcessorEndpoint invocations.
 * For example usage, refer to TestRowProcessorEndpoint.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public class RowProcessorClient {
  public static <S extends Message, T extends Message>
      ProcessRequest getRowProcessorPB(RowProcessor<S, T> r) throws IOException {
    final ProcessRequest.Builder requestBuilder = ProcessRequest.newBuilder();
    requestBuilder.setRowProcessorClassName(r.getClass().getName());
    S s = r.getRequestData();
    if (s != null) {
      requestBuilder.setRowProcessorInitializerMessageName(s.getClass().getName());
      requestBuilder.setRowProcessorInitializerMessage(s.toByteString());
    }
    return requestBuilder.build();
  }
}
@@ -1,149 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.coprocessor;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.ProcessResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RowProcessorProtos.RowProcessorService;

/**
 * This class demonstrates how to implement atomic read-modify-writes
 * using {@link Region#processRowsWithLocks} and Coprocessor endpoints.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
    extends RowProcessorService implements RegionCoprocessor {
  private RegionCoprocessorEnvironment env;

  /**
   * Pass a processor to the region to process multiple rows atomically.
   *
   * The RowProcessor implementations should be the inner classes of your
   * RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
   * the Coprocessor endpoint together.
   *
   * See {@code TestRowProcessorEndpoint} for an example.
   *
   * The request contains information for constructing the processor
   * (see {@link #constructRowProcessorFromRequest}). The processor object defines
   * the read-modify-write procedure.
   */
  @Override
  public void process(RpcController controller, ProcessRequest request,
      RpcCallback<ProcessResponse> done) {
    ProcessResponse resultProto = null;
    try {
      RowProcessor<S, T> processor = constructRowProcessorFromRequest(request);
      Region region = env.getRegion();
      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
      region.processRowsWithLocks(processor, nonceGroup, nonce);
      T result = processor.getResult();
      ProcessResponse.Builder b = ProcessResponse.newBuilder();
      b.setRowProcessorResult(result.toByteString());
      resultProto = b.build();
    } catch (Exception e) {
      CoprocessorRpcUtils.setControllerException(controller, new IOException(e));
    }
    done.run(resultProto);
  }

  @Override
  public Iterable<Service> getServices() {
    return Collections.singleton(this);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
   * on a table region, so always expects this to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   *   {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment) env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }

  @SuppressWarnings("unchecked")
  RowProcessor<S, T> constructRowProcessorFromRequest(ProcessRequest request)
      throws IOException {
    String className = request.getRowProcessorClassName();
    Class<?> cls;
    try {
      cls = Class.forName(className);
      RowProcessor<S, T> ci = (RowProcessor<S, T>) cls.getDeclaredConstructor().newInstance();
      if (request.hasRowProcessorInitializerMessageName()) {
        Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
            .asSubclass(Message.class);
        Method m;
        try {
          m = imn.getMethod("parseFrom", ByteString.class);
        } catch (SecurityException e) {
          throw new IOException(e);
        } catch (NoSuchMethodException e) {
          throw new IOException(e);
        }
        S s;
        try {
          s = (S) m.invoke(null, request.getRowProcessorInitializerMessage());
        } catch (IllegalArgumentException e) {
          throw new IOException(e);
        } catch (InvocationTargetException e) {
          throw new IOException(e);
        }
        ci.initialize(s);
      }
      return ci;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
@@ -1,71 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**
 * Base class for RowProcessor with some default implementations.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public abstract class BaseRowProcessor<S extends Message, T extends Message>
    implements RowProcessor<S, T> {

  @Override
  public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
  }

  @Override
  public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
  }

  @Override
  public void postBatchMutate(HRegion region) throws IOException {
  }

  @Override
  public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
  }

  @Override
  public List<UUID> getClusterIds() {
    return Collections.emptyList();
  }

  @Override
  public String getName() {
    return this.getClass().getSimpleName().toLowerCase(Locale.ROOT);
  }

  @Override
  public Durability useDurability() {
    return Durability.USE_DEFAULT;
  }
}
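As a further illustration of what implementors subclassed BaseRowProcessor for (again, not part of the commit), a minimal read-only processor might have looked like the sketch below. The class name CellCountProcessor is invented, and it borrows the IncCounterProcessorRequest/IncCounterProcessorResponse test messages referenced in the test file earlier purely as placeholder generated types.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
import org.apache.hadoop.hbase.shaded.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;

/** Read-only processor that counts the cells in a single row. Sketch only. */
public class CellCountProcessor
    extends BaseRowProcessor<IncCounterProcessorRequest, IncCounterProcessorResponse> {
  private byte[] row = new byte[0];
  private int count = 0;

  public CellCountProcessor() {
  }

  public CellCountProcessor(byte[] row) {
    this.row = row;
  }

  @Override
  public Collection<byte[]> getRowsToLock() {
    return Collections.singleton(row);
  }

  @Override
  public boolean readOnly() {
    return true; // no mutations, so processRowsWithLocks takes its short-circuit path
  }

  @Override
  public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit)
      throws IOException {
    // Scan only the target row inside the region, like doScan() in the test above.
    List<Cell> cells = new ArrayList<>();
    Scan scan = new Scan().withStartRow(row).withStopRow(row, true);
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    InternalScanner scanner = null;
    try {
      scanner = region.getScanner(scan);
      scanner.next(cells);
    } finally {
      if (scanner != null) {
        scanner.close();
      }
    }
    count = cells.size();
  }

  @Override
  public IncCounterProcessorResponse getResult() {
    return IncCounterProcessorResponse.newBuilder().setResponse(count).build();
  }

  @Override
  public IncCounterProcessorRequest getRequestData() throws IOException {
    return IncCounterProcessorRequest.newBuilder()
        .setCounter(0).setRow(UnsafeByteOperations.unsafeWrap(row)).build();
  }

  @Override
  public void initialize(IncCounterProcessorRequest msg) {
    this.row = msg.getRow().toByteArray();
  }
}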
@@ -55,14 +55,10 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
@@ -387,10 +383,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // Number of mutations for minibatch processing.
   private final int miniBatchSize;
 
-  // negative number indicates infinite timeout
-  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
-  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
-
   final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
 
   /**
@@ -676,7 +668,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   long memstoreFlushSize;
   final long timestampSlop;
-  final long rowProcessorTimeout;
 
   // Last flush time for each Store. Useful when we are flushing for each column
   private final ConcurrentMap<HStore, Long> lastStoreFlushTimeMap = new ConcurrentHashMap<>();
@@ -846,13 +837,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
         HConstants.LATEST_TIMESTAMP);
 
-    /**
-     * Timeout for the process time in processRowsWithLocks().
-     * Use -1 to switch off time bound.
-     */
-    this.rowProcessorTimeout = conf.getLong(
-        "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
-
     this.storeHotnessProtector = new StoreHotnessProtector(this, conf);
 
     boolean forceSync = conf.getBoolean(WAL_HSYNC_CONF_KEY, DEFAULT_WAL_HSYNC);
@@ -5124,7 +5108,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
    * set; when set we will run operations that make sense in the increment/append scenario
    * but that do not make sense otherwise.
-   * @see #applyToMemStore(HStore, Cell, MemStoreSizing)
    */
   private void applyToMemStore(HStore store, List<Cell> cells, boolean delta,
       MemStoreSizing memstoreAccounting) throws IOException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @see #applyToMemStore(HStore, List, boolean, MemStoreSizing)
|
|
||||||
*/
|
|
||||||
private void applyToMemStore(HStore store, Cell cell, MemStoreSizing memstoreAccounting)
|
|
||||||
throws IOException {
|
|
||||||
// Any change in how we update Store/MemStore needs to also be done in other applyToMemStore!!!!
|
|
||||||
if (store == null) {
|
|
||||||
checkFamily(CellUtil.cloneFamily(cell));
|
|
||||||
// Unreachable because checkFamily will throw exception
|
|
||||||
}
|
|
||||||
store.add(cell, memstoreAccounting);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkFamilies(Collection<byte[]> families, Durability durability)
|
private void checkFamilies(Collection<byte[]> families, Durability durability)
|
||||||
throws NoSuchColumnFamilyException, InvalidMutationDurabilityException {
|
throws NoSuchColumnFamilyException, InvalidMutationDurabilityException {
|
||||||
for (byte[] family : families) {
|
for (byte[] family : families) {
|
||||||
|
@ -7673,227 +7643,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
return stats.build();
|
return stats.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException {
|
|
||||||
processRowsWithLocks(processor, rowProcessorTimeout, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
|
|
||||||
throws IOException {
|
|
||||||
processRowsWithLocks(processor, rowProcessorTimeout, nonceGroup, nonce);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout,
|
|
||||||
long nonceGroup, long nonce) throws IOException {
|
|
||||||
for (byte[] row : processor.getRowsToLock()) {
|
|
||||||
checkRow(row, "processRowsWithLocks");
|
|
||||||
}
|
|
||||||
if (!processor.readOnly()) {
|
|
||||||
checkReadOnly();
|
|
||||||
}
|
|
||||||
checkResources();
|
|
||||||
startRegionOperation();
|
|
||||||
WALEdit walEdit = new WALEdit();
|
|
||||||
|
|
||||||
// STEP 1. Run pre-process hook
|
|
||||||
preProcess(processor, walEdit);
|
|
||||||
// Short circuit the read only case
|
|
||||||
if (processor.readOnly()) {
|
|
||||||
try {
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
doProcessRowWithTimeout(processor, now, this, null, null, timeout);
|
|
||||||
processor.postProcess(this, walEdit, true);
|
|
||||||
} finally {
|
|
||||||
closeRegionOperation();
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean locked = false;
|
|
||||||
List<RowLock> acquiredRowLocks = null;
|
|
||||||
List<Mutation> mutations = new ArrayList<>();
|
|
||||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
|
||||||
// This is assigned by mvcc either explicity in the below or in the guts of the WAL append
|
|
||||||
// when it assigns the edit a sequencedid (A.K.A the mvcc write number).
|
|
||||||
WriteEntry writeEntry = null;
|
|
||||||
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
|
|
||||||
|
|
||||||
// Check for thread interrupt status in case we have been signaled from
|
|
||||||
// #interruptRegionOperation.
|
|
||||||
checkInterrupt();
|
|
||||||
|
|
||||||
try {
|
|
||||||
boolean success = false;
|
|
||||||
try {
|
|
||||||
// STEP 2. Acquire the row lock(s)
|
|
||||||
acquiredRowLocks = new ArrayList<>(rowsToLock.size());
|
|
||||||
RowLock prevRowLock = null;
|
|
||||||
for (byte[] row : rowsToLock) {
|
|
||||||
// Attempt to lock all involved rows, throw if any lock times out
|
|
||||||
// use a writer lock for mixed reads and writes
|
|
||||||
RowLock rowLock = getRowLockInternal(row, false, prevRowLock);
|
|
||||||
if (rowLock != prevRowLock) {
|
|
||||||
acquiredRowLocks.add(rowLock);
|
|
||||||
prevRowLock = rowLock;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for thread interrupt status in case we have been signaled from
|
|
||||||
// #interruptRegionOperation. Do it before we take the lock and disable interrupts for
|
|
||||||
// the WAL append.
|
|
||||||
checkInterrupt();
|
|
||||||
|
|
||||||
// STEP 3. Region lock
|
|
||||||
lock(this.updatesLock.readLock(), acquiredRowLocks.isEmpty() ? 1 : acquiredRowLocks.size());
|
|
||||||
locked = true;
|
|
||||||
|
|
||||||
// From this point until memstore update this operation should not be interrupted.
|
|
||||||
disableInterrupts();
|
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
|
|
||||||
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
|
|
||||||
if (!mutations.isEmpty()) {
|
|
||||||
writeRequestsCount.add(mutations.size());
|
|
||||||
// STEP 5. Call the preBatchMutate hook
|
|
||||||
processor.preBatchMutate(this, walEdit);
|
|
||||||
|
|
||||||
// STEP 6. Append and sync if walEdit has data to write out.
|
|
||||||
if (!walEdit.isEmpty()) {
|
|
||||||
writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()),
|
|
||||||
processor.getClusterIds(), now, nonceGroup, nonce);
|
|
||||||
} else {
|
|
||||||
// We are here if WAL is being skipped.
|
|
||||||
writeEntry = this.mvcc.begin();
|
|
||||||
}
|
|
||||||
|
|
||||||
// STEP 7. Apply to memstore
|
|
||||||
long sequenceId = writeEntry.getWriteNumber();
|
|
||||||
for (Mutation m : mutations) {
|
|
||||||
// Handle any tag based cell features.
|
|
||||||
// TODO: Do we need to call rewriteCellTags down in applyToMemStore()? Why not before
|
|
||||||
// so tags go into WAL?
|
|
||||||
rewriteCellTags(m.getFamilyCellMap(), m);
|
|
||||||
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
|
||||||
Cell cell = cellScanner.current();
|
|
||||||
if (walEdit.isEmpty()) {
|
|
||||||
// If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id.
|
|
||||||
// If no WAL, need to stamp it here.
|
|
||||||
PrivateCellUtil.setSequenceId(cell, sequenceId);
|
|
||||||
}
|
|
||||||
applyToMemStore(getStore(cell), cell, memstoreAccounting);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// STEP 8. call postBatchMutate hook
|
|
||||||
processor.postBatchMutate(this);
|
|
||||||
|
|
||||||
// STEP 9. Complete mvcc.
|
|
||||||
mvcc.completeAndWait(writeEntry);
|
|
||||||
writeEntry = null;
|
|
||||||
|
|
||||||
// STEP 10. Release region lock
|
|
||||||
if (locked) {
|
|
||||||
this.updatesLock.readLock().unlock();
|
|
||||||
locked = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// STEP 11. Release row lock(s)
|
|
||||||
releaseRowLocks(acquiredRowLocks);
|
|
||||||
|
|
||||||
if (rsServices != null && rsServices.getMetrics() != null) {
|
|
||||||
rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
|
|
||||||
getTableName(), mutations.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
success = true;
|
|
||||||
} finally {
|
|
||||||
// Call complete rather than completeAndWait because we probably had error if walKey != null
|
|
||||||
if (writeEntry != null) mvcc.complete(writeEntry);
|
|
||||||
if (locked) {
|
|
||||||
this.updatesLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
// release locks if some were acquired but another timed out
|
|
||||||
releaseRowLocks(acquiredRowLocks);
|
|
||||||
|
|
||||||
enableInterrupts();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 12. Run post-process hook
|
|
||||||
processor.postProcess(this, walEdit, success);
|
|
||||||
} finally {
|
|
||||||
closeRegionOperation();
|
|
||||||
if (!mutations.isEmpty()) {
|
|
||||||
this.incMemStoreSize(memstoreAccounting.getMemStoreSize());
|
|
||||||
requestFlushIfNeeded();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit)
|
|
||||||
throws IOException {
|
|
||||||
try {
|
|
||||||
processor.preProcess(this, walEdit);
|
|
||||||
} catch (IOException e) {
|
|
||||||
closeRegionOperation();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
  private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
      final long now,
      final HRegion region,
      final List<Mutation> mutations,
      final WALEdit walEdit,
      final long timeout) throws IOException {
    // Short circuit the no time bound case.
    if (timeout < 0) {
      try {
        processor.process(now, region, mutations, walEdit);
      } catch (IOException e) {
        String row = processor.getRowsToLock().isEmpty() ? "" :
          " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
        LOG.warn("RowProcessor: {}, in region {}, throws Exception {}",
          processor.getClass().getName(), getRegionInfo().getRegionNameAsString(), row, e);
        throw e;
      }
      return;
    }

    // Case with time bound
    FutureTask<Void> task = new FutureTask<>(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        try {
          processor.process(now, region, mutations, walEdit);
          return null;
        } catch (IOException e) {
          String row = processor.getRowsToLock().isEmpty() ? "" :
            " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
          LOG.warn("RowProcessor: {}, in region {}, throws Exception {}",
            processor.getClass().getName(), getRegionInfo().getRegionNameAsString(), row, e);
          throw e;
        }
      }
    });
    rowProcessorExecutor.execute(task);
    try {
      task.get(timeout, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      throw throwOnInterrupt(ie);
    } catch (TimeoutException te) {
      String row = processor.getRowsToLock().isEmpty() ? "" :
        " on row(s):" + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + "...";
      LOG.error("RowProcessor timeout: {} ms, in region {}, {}", timeout,
        getRegionInfo().getRegionNameAsString(), row);
      throw new IOException(te);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
  @Override
  public Result append(Append append) throws IOException {
    return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);

@@ -7928,12 +7677,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    }
  }

  private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
      long now, long nonceGroup, long nonce) throws IOException {
    return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
        SequenceId.NO_SEQUENCE_ID);
  }

  /**
   * @return writeEntry associated with this append
   */
@@ -257,7 +257,6 @@ public interface Region extends ConfigurationObserver {
  }

  /**
   *
   * Get a row lock for the specified row. All locks are reentrant.
   *
   * Before calling this function make sure that a region operation has already been

@@ -275,8 +274,6 @@ public interface Region extends ConfigurationObserver {
   * @see #startRegionOperation()
   * @see #startRegionOperation(Operation)
   */
  // TODO this needs to be exposed as we have RowProcessor now. If RowProcessor is removed, we can
  // remove this too..
  RowLock getRowLock(byte[] row, boolean readLock) throws IOException;

  ///////////////////////////////////////////////////////////////////////////
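As a usage note for the getRowLock contract above (a row lock must be taken inside an already-started region operation), a minimal illustrative sketch, for illustration only and not from this patch; the row key is hypothetical:

import java.io.IOException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;

final class RowLockSketch {
  // Takes a read lock on a single (hypothetical) row inside a region operation,
  // then releases the lock and closes the operation in reverse order.
  static void withRowReadLock(Region region) throws IOException {
    region.startRegionOperation();
    try {
      Region.RowLock lock = region.getRowLock(Bytes.toBytes("some-row"), true /* readLock */);
      try {
        // ... read the row consistently here ...
      } finally {
        lock.release();
      }
    } finally {
      region.closeRegionOperation();
    }
  }
}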
@@ -555,52 +552,10 @@ public interface Region extends ConfigurationObserver {
   * @throws IOException
   */
  // TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
  // Changing processRowsWithLocks and RowProcessor
  // Changing processRowsWithLocks
  void mutateRowsWithLocks(Collection<Mutation> mutations, Collection<byte[]> rowsToLock,
      long nonceGroup, long nonce) throws IOException;

  /**
   * Performs atomic multiple reads and writes on a given row.
   *
   * @param processor The object that defines the reads and writes to a row.
   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
   *             Coprocessors instead.
   */
  @Deprecated
  void processRowsWithLocks(RowProcessor<?,?> processor) throws IOException;

  /**
   * Performs atomic multiple reads and writes on a given row.
   *
   * @param processor The object that defines the reads and writes to a row.
   * @param nonceGroup Optional nonce group of the operation (client Id)
   * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
   *             Coprocessors instead.
   */
  // TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
  // Changing processRowsWithLocks and RowProcessor
  @Deprecated
  void processRowsWithLocks(RowProcessor<?,?> processor, long nonceGroup, long nonce)
      throws IOException;

  /**
   * Performs atomic multiple reads and writes on a given row.
   *
   * @param processor The object that defines the reads and writes to a row.
   * @param timeout The timeout of the processor.process() execution.
   *                Use a negative number to switch off the time bound.
   * @param nonceGroup Optional nonce group of the operation (client Id)
   * @param nonce Optional nonce of the operation (unique random id to ensure "more idempotence")
   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
   *             Coprocessors instead.
   */
  // TODO Should not be exposing with params nonceGroup, nonce. Change when doing the jira for
  // Changing processRowsWithLocks and RowProcessor
  @Deprecated
  void processRowsWithLocks(RowProcessor<?,?> processor, long timeout, long nonceGroup, long nonce)
      throws IOException;
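With the RowProcessor overloads gone, the mutateRowsWithLocks method retained above still covers the atomic multi-row write case. A minimal illustrative sketch, for illustration only and not from this patch; row, family, and qualifier names are hypothetical:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;

final class MutateRowsSketch {
  // Applies two puts atomically under the region's row locks; both rows stay
  // locked for the duration of the call. No nonce is supplied here.
  static void atomicTwoRowUpdate(Region region) throws IOException {
    byte[] rowA = Bytes.toBytes("rowA");
    byte[] rowB = Bytes.toBytes("rowB");
    byte[] cf = Bytes.toBytes("cf");
    Put putA = new Put(rowA).addColumn(cf, Bytes.toBytes("q"), Bytes.toBytes("1"));
    Put putB = new Put(rowB).addColumn(cf, Bytes.toBytes("q"), Bytes.toBytes("1"));
    List<Mutation> mutations = Arrays.<Mutation>asList(putA, putB);
    region.mutateRowsWithLocks(mutations, Arrays.asList(rowA, rowB),
        HConstants.NO_NONCE, HConstants.NO_NONCE);
  }
}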
|
  /**
   * Puts some data in the table.
   * @param put
@@ -1,159 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**
 * Defines the procedures to atomically perform multiple scans and mutations
 * on an HRegion.
 *
 * This is invoked by {@link Region#processRowsWithLocks(RowProcessor)}.
 * This class performs scans and generates mutations and WAL edits.
 * The locks and MVCC will be handled by HRegion.
 *
 * The RowProcessor user code could have data that needs to be
 * sent across for proper initialization at the server side. The generic type
 * parameter S is the type of the request data sent to the server.
 * The generic type parameter T is the return type of RowProcessor.getResult().
 *
 * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. For customization, use
 *             Coprocessors instead.
 */
@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface RowProcessor<S extends Message, T extends Message> {

  /**
   * Rows to lock during the operation.
   * They have to be sorted with <code>RowProcessor</code>
   * to avoid deadlock.
   */
  Collection<byte[]> getRowsToLock();

  /**
   * Obtain the processing result. All row processor implementations must
   * implement this, even if the method is simply returning an empty
   * Message.
   */
  T getResult();

  /**
   * Is this operation read only? If this is true, process() should not add
   * any mutations or it throws IOException.
   * @return true if read only operation
   */
  boolean readOnly();

  /**
   * HRegion handles the locks and MVCC and invokes this method properly.
   *
   * You should override this to create your own RowProcessor.
   *
   * If you are doing read-modify-write here, you should consider using
   * <code>IsolationLevel.READ_UNCOMMITTED</code> for scan because
   * we advance MVCC after releasing the locks for optimization purpose.
   *
   * @param now the current system millisecond
   * @param region the HRegion
   * @param mutations the output mutations to apply to memstore
   * @param walEdit the output WAL edits to apply to write ahead log
   */
  void process(long now,
               HRegion region,
               List<Mutation> mutations,
               WALEdit walEdit) throws IOException;

  /**
   * The hook to be executed before process().
   *
   * @param region the HRegion
   * @param walEdit the output WAL edits to apply to write ahead log
   */
  void preProcess(HRegion region, WALEdit walEdit) throws IOException;

  /**
   * The hook to be executed after the process() but before applying the Mutations to region. Also
   * by the time this hook is called, the mvcc transaction has started.
   * @param walEdit the output WAL edits to apply to write ahead log
   */
  void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException;

  /**
   * The hook to be executed after the process() and applying the Mutations to region. The
   * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will
   * be executed before the mvcc transaction completion.
   */
  void postBatchMutate(HRegion region) throws IOException;

  /**
   * The hook to be executed after process() and applying the Mutations to region.
   *
   * @param region the HRegion
   * @param walEdit the output WAL edits to apply to write ahead log
   * @param success true if batch operation is successful otherwise false.
   */
  void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException;

  /**
   * @return The cluster ids that have the change.
   */
  List<UUID> getClusterIds();

  /**
   * Human readable name of the processor
   * @return The name of the processor
   */
  String getName();

  /**
   * This method should return any additional data that is needed on the
   * server side to construct the RowProcessor. The server will pass this to
   * the {@link #initialize(Message msg)} method. If there is no RowProcessor
   * specific data then null should be returned.
   * @return the PB message
   * @throws IOException
   */
  S getRequestData() throws IOException;

  /**
   * This method should initialize any field(s) of the RowProcessor with
   * a parsing of the passed message bytes (used on the server side).
   * @param msg
   * @throws IOException
   */
  void initialize(S msg) throws IOException;

  /**
   * @return The {@link Durability} to use
   */
  Durability useDurability();
}
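For historical context on the removed interface above, a minimal illustrative implementation sketch (for illustration only, not from this patch): a hypothetical counter processor that read-modify-writes one row. The row, family, and qualifier names are made up, and the protobuf request/response plumbing is stubbed with null, which a real processor would replace with generated messages.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/** Illustrative only: bumps a counter column on a single, hypothetical row. */
public class CounterRowProcessor implements RowProcessor<Message, Message> {
  private static final byte[] ROW = Bytes.toBytes("counter-row"); // hypothetical
  private static final byte[] CF = Bytes.toBytes("cf");           // hypothetical
  private static final byte[] Q = Bytes.toBytes("value");         // hypothetical

  @Override
  public Collection<byte[]> getRowsToLock() {
    return Collections.singletonList(ROW);
  }

  @Override
  public Message getResult() {
    return null; // a real processor would return a protobuf response message
  }

  @Override
  public boolean readOnly() {
    return false;
  }

  @Override
  public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit)
      throws IOException {
    // READ_UNCOMMITTED, as the interface doc advises, because MVCC is advanced
    // after the row locks are released.
    Scan scan = new Scan().withStartRow(ROW).addColumn(CF, Q);
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    List<Cell> cells = new ArrayList<>();
    InternalScanner scanner = region.getScanner(scan);
    try {
      scanner.next(cells); // one call is enough here: we only need the first row
    } finally {
      scanner.close();
    }
    long current = cells.isEmpty() ? 0L : Bytes.toLong(CellUtil.cloneValue(cells.get(0)));
    Put put = new Put(ROW).addColumn(CF, Q, now, Bytes.toBytes(current + 1));
    mutations.add(put);
    // Also record the new cells in the WAL edit so the update is durable.
    for (List<Cell> familyCells : put.getFamilyCellMap().values()) {
      for (Cell c : familyCells) {
        walEdit.add(c);
      }
    }
  }

  @Override public void preProcess(HRegion region, WALEdit walEdit) {}
  @Override public void preBatchMutate(HRegion region, WALEdit walEdit) {}
  @Override public void postBatchMutate(HRegion region) {}
  @Override public void postProcess(HRegion region, WALEdit walEdit, boolean success) {}
  @Override public List<UUID> getClusterIds() { return Collections.emptyList(); }
  @Override public String getName() { return "counter"; }
  @Override public Message getRequestData() { return null; } // no extra request data
  @Override public void initialize(Message msg) {}
  @Override public Durability useDurability() { return Durability.USE_DEFAULT; }
}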