HBASE-5542 Unify HRegion.mutateRowsWithLocks() and HRegion.processRow() (Scott Chen)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303490 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
41f31490ff
commit
ce36877d30
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RowProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class demonstrates how to implement atomic read-modify-writes
|
||||||
|
* using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
|
||||||
|
implements RowProcessorProtocol {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pass a processor to HRegion to process multiple rows atomically.
|
||||||
|
*
|
||||||
|
* The RowProcessor implementations should be the inner classes of your
|
||||||
|
* RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
|
||||||
|
* the Coprocessor endpoint together.
|
||||||
|
*
|
||||||
|
* See {@link TestRowProcessorEndpoint} for example.
|
||||||
|
*
|
||||||
|
* @param processor The object defines the read-modify-write procedure
|
||||||
|
* @return The processing result
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public <T> T process(RowProcessor<T> processor)
|
||||||
|
throws IOException {
|
||||||
|
HRegion region =
|
||||||
|
((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
|
||||||
|
region.processRowsWithLocks(processor);
|
||||||
|
return processor.getResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RowProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Defines a protocol to perform multi row transactions.
|
||||||
|
* See {@link BaseRowProcessorEndpoint} for the implementation.
|
||||||
|
* See {@link HRegion#processRowsWithLocks()} for detials.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public interface RowProcessorProtocol extends CoprocessorProtocol {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param processor The processor defines how to process the row
|
||||||
|
*/
|
||||||
|
<T> T process(RowProcessor<T> processor) throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for RowProcessor with some default implementations.
|
||||||
|
*/
|
||||||
|
public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T getResult() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UUID getClusterId() {
|
||||||
|
return HConstants.DEFAULT_CLUSTER_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorCompletionService;
|
import java.util.concurrent.ExecutorCompletionService;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.FutureTask;
|
import java.util.concurrent.FutureTask;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
@ -95,7 +97,6 @@ import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RowProcessor;
|
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
|
||||||
|
@ -232,7 +233,11 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
final Configuration conf;
|
final Configuration conf;
|
||||||
final int rowLockWaitDuration;
|
final int rowLockWaitDuration;
|
||||||
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
|
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
|
||||||
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L;
|
|
||||||
|
// negative number indicates infinite timeout
|
||||||
|
static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
|
||||||
|
final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
|
||||||
|
|
||||||
final HRegionInfo regionInfo;
|
final HRegionInfo regionInfo;
|
||||||
final Path regiondir;
|
final Path regiondir;
|
||||||
KeyValue.KVComparator comparator;
|
KeyValue.KVComparator comparator;
|
||||||
|
@ -486,6 +491,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
"hbase.hregion.keyvalue.timestamp.slop.millisecs",
|
||||||
HConstants.LATEST_TIMESTAMP);
|
HConstants.LATEST_TIMESTAMP);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for the process time in processRowsWithLocks().
|
||||||
|
* Use -1 to switch off time bound.
|
||||||
|
*/
|
||||||
this.rowProcessorTimeout = conf.getLong(
|
this.rowProcessorTimeout = conf.getLong(
|
||||||
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
"hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
|
||||||
|
|
||||||
|
@ -1676,7 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
/*
|
/*
|
||||||
* @param delete The passed delete is modified by this method. WARNING!
|
* @param delete The passed delete is modified by this method. WARNING!
|
||||||
*/
|
*/
|
||||||
private void prepareDelete(Delete delete) throws IOException {
|
void prepareDelete(Delete delete) throws IOException {
|
||||||
// Check to see if this is a deleteRow insert
|
// Check to see if this is a deleteRow insert
|
||||||
if(delete.getFamilyMap().isEmpty()){
|
if(delete.getFamilyMap().isEmpty()){
|
||||||
for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
|
for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
|
||||||
|
@ -1748,7 +1757,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* @param now
|
* @param now
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
|
void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
|
Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
|
||||||
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
|
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
|
||||||
|
@ -2367,7 +2376,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
|
* Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
|
||||||
* with the provided current timestamp.
|
* with the provided current timestamp.
|
||||||
*/
|
*/
|
||||||
private void updateKVTimestamps(
|
void updateKVTimestamps(
|
||||||
final Iterable<List<KeyValue>> keyLists, final byte[] now) {
|
final Iterable<List<KeyValue>> keyLists, final byte[] now) {
|
||||||
for (List<KeyValue> keys: keyLists) {
|
for (List<KeyValue> keys: keyLists) {
|
||||||
if (keys == null) continue;
|
if (keys == null) continue;
|
||||||
|
@ -2591,7 +2600,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
* Check the collection of families for validity.
|
* Check the collection of families for validity.
|
||||||
* @throws NoSuchColumnFamilyException if a family does not exist.
|
* @throws NoSuchColumnFamilyException if a family does not exist.
|
||||||
*/
|
*/
|
||||||
private void checkFamilies(Collection<byte[]> families)
|
void checkFamilies(Collection<byte[]> families)
|
||||||
throws NoSuchColumnFamilyException {
|
throws NoSuchColumnFamilyException {
|
||||||
for (byte[] family : families) {
|
for (byte[] family : families) {
|
||||||
checkFamily(family);
|
checkFamily(family);
|
||||||
|
@ -2601,7 +2610,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
checkTimestamps(p.getFamilyMap(), now);
|
checkTimestamps(p.getFamilyMap(), now);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
|
void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
|
||||||
long now) throws DoNotRetryIOException {
|
long now) throws DoNotRetryIOException {
|
||||||
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
|
if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
|
||||||
return;
|
return;
|
||||||
|
@ -4232,42 +4241,71 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
*/
|
*/
|
||||||
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
public void mutateRowsWithLocks(Collection<Mutation> mutations,
|
||||||
Collection<byte[]> rowsToLock) throws IOException {
|
Collection<byte[]> rowsToLock) throws IOException {
|
||||||
boolean flush = false;
|
|
||||||
|
MultiRowMutationProcessor proc =
|
||||||
|
new MultiRowMutationProcessor(mutations, rowsToLock);
|
||||||
|
processRowsWithLocks(proc, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs atomic multiple reads and writes on a given row.
|
||||||
|
*
|
||||||
|
* @param processor The object defines the reads and writes to a row.
|
||||||
|
*/
|
||||||
|
public void processRowsWithLocks(RowProcessor<?> processor)
|
||||||
|
throws IOException {
|
||||||
|
processRowsWithLocks(processor, rowProcessorTimeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs atomic multiple reads and writes on a given row.
|
||||||
|
*
|
||||||
|
* @param processor The object defines the reads and writes to a row.
|
||||||
|
* @param timeout The timeout of the processor.process() execution
|
||||||
|
* Use a negative number to switch off the time bound
|
||||||
|
*/
|
||||||
|
public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
for (byte[] row : processor.getRowsToLock()) {
|
||||||
|
checkRow(row, "processRowsWithLocks");
|
||||||
|
}
|
||||||
|
if (!processor.readOnly()) {
|
||||||
|
checkReadOnly();
|
||||||
|
}
|
||||||
|
checkResources();
|
||||||
|
|
||||||
startRegionOperation();
|
startRegionOperation();
|
||||||
List<Integer> acquiredLocks = null;
|
|
||||||
try {
|
|
||||||
// 1. run all pre-hooks before the atomic operation
|
|
||||||
// if any pre hook indicates "bypass", bypass the entire operation
|
|
||||||
|
|
||||||
// one WALEdit is used for all edits.
|
|
||||||
WALEdit walEdit = new WALEdit();
|
WALEdit walEdit = new WALEdit();
|
||||||
if (coprocessorHost != null) {
|
|
||||||
for (Mutation m : mutations) {
|
// 1. Run pre-process hook
|
||||||
if (m instanceof Put) {
|
processor.preProcess(this, walEdit);
|
||||||
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
|
|
||||||
// by pass everything
|
// Short circuit the read only case
|
||||||
|
if (processor.readOnly()) {
|
||||||
|
try {
|
||||||
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
doProcessRowWithTimeout(
|
||||||
|
processor, now, this, null, null, timeout);
|
||||||
|
processor.postProcess(this, walEdit);
|
||||||
|
} finally {
|
||||||
|
closeRegionOperation();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else if (m instanceof Delete) {
|
|
||||||
Delete d = (Delete) m;
|
|
||||||
prepareDelete(d);
|
|
||||||
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
|
|
||||||
// by pass everything
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
long txid = 0;
|
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||||
boolean walSyncSuccessful = false;
|
|
||||||
boolean locked = false;
|
boolean locked = false;
|
||||||
|
boolean walSyncSuccessful = false;
|
||||||
// 2. acquire the row lock(s)
|
List<Integer> acquiredLocks = null;
|
||||||
|
long addedSize = 0;
|
||||||
|
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
||||||
|
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||||
|
try {
|
||||||
|
// 2. Acquire the row lock(s)
|
||||||
acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
|
acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
|
||||||
for (byte[] row : rowsToLock) {
|
for (byte[] row : rowsToLock) {
|
||||||
// attempt to lock all involved rows, fail if one lock times out
|
// Attempt to lock all involved rows, fail if one lock times out
|
||||||
Integer lid = getLock(null, row, true);
|
Integer lid = getLock(null, row, true);
|
||||||
if (lid == null) {
|
if (lid == null) {
|
||||||
throw new IOException("Failed to acquire lock on "
|
throw new IOException("Failed to acquire lock on "
|
||||||
|
@ -4275,213 +4313,65 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
}
|
}
|
||||||
acquiredLocks.add(lid);
|
acquiredLocks.add(lid);
|
||||||
}
|
}
|
||||||
|
// 3. Region lock
|
||||||
// 3. acquire the region lock
|
|
||||||
this.updatesLock.readLock().lock();
|
this.updatesLock.readLock().lock();
|
||||||
locked = true;
|
locked = true;
|
||||||
|
|
||||||
// 4. Get a mvcc write number
|
|
||||||
MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
|
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
byte[] byteNow = Bytes.toBytes(now);
|
|
||||||
try {
|
try {
|
||||||
// 5. Check mutations and apply edits to a single WALEdit
|
// 4. Let the processor scan the rows, generate mutations and add
|
||||||
for (Mutation m : mutations) {
|
// waledits
|
||||||
if (m instanceof Put) {
|
doProcessRowWithTimeout(
|
||||||
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
|
processor, now, this, mutations, walEdit, timeout);
|
||||||
checkFamilies(familyMap.keySet());
|
|
||||||
checkTimestamps(familyMap, now);
|
if (!mutations.isEmpty()) {
|
||||||
updateKVTimestamps(familyMap.values(), byteNow);
|
// 5. Get a mvcc write number
|
||||||
} else if (m instanceof Delete) {
|
writeEntry = mvcc.beginMemstoreInsert();
|
||||||
Delete d = (Delete) m;
|
// 6. Apply to memstore
|
||||||
prepareDelete(d);
|
for (KeyValue kv : mutations) {
|
||||||
prepareDeleteTimestamps(d, byteNow);
|
kv.setMemstoreTS(writeEntry.getWriteNumber());
|
||||||
} else {
|
byte[] family = kv.getFamily();
|
||||||
throw new DoNotRetryIOException(
|
checkFamily(family);
|
||||||
"Action must be Put or Delete. But was: "
|
addedSize += stores.get(family).add(kv);
|
||||||
+ m.getClass().getName());
|
|
||||||
}
|
|
||||||
if (m.getWriteToWAL()) {
|
|
||||||
addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. append all edits at once (don't sync)
|
long txid = 0;
|
||||||
if (walEdit.size() > 0) {
|
// 7. Append no sync
|
||||||
txid = this.log.appendNoSync(regionInfo,
|
if (!walEdit.isEmpty()) {
|
||||||
|
txid = this.log.appendNoSync(this.regionInfo,
|
||||||
this.htableDescriptor.getName(), walEdit,
|
this.htableDescriptor.getName(), walEdit,
|
||||||
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
|
processor.getClusterId(), now, this.htableDescriptor);
|
||||||
}
|
}
|
||||||
|
// 8. Release region lock
|
||||||
// 7. apply to memstore
|
if (locked) {
|
||||||
long addedSize = 0;
|
|
||||||
for (Mutation m : mutations) {
|
|
||||||
addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
|
|
||||||
}
|
|
||||||
flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
|
|
||||||
|
|
||||||
// 8. release region and row lock(s)
|
|
||||||
this.updatesLock.readLock().unlock();
|
this.updatesLock.readLock().unlock();
|
||||||
locked = false;
|
locked = false;
|
||||||
|
}
|
||||||
|
// 9. Release row lock(s)
|
||||||
if (acquiredLocks != null) {
|
if (acquiredLocks != null) {
|
||||||
for (Integer lid : acquiredLocks) {
|
for (Integer lid : acquiredLocks) {
|
||||||
releaseRowLock(lid);
|
releaseRowLock(lid);
|
||||||
}
|
}
|
||||||
acquiredLocks = null;
|
acquiredLocks = null;
|
||||||
}
|
}
|
||||||
|
// 10. Sync edit log
|
||||||
// 9. sync WAL if required
|
if (txid != 0 &&
|
||||||
if (walEdit.size() > 0 &&
|
|
||||||
(this.regionInfo.isMetaRegion() ||
|
(this.regionInfo.isMetaRegion() ||
|
||||||
!this.htableDescriptor.isDeferredLogFlush())) {
|
!this.htableDescriptor.isDeferredLogFlush())) {
|
||||||
this.log.sync(txid);
|
this.log.sync(txid);
|
||||||
}
|
}
|
||||||
walSyncSuccessful = true;
|
walSyncSuccessful = true;
|
||||||
|
|
||||||
// 10. advance mvcc
|
|
||||||
mvcc.completeMemstoreInsert(w);
|
|
||||||
w = null;
|
|
||||||
|
|
||||||
// 11. run coprocessor post host hooks
|
|
||||||
// after the WAL is sync'ed and all locks are released
|
|
||||||
// (similar to doMiniBatchPut)
|
|
||||||
if (coprocessorHost != null) {
|
|
||||||
for (Mutation m : mutations) {
|
|
||||||
if (m instanceof Put) {
|
|
||||||
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
|
||||||
} else if (m instanceof Delete) {
|
|
||||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
// 12. clean up if needed
|
|
||||||
if (!walSyncSuccessful) {
|
|
||||||
int kvsRolledback = 0;
|
|
||||||
for (Mutation m : mutations) {
|
|
||||||
for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
|
|
||||||
.entrySet()) {
|
|
||||||
List<KeyValue> kvs = e.getValue();
|
|
||||||
byte[] family = e.getKey();
|
|
||||||
Store store = getStore(family);
|
|
||||||
// roll back each kv
|
|
||||||
for (KeyValue kv : kvs) {
|
|
||||||
store.rollback(kv);
|
|
||||||
kvsRolledback++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback
|
|
||||||
+ " KeyValues");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (w != null) {
|
|
||||||
mvcc.completeMemstoreInsert(w);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (locked) {
|
|
||||||
this.updatesLock.readLock().unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (acquiredLocks != null) {
|
|
||||||
for (Integer lid : acquiredLocks) {
|
|
||||||
releaseRowLock(lid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (flush) {
|
|
||||||
// 13. Flush cache if needed. Do it outside update lock.
|
|
||||||
requestFlush();
|
|
||||||
}
|
|
||||||
closeRegionOperation();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Performs atomic multiple reads and writes on a given row.
|
|
||||||
* @param processor The object defines the reads and writes to a row.
|
|
||||||
*/
|
|
||||||
public void processRow(RowProcessor<?> processor)
|
|
||||||
throws IOException {
|
|
||||||
byte[] row = processor.getRow();
|
|
||||||
checkRow(row, "processRow");
|
|
||||||
if (!processor.readOnly()) {
|
|
||||||
checkReadOnly();
|
|
||||||
}
|
|
||||||
checkResources();
|
|
||||||
|
|
||||||
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
|
||||||
|
|
||||||
startRegionOperation();
|
|
||||||
|
|
||||||
boolean locked = false;
|
|
||||||
boolean walSyncSuccessful = false;
|
|
||||||
Integer rowLockID = null;
|
|
||||||
long addedSize = 0;
|
|
||||||
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
|
||||||
try {
|
|
||||||
// 1. Row lock
|
|
||||||
rowLockID = getLock(null, row, true);
|
|
||||||
|
|
||||||
// 2. Region lock
|
|
||||||
this.updatesLock.readLock().lock();
|
|
||||||
locked = true;
|
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
|
||||||
try {
|
|
||||||
// 3. Let the processor scan the row and generate mutations
|
|
||||||
WALEdit walEdits = new WALEdit();
|
|
||||||
doProcessRowWithTimeout(processor, now, rowScanner, mutations,
|
|
||||||
walEdits, rowProcessorTimeout);
|
|
||||||
if (processor.readOnly() && !mutations.isEmpty()) {
|
|
||||||
throw new IOException(
|
|
||||||
"Processor is readOnly but generating mutations on row:" +
|
|
||||||
Bytes.toStringBinary(row));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!mutations.isEmpty()) {
|
|
||||||
// 4. Get a mvcc write number
|
|
||||||
writeEntry = mvcc.beginMemstoreInsert();
|
|
||||||
// 5. Apply to memstore and a WALEdit
|
|
||||||
for (KeyValue kv : mutations) {
|
|
||||||
kv.setMemstoreTS(writeEntry.getWriteNumber());
|
|
||||||
walEdits.add(kv);
|
|
||||||
addedSize += stores.get(kv.getFamily()).add(kv);
|
|
||||||
}
|
|
||||||
|
|
||||||
long txid = 0;
|
|
||||||
// 6. Append no sync
|
|
||||||
if (!walEdits.isEmpty()) {
|
|
||||||
txid = this.log.appendNoSync(this.regionInfo,
|
|
||||||
this.htableDescriptor.getName(), walEdits,
|
|
||||||
processor.getClusterId(), now, this.htableDescriptor);
|
|
||||||
}
|
|
||||||
// 7. Release region lock
|
|
||||||
if (locked) {
|
|
||||||
this.updatesLock.readLock().unlock();
|
|
||||||
locked = false;
|
|
||||||
}
|
|
||||||
// 8. Release row lock
|
|
||||||
if (rowLockID != null) {
|
|
||||||
releaseRowLock(rowLockID);
|
|
||||||
rowLockID = null;
|
|
||||||
}
|
|
||||||
// 9. Sync edit log
|
|
||||||
if (txid != 0) {
|
|
||||||
this.log.sync(txid);
|
|
||||||
}
|
|
||||||
walSyncSuccessful = true;
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
||||||
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
||||||
" memstore keyvalues for row:" + processor.getRow());
|
" memstore keyvalues for row(s):" +
|
||||||
|
processor.getRowsToLock().iterator().next() + "...");
|
||||||
for (KeyValue kv : mutations) {
|
for (KeyValue kv : mutations) {
|
||||||
stores.get(kv.getFamily()).rollback(kv);
|
stores.get(kv.getFamily()).rollback(kv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 10. Roll mvcc forward
|
// 11. Roll mvcc forward
|
||||||
if (writeEntry != null) {
|
if (writeEntry != null) {
|
||||||
mvcc.completeMemstoreInsert(writeEntry);
|
mvcc.completeMemstoreInsert(writeEntry);
|
||||||
writeEntry = null;
|
writeEntry = null;
|
||||||
|
@ -4490,11 +4380,16 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
this.updatesLock.readLock().unlock();
|
this.updatesLock.readLock().unlock();
|
||||||
locked = false;
|
locked = false;
|
||||||
}
|
}
|
||||||
if (rowLockID != null) {
|
if (acquiredLocks != null) {
|
||||||
releaseRowLock(rowLockID);
|
for (Integer lid : acquiredLocks) {
|
||||||
rowLockID = null;
|
releaseRowLock(lid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 12. Run post-process hook
|
||||||
|
processor.postProcess(this, walEdit);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
closeRegionOperation();
|
closeRegionOperation();
|
||||||
if (!mutations.isEmpty() &&
|
if (!mutations.isEmpty() &&
|
||||||
|
@ -4506,48 +4401,54 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
|
|
||||||
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
private void doProcessRowWithTimeout(final RowProcessor<?> processor,
|
||||||
final long now,
|
final long now,
|
||||||
final RowProcessor.RowScanner scanner,
|
final HRegion region,
|
||||||
final List<KeyValue> mutations,
|
final List<KeyValue> mutations,
|
||||||
final WALEdit walEdits,
|
final WALEdit walEdit,
|
||||||
final long timeout) throws IOException {
|
final long timeout) throws IOException {
|
||||||
|
// Short circuit the no time bound case.
|
||||||
|
if (timeout < 0) {
|
||||||
|
try {
|
||||||
|
processor.process(now, region, mutations, walEdit);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("RowProcessor:" + processor.getClass().getName() +
|
||||||
|
" throws Exception on row(s):" +
|
||||||
|
Bytes.toStringBinary(
|
||||||
|
processor.getRowsToLock().iterator().next()) + "...", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Case with time bound
|
||||||
FutureTask<Void> task =
|
FutureTask<Void> task =
|
||||||
new FutureTask<Void>(new Callable<Void>() {
|
new FutureTask<Void>(new Callable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws IOException {
|
public Void call() throws IOException {
|
||||||
processor.process(now, scanner, mutations, walEdits);
|
try {
|
||||||
|
processor.process(now, region, mutations, walEdit);
|
||||||
return null;
|
return null;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("RowProcessor:" + processor.getClass().getName() +
|
||||||
|
" throws Exception on row(s):" +
|
||||||
|
Bytes.toStringBinary(
|
||||||
|
processor.getRowsToLock().iterator().next()) + "...", e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Thread t = new Thread(task);
|
rowProcessorExecutor.execute(task);
|
||||||
t.setDaemon(true);
|
|
||||||
t.start();
|
|
||||||
try {
|
try {
|
||||||
task.get(timeout, TimeUnit.MILLISECONDS);
|
task.get(timeout, TimeUnit.MILLISECONDS);
|
||||||
} catch (TimeoutException te) {
|
} catch (TimeoutException te) {
|
||||||
LOG.error("RowProcessor timeout on row:" +
|
LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
|
||||||
Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te);
|
Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
|
||||||
|
"...");
|
||||||
throw new IOException(te);
|
throw new IOException(te);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final private RowProcessor.RowScanner rowScanner =
|
|
||||||
new RowProcessor.RowScanner() {
|
|
||||||
@Override
|
|
||||||
public void doScan(Scan scan, List<KeyValue> result) throws IOException {
|
|
||||||
InternalScanner scanner = null;
|
|
||||||
try {
|
|
||||||
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
|
||||||
scanner = HRegion.this.getScanner(scan);
|
|
||||||
result.clear();
|
|
||||||
scanner.next(result);
|
|
||||||
} finally {
|
|
||||||
if (scanner != null) scanner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: There's a lot of boiler plate code identical
|
// TODO: There's a lot of boiler plate code identical
|
||||||
// to increment... See how to better unify that.
|
// to increment... See how to better unify that.
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
|
||||||
|
*/
|
||||||
|
class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
|
||||||
|
Collection<byte[]> rowsToLock;
|
||||||
|
Collection<Mutation> mutations;
|
||||||
|
|
||||||
|
MultiRowMutationProcessor(Collection<Mutation> mutations,
|
||||||
|
Collection<byte[]> rowsToLock) {
|
||||||
|
this.rowsToLock = rowsToLock;
|
||||||
|
this.mutations = mutations;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<byte[]> getRowsToLock() {
|
||||||
|
return rowsToLock;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now,
|
||||||
|
HRegion region,
|
||||||
|
List<KeyValue> mutationKvs,
|
||||||
|
WALEdit walEdit) throws IOException {
|
||||||
|
byte[] byteNow = Bytes.toBytes(now);
|
||||||
|
// Check mutations and apply edits to a single WALEdit
|
||||||
|
for (Mutation m : mutations) {
|
||||||
|
if (m instanceof Put) {
|
||||||
|
Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
|
||||||
|
region.checkFamilies(familyMap.keySet());
|
||||||
|
region.checkTimestamps(familyMap, now);
|
||||||
|
region.updateKVTimestamps(familyMap.values(), byteNow);
|
||||||
|
} else if (m instanceof Delete) {
|
||||||
|
Delete d = (Delete) m;
|
||||||
|
region.prepareDelete(d);
|
||||||
|
region.prepareDeleteTimestamps(d, byteNow);
|
||||||
|
} else {
|
||||||
|
throw new DoNotRetryIOException(
|
||||||
|
"Action must be Put or Delete. But was: "
|
||||||
|
+ m.getClass().getName());
|
||||||
|
}
|
||||||
|
for (List<KeyValue> edits : m.getFamilyMap().values()) {
|
||||||
|
boolean writeToWAL = m.getWriteToWAL();
|
||||||
|
for (KeyValue kv : edits) {
|
||||||
|
mutationKvs.add(kv);
|
||||||
|
if (writeToWAL) {
|
||||||
|
walEdit.add(kv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
for (Mutation m : mutations) {
|
||||||
|
if (m instanceof Put) {
|
||||||
|
if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
|
||||||
|
// by pass everything
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else if (m instanceof Delete) {
|
||||||
|
Delete d = (Delete) m;
|
||||||
|
region.prepareDelete(d);
|
||||||
|
if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
|
||||||
|
// by pass everything
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
for (Mutation m : mutations) {
|
||||||
|
if (m instanceof Put) {
|
||||||
|
coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
|
||||||
|
} else if (m instanceof Delete) {
|
||||||
|
coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -15,30 +15,40 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.coprocessor;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
import org.apache.hadoop.io.Writable;
|
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the procedure to atomically perform multiple scans and mutations
|
* Defines the procedure to atomically perform multiple scans and mutations
|
||||||
* on one single row. The generic type parameter T is the return type of
|
* on a HRegion.
|
||||||
|
*
|
||||||
|
* This is invoked by {@link HRegion#processRowsWithLocks()}.
|
||||||
|
* This class performs scans and generates mutations and WAL edits.
|
||||||
|
* The locks and MVCC will be handled by HRegion.
|
||||||
|
*
|
||||||
|
* The generic type parameter T is the return type of
|
||||||
* RowProcessor.getResult().
|
* RowProcessor.getResult().
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Public
|
public interface RowProcessor<T> {
|
||||||
public interface RowProcessor<T> extends Writable {
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Which row to perform the read-write
|
* Rows to lock while operation.
|
||||||
|
* They have to be sorted with <code>RowProcessor</code>
|
||||||
|
* to avoid deadlock.
|
||||||
*/
|
*/
|
||||||
byte[] getRow();
|
Collection<byte[]> getRowsToLock();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain the processing result
|
* Obtain the processing result
|
||||||
|
@ -53,29 +63,40 @@ public interface RowProcessor<T> extends Writable {
|
||||||
boolean readOnly();
|
boolean readOnly();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HRegion calls this to process a row. You should override this to create
|
* HRegion handles the locks and MVCC and invokes this method properly.
|
||||||
* your own RowProcessor.
|
*
|
||||||
|
* You should override this to create your own RowProcessor.
|
||||||
|
*
|
||||||
|
* If you are doing read-modify-write here, you should consider using
|
||||||
|
* <code>IsolationLevel.READ_UNCOMMITTED</code> for scan because
|
||||||
|
* we advance MVCC after releasing the locks for optimization purpose.
|
||||||
*
|
*
|
||||||
* @param now the current system millisecond
|
* @param now the current system millisecond
|
||||||
* @param scanner the call back object the can be used to scan the row
|
* @param region the HRegion
|
||||||
* @param mutations the mutations for HRegion to do
|
* @param mutations the output mutations to apply to memstore
|
||||||
* @param walEdit the wal edit here allows inject some other meta data
|
* @param walEdit the output WAL edits to apply to write ahead log
|
||||||
*/
|
*/
|
||||||
void process(long now,
|
void process(long now,
|
||||||
RowProcessor.RowScanner scanner,
|
HRegion region,
|
||||||
List<KeyValue> mutations,
|
List<KeyValue> mutations,
|
||||||
WALEdit walEdit) throws IOException;
|
WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The call back provided by HRegion to perform the scans on the row
|
* The hook to be executed before process().
|
||||||
|
*
|
||||||
|
* @param region the HRegion
|
||||||
|
* @param walEdit the output WAL edits to apply to write ahead log
|
||||||
*/
|
*/
|
||||||
public interface RowScanner {
|
void preProcess(HRegion region, WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param scan The object defines what to read
|
* The hook to be executed after process().
|
||||||
* @param result The scan results will be added here
|
*
|
||||||
|
* @param region the HRegion
|
||||||
|
* @param walEdit the output WAL edits to apply to write ahead log
|
||||||
*/
|
*/
|
||||||
void doScan(Scan scan, List<KeyValue> result) throws IOException;
|
void postProcess(HRegion region, WALEdit walEdit) throws IOException;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The replication cluster id.
|
* @return The replication cluster id.
|
|
@ -1,321 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.coprocessor;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
|
|
||||||
import java.io.DataInput;
|
|
||||||
import java.io.DataOutput;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.UUID;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
|
|
||||||
import com.sun.org.apache.commons.logging.Log;
|
|
||||||
import com.sun.org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verifies ProcessRowEndpoint works.
|
|
||||||
* The tested RowProcessor performs two scans and a read-modify-write.
|
|
||||||
*/
|
|
||||||
@Category(SmallTests.class)
|
|
||||||
public class TestProcessRowEndpoint {
|
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class);
|
|
||||||
|
|
||||||
private static final byte[] TABLE = Bytes.toBytes("testtable");
|
|
||||||
private static final byte[] TABLE2 = Bytes.toBytes("testtable2");
|
|
||||||
private final static byte[] ROW = Bytes.toBytes("testrow");
|
|
||||||
private final static byte[] FAM = Bytes.toBytes("friendlist");
|
|
||||||
|
|
||||||
// Column names
|
|
||||||
private final static byte[] A = Bytes.toBytes("a");
|
|
||||||
private final static byte[] B = Bytes.toBytes("b");
|
|
||||||
private final static byte[] C = Bytes.toBytes("c");
|
|
||||||
private final static byte[] D = Bytes.toBytes("d");
|
|
||||||
private final static byte[] E = Bytes.toBytes("e");
|
|
||||||
private final static byte[] F = Bytes.toBytes("f");
|
|
||||||
private final static byte[] G = Bytes.toBytes("g");
|
|
||||||
private final static byte[] REQUESTS = Bytes.toBytes("requests");
|
|
||||||
|
|
||||||
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
|
||||||
private volatile int numRequests;
|
|
||||||
|
|
||||||
private CountDownLatch startSignal;
|
|
||||||
private CountDownLatch doneSignal;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setupBeforeClass() throws Exception {
|
|
||||||
Configuration conf = util.getConfiguration();
|
|
||||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
|
||||||
FriendsOfFriendsEndpoint.class.getName());
|
|
||||||
util.startMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
util.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSingle() throws Throwable {
|
|
||||||
HTable table = prepareTestData(TABLE, util);
|
|
||||||
verifyProcessRow(table);
|
|
||||||
assertEquals(1, numRequests);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void verifyProcessRow(HTable table) throws Throwable {
|
|
||||||
|
|
||||||
FriendsOfFriendsProtocol processor =
|
|
||||||
table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW);
|
|
||||||
Result result = processor.query(ROW, A);
|
|
||||||
|
|
||||||
Set<String> friendsOfFriends = new HashSet<String>();
|
|
||||||
for (KeyValue kv : result.raw()) {
|
|
||||||
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
|
||||||
numRequests = Bytes.toInt(kv.getValue());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (byte val : kv.getValue()) {
|
|
||||||
friendsOfFriends.add((char)val + "");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Set<String> expected =
|
|
||||||
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
|
|
||||||
assertEquals(expected, friendsOfFriends);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testThreads() throws Exception {
|
|
||||||
HTable table = prepareTestData(TABLE2, util);
|
|
||||||
int numThreads = 1000;
|
|
||||||
startSignal = new CountDownLatch(numThreads);
|
|
||||||
doneSignal = new CountDownLatch(numThreads);
|
|
||||||
for (int i = 0; i < numThreads; ++i) {
|
|
||||||
new Thread(new QueryRunner(table)).start();
|
|
||||||
startSignal.countDown();
|
|
||||||
}
|
|
||||||
doneSignal.await();
|
|
||||||
Get get = new Get(ROW);
|
|
||||||
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
|
||||||
assertEquals(numThreads, numRequests);
|
|
||||||
}
|
|
||||||
|
|
||||||
class QueryRunner implements Runnable {
|
|
||||||
final HTable table;
|
|
||||||
QueryRunner(final HTable table) {
|
|
||||||
this.table = table;
|
|
||||||
}
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
startSignal.await();
|
|
||||||
verifyProcessRow(table);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
doneSignal.countDown();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util)
|
|
||||||
throws Exception {
|
|
||||||
HTable table = util.createTable(tableName, FAM);
|
|
||||||
Put put = new Put(ROW);
|
|
||||||
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
|
|
||||||
put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
|
|
||||||
put.add(FAM, C, G); // G is a friend of C
|
|
||||||
table.put(put);
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Coprocessor protocol that finds friends of friends of a person and
|
|
||||||
* update the number of requests.
|
|
||||||
*/
|
|
||||||
public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Query a person's friends of friends
|
|
||||||
*/
|
|
||||||
Result query(byte[] row, byte[] person) throws IOException;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Finds friends of friends of a person and update the number of requests.
|
|
||||||
*/
|
|
||||||
public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor
|
|
||||||
implements FriendsOfFriendsProtocol, RowProcessor<Result> {
|
|
||||||
byte[] row = null;
|
|
||||||
byte[] person = null;
|
|
||||||
Result result = null;
|
|
||||||
|
|
||||||
//
|
|
||||||
// FriendsOfFriendsProtocol method
|
|
||||||
//
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Result query(byte[] row, byte[] person) throws IOException {
|
|
||||||
this.row = row;
|
|
||||||
this.person = person;
|
|
||||||
HRegion region =
|
|
||||||
((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
|
|
||||||
region.processRow(this);
|
|
||||||
return this.getResult();
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// RowProcessor methods
|
|
||||||
//
|
|
||||||
|
|
||||||
FriendsOfFriendsEndpoint() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public byte[] getRow() {
|
|
||||||
return row;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Result getResult() {
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean readOnly() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(long now, RowProcessor.RowScanner scanner,
|
|
||||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
|
||||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
|
||||||
{ // First scan to get friends of the person and numRequests
|
|
||||||
Scan scan = new Scan(row, row);
|
|
||||||
scan.addColumn(FAM, person);
|
|
||||||
scan.addColumn(FAM, REQUESTS);
|
|
||||||
scanner.doScan(scan, kvs);
|
|
||||||
}
|
|
||||||
LOG.debug("first scan:" + stringifyKvs(kvs));
|
|
||||||
int numRequests = 0;
|
|
||||||
// Second scan to get friends of friends
|
|
||||||
Scan scan = new Scan(row, row);
|
|
||||||
for (KeyValue kv : kvs) {
|
|
||||||
if (Bytes.equals(kv.getQualifier(), REQUESTS)) {
|
|
||||||
numRequests = Bytes.toInt(kv.getValue());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
byte[] friends = kv.getValue();
|
|
||||||
for (byte f : friends) {
|
|
||||||
scan.addColumn(FAM, new byte[]{f});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
scanner.doScan(scan, kvs);
|
|
||||||
|
|
||||||
LOG.debug("second scan:" + stringifyKvs(kvs));
|
|
||||||
numRequests += 1;
|
|
||||||
// Construct mutations and Result
|
|
||||||
KeyValue kv = new KeyValue(
|
|
||||||
row, FAM, REQUESTS, now, Bytes.toBytes(numRequests));
|
|
||||||
mutations.clear();
|
|
||||||
mutations.add(kv);
|
|
||||||
kvs.add(kv);
|
|
||||||
LOG.debug("final result:" + stringifyKvs(kvs) +
|
|
||||||
" mutations:" + stringifyKvs(mutations));
|
|
||||||
result = new Result(kvs);
|
|
||||||
// Inject some meta data to the walEdit
|
|
||||||
KeyValue metaKv = new KeyValue(
|
|
||||||
getRow(), HLog.METAFAMILY,
|
|
||||||
Bytes.toBytes("FriendsOfFriends query"),
|
|
||||||
person);
|
|
||||||
walEdit.add(metaKv);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void readFields(DataInput in) throws IOException {
|
|
||||||
this.person = Bytes.readByteArray(in);
|
|
||||||
this.row = Bytes.readByteArray(in);
|
|
||||||
this.result = new Result();
|
|
||||||
result.readFields(in);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(DataOutput out) throws IOException {
|
|
||||||
Bytes.writeByteArray(out, person);
|
|
||||||
Bytes.writeByteArray(out, row);
|
|
||||||
if (result == null) {
|
|
||||||
new Result().write(out);
|
|
||||||
} else {
|
|
||||||
result.write(out);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public UUID getClusterId() {
|
|
||||||
return HConstants.DEFAULT_CLUSTER_ID;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static String stringifyKvs(Collection<KeyValue> kvs) {
|
|
||||||
StringBuilder out = new StringBuilder();
|
|
||||||
out.append("[");
|
|
||||||
for (KeyValue kv : kvs) {
|
|
||||||
byte[] col = kv.getQualifier();
|
|
||||||
byte[] val = kv.getValue();
|
|
||||||
if (Bytes.equals(col, REQUESTS)) {
|
|
||||||
out.append(Bytes.toStringBinary(col) + ":" +
|
|
||||||
Bytes.toInt(val) + " ");
|
|
||||||
} else {
|
|
||||||
out.append(Bytes.toStringBinary(col) + ":" +
|
|
||||||
Bytes.toStringBinary(val) + " ");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
out.append("]");
|
|
||||||
return out.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
@org.junit.Rule
|
|
||||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
|
||||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
|
||||||
}
|
|
|
@ -0,0 +1,592 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import com.sun.org.apache.commons.logging.Log;
|
||||||
|
import com.sun.org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies ProcessRowEndpoint works.
|
||||||
|
* The tested RowProcessor performs two scans and a read-modify-write.
|
||||||
|
*/
|
||||||
|
@Category(SmallTests.class)
|
||||||
|
public class TestRowProcessorEndpoint {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
|
||||||
|
|
||||||
|
private static final byte[] TABLE = Bytes.toBytes("testtable");
|
||||||
|
private final static byte[] ROW = Bytes.toBytes("testrow");
|
||||||
|
private final static byte[] ROW2 = Bytes.toBytes("testrow2");
|
||||||
|
private final static byte[] FAM = Bytes.toBytes("friendlist");
|
||||||
|
|
||||||
|
// Column names
|
||||||
|
private final static byte[] A = Bytes.toBytes("a");
|
||||||
|
private final static byte[] B = Bytes.toBytes("b");
|
||||||
|
private final static byte[] C = Bytes.toBytes("c");
|
||||||
|
private final static byte[] D = Bytes.toBytes("d");
|
||||||
|
private final static byte[] E = Bytes.toBytes("e");
|
||||||
|
private final static byte[] F = Bytes.toBytes("f");
|
||||||
|
private final static byte[] G = Bytes.toBytes("g");
|
||||||
|
private final static byte[] COUNTER = Bytes.toBytes("counter");
|
||||||
|
private final AtomicInteger failures = new AtomicInteger(0);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
private static volatile int expectedCounter = 0;
|
||||||
|
private static int rowSize, row2Size;
|
||||||
|
|
||||||
|
private volatile static HTable table = null;
|
||||||
|
private volatile static boolean swapped = false;
|
||||||
|
private volatile CountDownLatch startSignal;
|
||||||
|
private volatile CountDownLatch doneSignal;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
Configuration conf = util.getConfiguration();
|
||||||
|
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
RowProcessorEndpoint.class.getName());
|
||||||
|
conf.setInt("hbase.client.retries.number", 1);
|
||||||
|
conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
|
||||||
|
util.startMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void prepareTestData() throws Exception {
|
||||||
|
try {
|
||||||
|
util.getHBaseAdmin().disableTable(TABLE);
|
||||||
|
util.getHBaseAdmin().deleteTable(TABLE);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore table not found
|
||||||
|
}
|
||||||
|
table = util.createTable(TABLE, FAM);
|
||||||
|
{
|
||||||
|
Put put = new Put(ROW);
|
||||||
|
put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A
|
||||||
|
put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
|
||||||
|
put.add(FAM, C, G); // G is a friend of C
|
||||||
|
table.put(put);
|
||||||
|
rowSize = put.size();
|
||||||
|
}
|
||||||
|
Put put = new Put(ROW2);
|
||||||
|
put.add(FAM, D, E);
|
||||||
|
put.add(FAM, F, G);
|
||||||
|
table.put(put);
|
||||||
|
row2Size = put.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDoubleScan() throws Throwable {
|
||||||
|
prepareTestData();
|
||||||
|
RowProcessorProtocol protocol =
|
||||||
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
|
RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
|
||||||
|
new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
|
||||||
|
Set<String> result = protocol.process(processor);
|
||||||
|
|
||||||
|
Set<String> expected =
|
||||||
|
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
|
||||||
|
Get get = new Get(ROW);
|
||||||
|
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||||
|
assertEquals(expected, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadModifyWrite() throws Throwable {
|
||||||
|
prepareTestData();
|
||||||
|
failures.set(0);
|
||||||
|
int numThreads = 1000;
|
||||||
|
concurrentExec(new IncrementRunner(), numThreads);
|
||||||
|
Get get = new Get(ROW);
|
||||||
|
LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list()));
|
||||||
|
int finalCounter = incrementCounter(table);
|
||||||
|
assertEquals(numThreads + 1, finalCounter);
|
||||||
|
assertEquals(0, failures.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
class IncrementRunner implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
incrementCounter(table);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int incrementCounter(HTable table) throws Throwable {
|
||||||
|
RowProcessorProtocol protocol =
|
||||||
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
|
RowProcessorEndpoint.IncrementCounterProcessor processor =
|
||||||
|
new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
|
||||||
|
int counterValue = protocol.process(processor);
|
||||||
|
return counterValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void concurrentExec(
|
||||||
|
final Runnable task, final int numThreads) throws Throwable {
|
||||||
|
startSignal = new CountDownLatch(numThreads);
|
||||||
|
doneSignal = new CountDownLatch(numThreads);
|
||||||
|
for (int i = 0; i < numThreads; ++i) {
|
||||||
|
new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
startSignal.countDown();
|
||||||
|
startSignal.await();
|
||||||
|
task.run();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
failures.incrementAndGet();
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
doneSignal.countDown();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
doneSignal.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleRows() throws Throwable {
|
||||||
|
prepareTestData();
|
||||||
|
failures.set(0);
|
||||||
|
int numThreads = 1000;
|
||||||
|
concurrentExec(new SwapRowsRunner(), numThreads);
|
||||||
|
LOG.debug("row keyvalues:" +
|
||||||
|
stringifyKvs(table.get(new Get(ROW)).list()));
|
||||||
|
LOG.debug("row2 keyvalues:" +
|
||||||
|
stringifyKvs(table.get(new Get(ROW2)).list()));
|
||||||
|
assertEquals(rowSize, table.get(new Get(ROW)).list().size());
|
||||||
|
assertEquals(row2Size, table.get(new Get(ROW2)).list().size());
|
||||||
|
assertEquals(0, failures.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
class SwapRowsRunner implements Runnable {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
swapRows(table);
|
||||||
|
} catch (Throwable e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void swapRows(HTable table) throws Throwable {
|
||||||
|
RowProcessorProtocol protocol =
|
||||||
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
|
RowProcessorEndpoint.RowSwapProcessor processor =
|
||||||
|
new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
|
||||||
|
protocol.process(processor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTimeout() throws Throwable {
|
||||||
|
prepareTestData();
|
||||||
|
RowProcessorProtocol protocol =
|
||||||
|
table.coprocessorProxy(RowProcessorProtocol.class, ROW);
|
||||||
|
RowProcessorEndpoint.TimeoutProcessor processor =
|
||||||
|
new RowProcessorEndpoint.TimeoutProcessor(ROW);
|
||||||
|
boolean exceptionCaught = false;
|
||||||
|
try {
|
||||||
|
protocol.process(processor);
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptionCaught = true;
|
||||||
|
}
|
||||||
|
assertTrue(exceptionCaught);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class defines two RowProcessors:
|
||||||
|
* IncrementCounterProcessor and FriendsOfFriendsProcessor.
|
||||||
|
*
|
||||||
|
* We define the RowProcessors as the inner class of the endpoint.
|
||||||
|
* So they can be loaded with the endpoint on the coprocessor.
|
||||||
|
*/
|
||||||
|
public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
|
||||||
|
implements RowProcessorProtocol {
|
||||||
|
|
||||||
|
public static class IncrementCounterProcessor extends
|
||||||
|
BaseRowProcessor<Integer> implements Writable {
|
||||||
|
int counter = 0;
|
||||||
|
byte[] row = new byte[0];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty constructor for Writable
|
||||||
|
*/
|
||||||
|
IncrementCounterProcessor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
IncrementCounterProcessor(byte[] row) {
|
||||||
|
this.row = row;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<byte[]> getRowsToLock() {
|
||||||
|
return Collections.singleton(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getResult() {
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now, HRegion region,
|
||||||
|
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
// Scan current counter
|
||||||
|
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||||
|
Scan scan = new Scan(row, row);
|
||||||
|
scan.addColumn(FAM, COUNTER);
|
||||||
|
doScan(region, scan, kvs);
|
||||||
|
counter = kvs.size() == 0 ? 0 :
|
||||||
|
Bytes.toInt(kvs.iterator().next().getValue());
|
||||||
|
|
||||||
|
// Assert counter value
|
||||||
|
assertEquals(expectedCounter, counter);
|
||||||
|
|
||||||
|
// Increment counter and send it to both memstore and wal edit
|
||||||
|
counter += 1;
|
||||||
|
expectedCounter += 1;
|
||||||
|
|
||||||
|
|
||||||
|
KeyValue kv =
|
||||||
|
new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
||||||
|
mutations.add(kv);
|
||||||
|
walEdit.add(kv);
|
||||||
|
|
||||||
|
// We can also inject some meta data to the walEdit
|
||||||
|
KeyValue metaKv = new KeyValue(
|
||||||
|
row, HLog.METAFAMILY,
|
||||||
|
Bytes.toBytes("I just increment counter"),
|
||||||
|
Bytes.toBytes(counter));
|
||||||
|
walEdit.add(metaKv);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.row = Bytes.readByteArray(in);
|
||||||
|
this.counter = in.readInt();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, row);
|
||||||
|
out.writeInt(counter);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FriendsOfFriendsProcessor extends
|
||||||
|
BaseRowProcessor<Set<String>> implements Writable {
|
||||||
|
byte[] row = null;
|
||||||
|
byte[] person = null;
|
||||||
|
final Set<String> result = new HashSet<String>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty constructor for Writable
|
||||||
|
*/
|
||||||
|
FriendsOfFriendsProcessor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
FriendsOfFriendsProcessor(byte[] row, byte[] person) {
|
||||||
|
this.row = row;
|
||||||
|
this.person = person;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<byte[]> getRowsToLock() {
|
||||||
|
return Collections.singleton(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getResult() {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now, HRegion region,
|
||||||
|
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||||
|
{ // First scan to get friends of the person
|
||||||
|
Scan scan = new Scan(row, row);
|
||||||
|
scan.addColumn(FAM, person);
|
||||||
|
doScan(region, scan, kvs);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second scan to get friends of friends
|
||||||
|
Scan scan = new Scan(row, row);
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
byte[] friends = kv.getValue();
|
||||||
|
for (byte f : friends) {
|
||||||
|
scan.addColumn(FAM, new byte[]{f});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
doScan(region, scan, kvs);
|
||||||
|
|
||||||
|
// Collect result
|
||||||
|
result.clear();
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
for (byte b : kv.getValue()) {
|
||||||
|
result.add((char)b + "");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.person = Bytes.readByteArray(in);
|
||||||
|
this.row = Bytes.readByteArray(in);
|
||||||
|
int size = in.readInt();
|
||||||
|
result.clear();
|
||||||
|
for (int i = 0; i < size; ++i) {
|
||||||
|
result.add(Text.readString(in));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, person);
|
||||||
|
Bytes.writeByteArray(out, row);
|
||||||
|
out.writeInt(result.size());
|
||||||
|
for (String s : result) {
|
||||||
|
Text.writeString(out, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RowSwapProcessor extends
|
||||||
|
BaseRowProcessor<Set<String>> implements Writable {
|
||||||
|
byte[] row1 = new byte[0];
|
||||||
|
byte[] row2 = new byte[0];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty constructor for Writable
|
||||||
|
*/
|
||||||
|
RowSwapProcessor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
RowSwapProcessor(byte[] row1, byte[] row2) {
|
||||||
|
this.row1 = row1;
|
||||||
|
this.row2 = row2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<byte[]> getRowsToLock() {
|
||||||
|
List<byte[]> rows = new ArrayList<byte[]>();
|
||||||
|
rows.add(row1);
|
||||||
|
rows.add(row2);
|
||||||
|
return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now, HRegion region,
|
||||||
|
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
|
||||||
|
// Scan both rows
|
||||||
|
List<KeyValue> kvs1 = new ArrayList<KeyValue>();
|
||||||
|
List<KeyValue> kvs2 = new ArrayList<KeyValue>();
|
||||||
|
doScan(region, new Scan(row1, row1), kvs1);
|
||||||
|
doScan(region, new Scan(row2, row2), kvs2);
|
||||||
|
|
||||||
|
// Assert swapped
|
||||||
|
if (swapped) {
|
||||||
|
assertEquals(rowSize, kvs2.size());
|
||||||
|
assertEquals(row2Size, kvs1.size());
|
||||||
|
} else {
|
||||||
|
assertEquals(rowSize, kvs1.size());
|
||||||
|
assertEquals(row2Size, kvs2.size());
|
||||||
|
}
|
||||||
|
swapped = !swapped;
|
||||||
|
|
||||||
|
// Add and delete keyvalues
|
||||||
|
List<List<KeyValue>> kvs = new ArrayList<List<KeyValue>>();
|
||||||
|
kvs.add(kvs1);
|
||||||
|
kvs.add(kvs2);
|
||||||
|
byte[][] rows = new byte[][]{row1, row2};
|
||||||
|
for (int i = 0; i < kvs.size(); ++i) {
|
||||||
|
for (KeyValue kv : kvs.get(i)) {
|
||||||
|
// Delete from the current row and add to the other row
|
||||||
|
KeyValue kvDelete =
|
||||||
|
new KeyValue(rows[i], kv.getFamily(), kv.getQualifier(),
|
||||||
|
kv.getTimestamp(), KeyValue.Type.Delete);
|
||||||
|
KeyValue kvAdd =
|
||||||
|
new KeyValue(rows[1 - i], kv.getFamily(), kv.getQualifier(),
|
||||||
|
now, kv.getValue());
|
||||||
|
mutations.add(kvDelete);
|
||||||
|
walEdit.add(kvDelete);
|
||||||
|
mutations.add(kvAdd);
|
||||||
|
walEdit.add(kvAdd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.row1 = Bytes.readByteArray(in);
|
||||||
|
this.row2 = Bytes.readByteArray(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, row1);
|
||||||
|
Bytes.writeByteArray(out, row2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TimeoutProcessor extends
|
||||||
|
BaseRowProcessor<Void> implements Writable {
|
||||||
|
|
||||||
|
byte[] row = new byte[0];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty constructor for Writable
|
||||||
|
*/
|
||||||
|
public TimeoutProcessor() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimeoutProcessor(byte[] row) {
|
||||||
|
this.row = row;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<byte[]> getRowsToLock() {
|
||||||
|
return Collections.singleton(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(long now, HRegion region,
|
||||||
|
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
try {
|
||||||
|
// Sleep for a long time so it timeout
|
||||||
|
Thread.sleep(100 * 1000L);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean readOnly() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
this.row = Bytes.readByteArray(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
Bytes.writeByteArray(out, row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void doScan(
|
||||||
|
HRegion region, Scan scan, List<KeyValue> result) throws IOException {
|
||||||
|
InternalScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
|
||||||
|
scanner = region.getScanner(scan);
|
||||||
|
result.clear();
|
||||||
|
scanner.next(result);
|
||||||
|
} finally {
|
||||||
|
if (scanner != null) scanner.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static String stringifyKvs(Collection<KeyValue> kvs) {
|
||||||
|
StringBuilder out = new StringBuilder();
|
||||||
|
out.append("[");
|
||||||
|
if (kvs != null) {
|
||||||
|
for (KeyValue kv : kvs) {
|
||||||
|
byte[] col = kv.getQualifier();
|
||||||
|
byte[] val = kv.getValue();
|
||||||
|
if (Bytes.equals(col, COUNTER)) {
|
||||||
|
out.append(Bytes.toStringBinary(col) + ":" +
|
||||||
|
Bytes.toInt(val) + " ");
|
||||||
|
} else {
|
||||||
|
out.append(Bytes.toStringBinary(col) + ":" +
|
||||||
|
Bytes.toStringBinary(val) + " ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.append("]");
|
||||||
|
return out.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.Rule
|
||||||
|
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||||
|
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||||
|
}
|
Loading…
Reference in New Issue