HBASE-3468 Enhance checkAndPut and checkAndDelete with comparators

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1084752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2011-03-23 21:26:07 +00:00
parent 75bbdd7397
commit 5c78c1a4a7
11 changed files with 303 additions and 93 deletions

View File

@ -87,6 +87,7 @@ Release 0.91.0 - Unreleased
needs an upper bound added (Ted Yu via Stack) needs an upper bound added (Ted Yu via Stack)
HBASE-3676 Update region server load for AssignmentManager through HBASE-3676 Update region server load for AssignmentManager through
regionServerReport() (Ted Yu via Stack) regionServerReport() (Ted Yu via Stack)
HBASE-3468 Enhance checkAndPut and checkAndDelete with comparators
TASK TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -136,32 +138,32 @@ public abstract class BaseRegionObserverCoprocessor implements RegionObserver {
@Override @Override
public boolean preCheckAndPut(final RegionCoprocessorEnvironment e, public boolean preCheckAndPut(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
throws IOException { final Put put, final boolean result) throws IOException {
return result; return result;
} }
@Override @Override
public boolean postCheckAndPut(final RegionCoprocessorEnvironment e, public boolean postCheckAndPut(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
throws IOException { final Put put, final boolean result) throws IOException {
return result; return result;
} }
@Override @Override
public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e, public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
throws IOException { final Delete delete, final boolean result) throws IOException {
return result; return result;
} }
@Override @Override
public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e, public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
throws IOException { final Delete delete, final boolean result) throws IOException {
return result; return result;
} }

View File

@ -27,7 +27,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -285,7 +286,8 @@ public interface RegionObserver extends Coprocessor {
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds * @param put data to put if check succeeds
* @param result * @param result
* @return the return value to return to client if bypassing default * @return the return value to return to client if bypassing default
@ -294,7 +296,8 @@ public interface RegionObserver extends Coprocessor {
*/ */
public boolean preCheckAndPut(final RegionCoprocessorEnvironment e, public boolean preCheckAndPut(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Put put, final boolean result)
throws IOException; throws IOException;
/** /**
@ -306,7 +309,8 @@ public interface RegionObserver extends Coprocessor {
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds * @param put data to put if check succeeds
* @param result from the checkAndPut * @param result from the checkAndPut
* @return the possibly transformed return value to return to client * @return the possibly transformed return value to return to client
@ -314,7 +318,8 @@ public interface RegionObserver extends Coprocessor {
*/ */
public boolean postCheckAndPut(final RegionCoprocessorEnvironment e, public boolean postCheckAndPut(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Put put, final boolean result)
throws IOException; throws IOException;
/** /**
@ -328,7 +333,8 @@ public interface RegionObserver extends Coprocessor {
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds * @param delete delete to commit if check succeeds
* @param result * @param result
* @return the value to return to client if bypassing default processing * @return the value to return to client if bypassing default processing
@ -336,7 +342,8 @@ public interface RegionObserver extends Coprocessor {
*/ */
public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e, public boolean preCheckAndDelete(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Delete delete, final boolean result)
throws IOException; throws IOException;
/** /**
@ -348,7 +355,8 @@ public interface RegionObserver extends Coprocessor {
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds * @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete * @param result from the CheckAndDelete
* @return the possibly transformed returned value to return to client * @return the possibly transformed returned value to return to client
@ -356,7 +364,8 @@ public interface RegionObserver extends Coprocessor {
*/ */
public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e, public boolean postCheckAndDelete(final RegionCoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier, final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result) final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Delete delete, final boolean result)
throws IOException; throws IOException;
/** /**

View File

@ -0,0 +1,39 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.filter;
/**
* A binary comparator which lexicographically compares against the specified
* byte array using {@link org.apache.hadoop.hbase.util.Bytes#compareTo(byte[], byte[])}.
*/
public class NullComparator extends WritableByteArrayComparable {
/** Nullary constructor for Writable, do not use */
public NullComparator() {
value = new byte[0];
}
@Override
public int compareTo(byte[] value) {
return value != null ? 1 : 0;
}
}

View File

@ -46,12 +46,12 @@ public class QualifierFilter extends CompareFilter {
/** /**
* Constructor. * Constructor.
* @param qualifierCompareOp the compare op for column qualifier matching * @param op the compare op for column qualifier matching
* @param qualifierComparator the comparator for column qualifier matching * @param qualifierComparator the comparator for column qualifier matching
*/ */
public QualifierFilter(final CompareOp qualifierCompareOp, public QualifierFilter(final CompareOp op,
final WritableByteArrayComparable qualifierComparator) { final WritableByteArrayComparable qualifierComparator) {
super(qualifierCompareOp, qualifierComparator); super(op, qualifierComparator);
} }
@Override @Override

View File

@ -65,6 +65,7 @@ import org.apache.hadoop.hbase.filter.BitComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.DependentColumnFilter; import org.apache.hadoop.hbase.filter.DependentColumnFilter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter; import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
@ -219,6 +220,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur
addToMap(Serializable.class, code++); addToMap(Serializable.class, code++);
addToMap(RandomRowFilter.class, code++); addToMap(RandomRowFilter.class, code++);
addToMap(CompareOp.class, code++);
} }
private Class<?> declaredClass; private Class<?> declaredClass;

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.VersionedProtocol; import org.apache.hadoop.ipc.VersionedProtocol;
@ -435,4 +437,44 @@ public interface HRegionInterface extends VersionedProtocol, Stoppable, Abortabl
*/ */
ExecResult execCoprocessor(byte[] regionName, Exec call) ExecResult execCoprocessor(byte[] regionName, Exec call)
throws IOException; throws IOException;
/**
* Atomically checks if a row/family/qualifier value match the expectedValue.
* If it does, it adds the put. If passed expected value is null, then the
* check is for non-existance of the row/column.
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator
* @param put
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
public boolean checkAndPut(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Put put)
throws IOException;
/**
* Atomically checks if a row/family/qualifier value match the expectedValue.
* If it does, it adds the delete. If passed expected value is null, then the
* check is for non-existance of the row/column.
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator
* @param delete
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
public boolean checkAndDelete(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete)
throws IOException;
} }

View File

@ -74,8 +74,10 @@ import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec; import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCache;
@ -1635,14 +1637,16 @@ public class HRegion implements HeapSize { // , Writable{
* @param row * @param row
* @param family * @param family
* @param qualifier * @param qualifier
* @param expectedValue * @param compareOp
* @param comparator
* @param lockId * @param lockId
* @param writeToWAL * @param writeToWAL
* @throws IOException * @throws IOException
* @return true if the new put was execute, false otherwise * @return true if the new put was execute, false otherwise
*/ */
public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
byte [] expectedValue, Writable w, Integer lockId, boolean writeToWAL) CompareOp compareOp, WritableByteArrayComparable comparator, Writable w,
Integer lockId, boolean writeToWAL)
throws IOException{ throws IOException{
checkReadOnly(); checkReadOnly();
//TODO, add check for value length or maybe even better move this to the //TODO, add check for value length or maybe even better move this to the
@ -1671,12 +1675,32 @@ public class HRegion implements HeapSize { // , Writable{
boolean matches = false; boolean matches = false;
if (result.size() == 0 && if (result.size() == 0 &&
(expectedValue == null || expectedValue.length == 0)) { (comparator.getValue() == null || comparator.getValue().length == 0)) {
matches = true; matches = true;
} else if (result.size() == 1) { } else if (result.size() == 1) {
//Compare the expected value with the actual value int compareResult = comparator.compareTo(result.get(0).getValue());
byte [] actualValue = result.get(0).getValue(); switch (compareOp) {
matches = Bytes.equals(expectedValue, actualValue); case LESS:
matches = compareResult <= 0;
break;
case LESS_OR_EQUAL:
matches = compareResult < 0;
break;
case EQUAL:
matches = compareResult == 0;
break;
case NOT_EQUAL:
matches = compareResult != 0;
break;
case GREATER_OR_EQUAL:
matches = compareResult > 0;
break;
case GREATER:
matches = compareResult >= 0;
break;
default:
throw new RuntimeException("Unknown Compare op " + compareOp.name());
}
} }
//If matches put the new put or delete the new delete //If matches put the new put or delete the new delete
if (matches) { if (matches) {

View File

@ -34,7 +34,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -96,6 +95,9 @@ import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult; import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats; import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
@ -1694,8 +1696,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
} }
private boolean checkAndMutate(final byte[] regionName, final byte[] row, private boolean checkAndMutate(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final byte[] value, final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final Writable w, Integer lock) throws IOException { final WritableByteArrayComparable comparator, final Writable w,
Integer lock) throws IOException {
checkOpen(); checkOpen();
this.requestCount.incrementAndGet(); this.requestCount.incrementAndGet();
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
@ -1703,8 +1706,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
if (!region.getRegionInfo().isMetaTable()) { if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory(); this.cacheFlusher.reclaimMemStoreMemory();
} }
return region return region.checkAndMutate(row, family, qualifier, compareOp,
.checkAndMutate(row, family, qualifier, value, w, lock, true); comparator, w, lock, true);
} catch (Throwable t) { } catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t)); throw convertThrowableToIOE(cleanup(t));
} }
@ -1731,18 +1734,59 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+ "regionName is null"); + "regionName is null");
} }
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
WritableByteArrayComparable comparator = new BinaryComparator(value);
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost() Boolean result = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, value, put); .preCheckAndPut(row, family, qualifier, CompareOp.EQUAL, comparator,
put);
if (result != null) { if (result != null) {
return result.booleanValue(); return result.booleanValue();
} }
} }
boolean result = checkAndMutate(regionName, row, family, qualifier, boolean result = checkAndMutate(regionName, row, family, qualifier,
value, put, getLockFromId(put.getLockId())); CompareOp.EQUAL, new BinaryComparator(value), put,
getLockFromId(put.getLockId()));
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, family, result = region.getCoprocessorHost().postCheckAndPut(row, family,
qualifier, value, put, result); qualifier, CompareOp.EQUAL, comparator, put, result);
}
return result;
}
/**
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator
* @param put
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
public boolean checkAndPut(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Put put)
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to checkAndPut "
+ "regionName is null");
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
if (result != null) {
return result.booleanValue();
}
}
boolean result = checkAndMutate(regionName, row, family, qualifier,
compareOp, comparator, put, getLockFromId(put.getLockId()));
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, family,
qualifier, compareOp, comparator, put, result);
} }
return result; return result;
} }
@ -1769,22 +1813,62 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+ "regionName is null"); + "regionName is null");
} }
HRegion region = getRegion(regionName); HRegion region = getRegion(regionName);
WritableByteArrayComparable comparator = new BinaryComparator(value);
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
family, qualifier, value, delete); family, qualifier, CompareOp.EQUAL, comparator, delete);
if (result != null) { if (result != null) {
return result.booleanValue(); return result.booleanValue();
} }
} }
boolean result = checkAndMutate(regionName, row, family, qualifier, value, boolean result = checkAndMutate(regionName, row, family, qualifier,
delete, getLockFromId(delete.getLockId())); CompareOp.EQUAL, comparator, delete, getLockFromId(delete.getLockId()));
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, family, result = region.getCoprocessorHost().postCheckAndDelete(row, family,
qualifier, value, delete, result); qualifier, CompareOp.EQUAL, comparator, delete, result);
} }
return result; return result;
} }
/**
*
* @param regionName
* @param row
* @param family
* @param qualifier
* @param compareOp
* @param comparator
* @param delete
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
public boolean checkAndDelete(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete)
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to checkAndDelete "
+ "regionName is null");
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
family, qualifier, compareOp, comparator, delete);
if (result != null) {
return result.booleanValue();
}
}
boolean result = checkAndMutate(regionName, row, family, qualifier,
compareOp, comparator, delete, getLockFromId(delete.getLockId()));
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
qualifier, compareOp, comparator, delete, result);
}
return result;
}
// //
// remote scanner interface // remote scanner interface
// //

View File

@ -24,32 +24,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.*; import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -619,14 +609,16 @@ public class RegionCoprocessorHost
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds * @param put data to put if check succeeds
* @return true or false to return to client if default processing should * @return true or false to return to client if default processing should
* be bypassed, or null otherwise * be bypassed, or null otherwise
* @throws IOException e * @throws IOException e
*/ */
public Boolean preCheckAndPut(final byte [] row, final byte [] family, public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Put put) final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, Put put)
throws IOException throws IOException
{ {
try { try {
@ -636,7 +628,7 @@ public class RegionCoprocessorHost
for (RegionEnvironment env: coprocessors) { for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) { if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family, result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family,
qualifier, value, put, result); qualifier, compareOp, comparator, put, result);
bypass |= env.shouldBypass(); bypass |= env.shouldBypass();
if (env.shouldComplete()) { if (env.shouldComplete()) {
break; break;
@ -653,12 +645,14 @@ public class RegionCoprocessorHost
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param put data to put if check succeeds * @param put data to put if check succeeds
* @throws IOException e * @throws IOException e
*/ */
public boolean postCheckAndPut(final byte [] row, final byte [] family, public boolean postCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, final Put put, final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Put put,
boolean result) boolean result)
throws IOException throws IOException
{ {
@ -667,7 +661,7 @@ public class RegionCoprocessorHost
for (RegionEnvironment env: coprocessors) { for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) { if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row, result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row,
family, qualifier, value, put, result); family, qualifier, compareOp, comparator, put, result);
if (env.shouldComplete()) { if (env.shouldComplete()) {
break; break;
} }
@ -683,16 +677,17 @@ public class RegionCoprocessorHost
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds * @param delete delete to commit if check succeeds
* @return true or false to return to client if default processing should * @return true or false to return to client if default processing should
* be bypassed, or null otherwise * be bypassed, or null otherwise
* @throws IOException e * @throws IOException e
*/ */
public Boolean preCheckAndDelete(final byte [] row, final byte [] family, public Boolean preCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, Delete delete) final byte [] qualifier, final CompareOp compareOp,
throws IOException final WritableByteArrayComparable comparator, Delete delete)
{ throws IOException {
try { try {
boolean bypass = false; boolean bypass = false;
boolean result = false; boolean result = false;
@ -700,7 +695,7 @@ public class RegionCoprocessorHost
for (RegionEnvironment env: coprocessors) { for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) { if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row, result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row,
family, qualifier, value, delete, result); family, qualifier, compareOp, comparator, delete, result);
bypass |= env.shouldBypass(); bypass |= env.shouldBypass();
if (env.shouldComplete()) { if (env.shouldComplete()) {
break; break;
@ -717,12 +712,14 @@ public class RegionCoprocessorHost
* @param row row to check * @param row row to check
* @param family column family * @param family column family
* @param qualifier column qualifier * @param qualifier column qualifier
* @param value the expected value * @param compareOp the comparison operation
* @param comparator the comparator
* @param delete delete to commit if check succeeds * @param delete delete to commit if check succeeds
* @throws IOException e * @throws IOException e
*/ */
public boolean postCheckAndDelete(final byte [] row, final byte [] family, public boolean postCheckAndDelete(final byte [] row, final byte [] family,
final byte [] qualifier, final byte [] value, final Delete delete, final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete,
boolean result) boolean result)
throws IOException throws IOException
{ {
@ -730,8 +727,9 @@ public class RegionCoprocessorHost
coprocessorLock.readLock().lock(); coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) { for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) { if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).postCheckAndDelete(env, row, result = ((RegionObserver)env.getInstance())
family, qualifier, value, delete, result); .postCheckAndDelete(env, row, family, qualifier, compareOp,
comparator, delete, result);
if (env.shouldComplete()) { if (env.shouldComplete()) {
break; break;
} }

View File

@ -53,10 +53,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner; import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
@ -461,43 +461,47 @@ public class TestHRegion extends HBaseTestCase {
put.add(fam1, qf1, val1); put.add(fam1, qf1, val1);
//checkAndPut with correct value //checkAndPut with correct value
boolean res = region.checkAndMutate(row1, fam1, qf1, emptyVal, put, lockId, boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
true); new BinaryComparator(emptyVal), put, lockId, true);
assertTrue(res); assertTrue(res);
// not empty anymore // not empty anymore
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, put, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(emptyVal), put, lockId, true);
assertFalse(res); assertFalse(res);
Delete delete = new Delete(row1); Delete delete = new Delete(row1);
delete.deleteColumn(fam1, qf1); delete.deleteColumn(fam1, qf1);
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, delete, lockId, res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
true); new BinaryComparator(emptyVal), delete, lockId, true);
assertFalse(res); assertFalse(res);
put = new Put(row1); put = new Put(row1);
put.add(fam1, qf1, val2); put.add(fam1, qf1, val2);
//checkAndPut with correct value //checkAndPut with correct value
res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val1), put, lockId, true);
assertTrue(res); assertTrue(res);
//checkAndDelete with correct value //checkAndDelete with correct value
delete = new Delete(row1); delete = new Delete(row1);
delete.deleteColumn(fam1, qf1); delete.deleteColumn(fam1, qf1);
delete.deleteColumn(fam1, qf1); delete.deleteColumn(fam1, qf1);
res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val2), delete, lockId, true);
assertTrue(res); assertTrue(res);
delete = new Delete(row1); delete = new Delete(row1);
res = region.checkAndMutate(row1, fam1, qf1, emptyVal, delete, lockId, res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
true); new BinaryComparator(emptyVal), delete, lockId, true);
assertTrue(res); assertTrue(res);
//checkAndPut looking for a null value //checkAndPut looking for a null value
put = new Put(row1); put = new Put(row1);
put.add(fam1, qf1, val1); put.add(fam1, qf1, val1);
res = region.checkAndMutate(row1, fam1, qf1, null, put, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new NullComparator(), put, lockId, true);
assertTrue(res); assertTrue(res);
} }
@ -521,13 +525,15 @@ public class TestHRegion extends HBaseTestCase {
region.put(put); region.put(put);
//checkAndPut with wrong value //checkAndPut with wrong value
boolean res = region.checkAndMutate(row1, fam1, qf1, val2, put, lockId, true); boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val2), put, lockId, true);
assertEquals(false, res); assertEquals(false, res);
//checkAndDelete with wrong value //checkAndDelete with wrong value
Delete delete = new Delete(row1); Delete delete = new Delete(row1);
delete.deleteFamily(fam1); delete.deleteFamily(fam1);
res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val2), delete, lockId, true);
assertEquals(false, res); assertEquals(false, res);
} }
@ -549,13 +555,15 @@ public class TestHRegion extends HBaseTestCase {
region.put(put); region.put(put);
//checkAndPut with correct value //checkAndPut with correct value
boolean res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true); boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val1), put, lockId, true);
assertEquals(true, res); assertEquals(true, res);
//checkAndDelete with correct value //checkAndDelete with correct value
Delete delete = new Delete(row1); Delete delete = new Delete(row1);
delete.deleteColumn(fam1, qf1); delete.deleteColumn(fam1, qf1);
res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true); res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val1), put, lockId, true);
assertEquals(true, res); assertEquals(true, res);
} }
@ -590,7 +598,8 @@ public class TestHRegion extends HBaseTestCase {
Store store = region.getStore(fam1); Store store = region.getStore(fam1);
store.memstore.kvset.size(); store.memstore.kvset.size();
boolean res = region.checkAndMutate(row1, fam1, qf1, val1, put, lockId, true); boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
new BinaryComparator(val1), put, lockId, true);
assertEquals(true, res); assertEquals(true, res);
store.memstore.kvset.size(); store.memstore.kvset.size();
@ -613,8 +622,8 @@ public class TestHRegion extends HBaseTestCase {
Put put = new Put(row2); Put put = new Put(row2);
put.add(fam1, qual1, value1); put.add(fam1, qual1, value1);
try { try {
boolean res = region.checkAndMutate(row, boolean res = region.checkAndMutate(row, fam1, qual1, CompareOp.EQUAL,
fam1, qual1, value2, put, null, false); new BinaryComparator(value2), put, null, false);
fail(); fail();
} catch (DoNotRetryIOException expected) { } catch (DoNotRetryIOException expected) {
// expected exception. // expected exception.
@ -660,8 +669,8 @@ public class TestHRegion extends HBaseTestCase {
delete.deleteColumn(fam1, qf1); delete.deleteColumn(fam1, qf1);
delete.deleteColumn(fam2, qf1); delete.deleteColumn(fam2, qf1);
delete.deleteColumn(fam1, qf3); delete.deleteColumn(fam1, qf3);
boolean res = region.checkAndMutate(row1, fam1, qf1, val2, delete, lockId, boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
true); new BinaryComparator(val2), delete, lockId, true);
assertEquals(true, res); assertEquals(true, res);
Get get = new Get(row1); Get get = new Get(row1);
@ -676,8 +685,8 @@ public class TestHRegion extends HBaseTestCase {
//Family delete //Family delete
delete = new Delete(row1); delete = new Delete(row1);
delete.deleteFamily(fam2); delete.deleteFamily(fam2);
res = region.checkAndMutate(row1, fam2, qf1, emptyVal, delete, lockId, res = region.checkAndMutate(row1, fam2, qf1, CompareOp.EQUAL,
true); new BinaryComparator(emptyVal), delete, lockId, true);
assertEquals(true, res); assertEquals(true, res);
get = new Get(row1); get = new Get(row1);
@ -687,8 +696,8 @@ public class TestHRegion extends HBaseTestCase {
//Row delete //Row delete
delete = new Delete(row1); delete = new Delete(row1);
res = region.checkAndMutate(row1, fam1, qf1, val1, delete, lockId, res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
true); new BinaryComparator(val1), delete, lockId, true);
assertEquals(true, res); assertEquals(true, res);
get = new Get(row1); get = new Get(row1);
r = region.get(get, null); r = region.get(get, null);
@ -2389,7 +2398,7 @@ public class TestHRegion extends HBaseTestCase {
Scan scan = new Scan(); Scan scan = new Scan();
scan.addFamily(family); scan.addFamily(family);
scan.setFilter(new SingleColumnValueFilter(family, qual1, scan.setFilter(new SingleColumnValueFilter(family, qual1,
CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L)))); CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(5L))));
int expectedCount = 0; int expectedCount = 0;
List<KeyValue> res = new ArrayList<KeyValue>(); List<KeyValue> res = new ArrayList<KeyValue>();
@ -2753,10 +2762,9 @@ public class TestHRegion extends HBaseTestCase {
idxScan.addFamily(family); idxScan.addFamily(family);
idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, idxScan.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1, Arrays.<Filter>asList(new SingleColumnValueFilter(family, qual1,
CompareFilter.CompareOp.GREATER_OR_EQUAL, CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes(0L))), new BinaryComparator(Bytes.toBytes(0L))),
new SingleColumnValueFilter(family, qual1, new SingleColumnValueFilter(family, qual1, CompareOp.LESS_OR_EQUAL,
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes(3L))) new BinaryComparator(Bytes.toBytes(3L)))
))); )));
InternalScanner scanner = region.getScanner(idxScan); InternalScanner scanner = region.getScanner(idxScan);