HBASE-11423 Visibility label and per cell ACL feature not working with HTable#mutateRow() and MultiRowMutationEndpoint. (Anoop)
This commit is contained in:
parent
3d37252a32
commit
3cdbe2ae56
|
@ -38,7 +38,15 @@ implements RowProcessor<S,T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchMutate(HRegion region) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
||||||
|
import org.apache.hadoop.hbase.CellScanner;
|
||||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
@ -4902,7 +4903,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
doProcessRowWithTimeout(
|
doProcessRowWithTimeout(
|
||||||
processor, now, this, null, null, timeout);
|
processor, now, this, null, null, timeout);
|
||||||
processor.postProcess(this, walEdit);
|
processor.postProcess(this, walEdit, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -4916,7 +4917,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
boolean walSyncSuccessful = false;
|
boolean walSyncSuccessful = false;
|
||||||
List<RowLock> acquiredRowLocks;
|
List<RowLock> acquiredRowLocks;
|
||||||
long addedSize = 0;
|
long addedSize = 0;
|
||||||
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
List<Mutation> mutations = new ArrayList<Mutation>();
|
||||||
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
||||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||||
long mvccNum = 0;
|
long mvccNum = 0;
|
||||||
|
@ -4931,6 +4932,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
// 3. Region lock
|
// 3. Region lock
|
||||||
lock(this.updatesLock.readLock(), acquiredRowLocks.size());
|
lock(this.updatesLock.readLock(), acquiredRowLocks.size());
|
||||||
locked = true;
|
locked = true;
|
||||||
|
// Get a mvcc write number
|
||||||
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
|
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
@ -4941,10 +4943,14 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
processor, now, this, mutations, walEdit, timeout);
|
processor, now, this, mutations, walEdit, timeout);
|
||||||
|
|
||||||
if (!mutations.isEmpty()) {
|
if (!mutations.isEmpty()) {
|
||||||
// 5. Get a mvcc write number
|
// 5. Start mvcc transaction
|
||||||
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
|
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
|
||||||
// 6. Apply to memstore
|
// 6. Call the preBatchMutate hook
|
||||||
for (KeyValue kv : mutations) {
|
processor.preBatchMutate(this, walEdit);
|
||||||
|
// 7. Apply to memstore
|
||||||
|
for (Mutation m : mutations) {
|
||||||
|
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
||||||
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
|
||||||
kv.setSequenceId(mvccNum);
|
kv.setSequenceId(mvccNum);
|
||||||
Store store = getStore(kv);
|
Store store = getStore(kv);
|
||||||
if (store == null) {
|
if (store == null) {
|
||||||
|
@ -4955,9 +4961,10 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
addedSize += ret.getFirst();
|
addedSize += ret.getFirst();
|
||||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long txid = 0;
|
long txid = 0;
|
||||||
// 7. Append no sync
|
// 8. Append no sync
|
||||||
if (!walEdit.isEmpty()) {
|
if (!walEdit.isEmpty()) {
|
||||||
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
||||||
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
|
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
|
||||||
|
@ -4971,31 +4978,36 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
|
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8. Release region lock
|
// 9. Release region lock
|
||||||
if (locked) {
|
if (locked) {
|
||||||
this.updatesLock.readLock().unlock();
|
this.updatesLock.readLock().unlock();
|
||||||
locked = false;
|
locked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 9. Release row lock(s)
|
// 10. Release row lock(s)
|
||||||
releaseRowLocks(acquiredRowLocks);
|
releaseRowLocks(acquiredRowLocks);
|
||||||
|
|
||||||
// 10. Sync edit log
|
// 11. Sync edit log
|
||||||
if (txid != 0) {
|
if (txid != 0) {
|
||||||
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
|
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
|
||||||
}
|
}
|
||||||
walSyncSuccessful = true;
|
walSyncSuccessful = true;
|
||||||
|
// 12. call postBatchMutate hook
|
||||||
|
processor.postBatchMutate(this);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
||||||
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
||||||
" memstore keyvalues for row(s):" + StringUtils.byteToHexString(
|
" memstore keyvalues for row(s):" + StringUtils.byteToHexString(
|
||||||
processor.getRowsToLock().iterator().next()) + "...");
|
processor.getRowsToLock().iterator().next()) + "...");
|
||||||
for (KeyValue kv : mutations) {
|
for (Mutation m : mutations) {
|
||||||
|
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
||||||
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
|
||||||
getStore(kv).rollback(kv);
|
getStore(kv).rollback(kv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 11. Roll mvcc forward
|
}
|
||||||
|
// 13. Roll mvcc forward
|
||||||
if (writeEntry != null) {
|
if (writeEntry != null) {
|
||||||
mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
|
mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
|
||||||
}
|
}
|
||||||
|
@ -5006,8 +5018,8 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
releaseRowLocks(acquiredRowLocks);
|
releaseRowLocks(acquiredRowLocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 12. Run post-process hook
|
// 14. Run post-process hook
|
||||||
processor.postProcess(this, walEdit);
|
processor.postProcess(this, walEdit, walSyncSuccessful);
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -5023,7 +5035,7 @@ public class HRegion implements HeapSize { // , Writable{
|
||||||
private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
|
private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
|
||||||
final long now,
|
final long now,
|
||||||
final HRegion region,
|
final HRegion region,
|
||||||
final List<KeyValue> mutations,
|
final List<Mutation> mutations,
|
||||||
final WALEdit walEdit,
|
final WALEdit walEdit,
|
||||||
final long timeout) throws IOException {
|
final long timeout) throws IOException {
|
||||||
// Short circuit the no time bound case.
|
// Short circuit the no time bound case.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -42,6 +43,7 @@ class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcess
|
||||||
MultiRowMutationProcessorResponse> {
|
MultiRowMutationProcessorResponse> {
|
||||||
Collection<byte[]> rowsToLock;
|
Collection<byte[]> rowsToLock;
|
||||||
Collection<Mutation> mutations;
|
Collection<Mutation> mutations;
|
||||||
|
MiniBatchOperationInProgress<Mutation> miniBatch;
|
||||||
|
|
||||||
MultiRowMutationProcessor(Collection<Mutation> mutations,
|
MultiRowMutationProcessor(Collection<Mutation> mutations,
|
||||||
Collection<byte[]> rowsToLock) {
|
Collection<byte[]> rowsToLock) {
|
||||||
|
@ -67,11 +69,11 @@ MultiRowMutationProcessorResponse> {
|
||||||
@Override
|
@Override
|
||||||
public void process(long now,
|
public void process(long now,
|
||||||
HRegion region,
|
HRegion region,
|
||||||
List<KeyValue> mutationKvs,
|
List<Mutation> mutationsToApply,
|
||||||
WALEdit walEdit) throws IOException {
|
WALEdit walEdit) throws IOException {
|
||||||
byte[] byteNow = Bytes.toBytes(now);
|
byte[] byteNow = Bytes.toBytes(now);
|
||||||
// Check mutations and apply edits to a single WALEdit
|
// Check mutations
|
||||||
for (Mutation m : mutations) {
|
for (Mutation m : this.mutations) {
|
||||||
if (m instanceof Put) {
|
if (m instanceof Put) {
|
||||||
Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
|
Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
|
||||||
region.checkFamilies(familyMap.keySet());
|
region.checkFamilies(familyMap.keySet());
|
||||||
|
@ -82,18 +84,18 @@ MultiRowMutationProcessorResponse> {
|
||||||
region.prepareDelete(d);
|
region.prepareDelete(d);
|
||||||
region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
|
region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
|
||||||
} else {
|
} else {
|
||||||
throw new DoNotRetryIOException(
|
throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
|
||||||
"Action must be Put or Delete. But was: "
|
|
||||||
+ m.getClass().getName());
|
+ m.getClass().getName());
|
||||||
}
|
}
|
||||||
for (List<Cell> cells: m.getFamilyCellMap().values()) {
|
mutationsToApply.add(m);
|
||||||
|
}
|
||||||
|
// Apply edits to a single WALEdit
|
||||||
|
for (Mutation m : mutations) {
|
||||||
|
for (List<Cell> cells : m.getFamilyCellMap().values()) {
|
||||||
boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
|
boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
|
||||||
for (Cell cell : cells) {
|
for (Cell cell : cells) {
|
||||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||||
mutationKvs.add(kv);
|
if (writeToWAL) walEdit.add(kv);
|
||||||
if (writeToWAL) {
|
|
||||||
walEdit.add(kv);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -122,7 +124,46 @@ MultiRowMutationProcessorResponse> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
|
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
|
||||||
|
// TODO we should return back the status of this hook run to HRegion so that those Mutations
|
||||||
|
// with OperationStatus as SUCCESS or FAILURE should not get applied to memstore.
|
||||||
|
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||||
|
OperationStatus[] opStatus = new OperationStatus[mutations.size()];
|
||||||
|
Arrays.fill(opStatus, OperationStatus.NOT_RUN);
|
||||||
|
WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
miniBatch = new MiniBatchOperationInProgress<Mutation>(
|
||||||
|
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
|
||||||
|
mutations.size());
|
||||||
|
coprocessorHost.preBatchMutate(miniBatch);
|
||||||
|
}
|
||||||
|
// Apply edits to a single WALEdit
|
||||||
|
for (int i = 0; i < mutations.size(); i++) {
|
||||||
|
if (opStatus[i] == OperationStatus.NOT_RUN) {
|
||||||
|
// Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook
|
||||||
|
// itself. No need to apply again to region
|
||||||
|
if (walEditsFromCP[i] != null) {
|
||||||
|
// Add the WALEdit created by CP hook
|
||||||
|
for (KeyValue walKv : walEditsFromCP[i].getKeyValues()) {
|
||||||
|
walEdit.add(walKv);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postBatchMutate(HRegion region) throws IOException {
|
||||||
|
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||||
|
if (coprocessorHost != null) {
|
||||||
|
assert miniBatch != null;
|
||||||
|
// Use the same miniBatch state used to call the preBatchMutate()
|
||||||
|
coprocessorHost.postBatchMutate(miniBatch);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
|
||||||
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
|
||||||
if (coprocessorHost != null) {
|
if (coprocessorHost != null) {
|
||||||
for (Mutation m : mutations) {
|
for (Mutation m : mutations) {
|
||||||
|
@ -132,6 +173,12 @@ MultiRowMutationProcessorResponse> {
|
||||||
coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
|
coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// At the end call the CP hook postBatchMutateIndispensably
|
||||||
|
if (miniBatch != null) {
|
||||||
|
// Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a
|
||||||
|
// read only process. Then no need to call this batch based CP hook also.
|
||||||
|
coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,8 +23,8 @@ import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
|
||||||
import com.google.protobuf.Message;
|
import com.google.protobuf.Message;
|
||||||
|
@ -83,7 +83,7 @@ public interface RowProcessor<S extends Message, T extends Message> {
|
||||||
*/
|
*/
|
||||||
void process(long now,
|
void process(long now,
|
||||||
HRegion region,
|
HRegion region,
|
||||||
List<KeyValue> mutations,
|
List<Mutation> mutations,
|
||||||
WALEdit walEdit) throws IOException;
|
WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -95,13 +95,31 @@ public interface RowProcessor<S extends Message, T extends Message> {
|
||||||
void preProcess(HRegion region, WALEdit walEdit) throws IOException;
|
void preProcess(HRegion region, WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The hook to be executed after process().
|
* The hook to be executed after the process() but before applying the Mutations to region. Also
|
||||||
|
* by the time this hook is been called, mvcc transaction is started.
|
||||||
|
* @param region
|
||||||
|
* @param walEdit the output WAL edits to apply to write ahead log
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The hook to be executed after the process() and applying the Mutations to region. The
|
||||||
|
* difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will
|
||||||
|
* be executed before the mvcc transaction completion.
|
||||||
|
* @param region
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void postBatchMutate(HRegion region) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The hook to be executed after process() and applying the Mutations to region.
|
||||||
*
|
*
|
||||||
* @param region the HRegion
|
* @param region the HRegion
|
||||||
* @param walEdit the output WAL edits to apply to write ahead log
|
* @param walEdit the output WAL edits to apply to write ahead log
|
||||||
|
* @param success true if batch operation is successful otherwise false.
|
||||||
*/
|
*/
|
||||||
void postProcess(HRegion region, WALEdit walEdit) throws IOException;
|
void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The cluster ids that have the change.
|
* @return The cluster ids that have the change.
|
||||||
|
|
|
@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.MediumTests;
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.IsolationLevel;
|
import org.apache.hadoop.hbase.client.IsolationLevel;
|
||||||
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
|
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
|
||||||
|
@ -328,7 +330,7 @@ public class TestRowProcessorEndpoint {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(long now, HRegion region,
|
public void process(long now, HRegion region,
|
||||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
List<Mutation> mutations, WALEdit walEdit) throws IOException {
|
||||||
// Scan current counter
|
// Scan current counter
|
||||||
List<Cell> kvs = new ArrayList<Cell>();
|
List<Cell> kvs = new ArrayList<Cell>();
|
||||||
Scan scan = new Scan(row, row);
|
Scan scan = new Scan(row, row);
|
||||||
|
@ -345,9 +347,11 @@ public class TestRowProcessorEndpoint {
|
||||||
expectedCounter += 1;
|
expectedCounter += 1;
|
||||||
|
|
||||||
|
|
||||||
|
Put p = new Put(row);
|
||||||
KeyValue kv =
|
KeyValue kv =
|
||||||
new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
|
||||||
mutations.add(kv);
|
p.add(kv);
|
||||||
|
mutations.add(p);
|
||||||
walEdit.add(kv);
|
walEdit.add(kv);
|
||||||
|
|
||||||
// We can also inject some meta data to the walEdit
|
// We can also inject some meta data to the walEdit
|
||||||
|
@ -410,7 +414,7 @@ public class TestRowProcessorEndpoint {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(long now, HRegion region,
|
public void process(long now, HRegion region,
|
||||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
List<Mutation> mutations, WALEdit walEdit) throws IOException {
|
||||||
List<Cell> kvs = new ArrayList<Cell>();
|
List<Cell> kvs = new ArrayList<Cell>();
|
||||||
{ // First scan to get friends of the person
|
{ // First scan to get friends of the person
|
||||||
Scan scan = new Scan(row, row);
|
Scan scan = new Scan(row, row);
|
||||||
|
@ -494,7 +498,7 @@ public class TestRowProcessorEndpoint {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(long now, HRegion region,
|
public void process(long now, HRegion region,
|
||||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
List<Mutation> mutations, WALEdit walEdit) throws IOException {
|
||||||
|
|
||||||
// Override the time to avoid race-condition in the unit test caused by
|
// Override the time to avoid race-condition in the unit test caused by
|
||||||
// inacurate timer on some machines
|
// inacurate timer on some machines
|
||||||
|
@ -524,15 +528,19 @@ public class TestRowProcessorEndpoint {
|
||||||
for (int i = 0; i < kvs.size(); ++i) {
|
for (int i = 0; i < kvs.size(); ++i) {
|
||||||
for (Cell kv : kvs.get(i)) {
|
for (Cell kv : kvs.get(i)) {
|
||||||
// Delete from the current row and add to the other row
|
// Delete from the current row and add to the other row
|
||||||
|
Delete d = new Delete(rows[i]);
|
||||||
KeyValue kvDelete =
|
KeyValue kvDelete =
|
||||||
new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
|
new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
|
||||||
kv.getTimestamp(), KeyValue.Type.Delete);
|
kv.getTimestamp(), KeyValue.Type.Delete);
|
||||||
|
d.addDeleteMarker(kvDelete);
|
||||||
|
Put p = new Put(rows[1 - i]);
|
||||||
KeyValue kvAdd =
|
KeyValue kvAdd =
|
||||||
new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
|
new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
|
||||||
now, CellUtil.cloneValue(kv));
|
now, CellUtil.cloneValue(kv));
|
||||||
mutations.add(kvDelete);
|
p.add(kvAdd);
|
||||||
|
mutations.add(d);
|
||||||
walEdit.add(kvDelete);
|
walEdit.add(kvDelete);
|
||||||
mutations.add(kvAdd);
|
mutations.add(p);
|
||||||
walEdit.add(kvAdd);
|
walEdit.add(kvAdd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -584,7 +592,7 @@ public class TestRowProcessorEndpoint {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(long now, HRegion region,
|
public void process(long now, HRegion region,
|
||||||
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
|
List<Mutation> mutations, WALEdit walEdit) throws IOException {
|
||||||
try {
|
try {
|
||||||
// Sleep for a long time so it timeout
|
// Sleep for a long time so it timeout
|
||||||
Thread.sleep(100 * 1000L);
|
Thread.sleep(100 * 1000L);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LA
|
||||||
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
|
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
|
||||||
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
|
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.Increment;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
|
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
|
||||||
|
@ -862,6 +864,45 @@ public class TestVisibilityLabels {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMutateRow() throws Exception {
|
||||||
|
final byte[] qual2 = Bytes.toBytes("qual2");
|
||||||
|
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||||
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
|
HColumnDescriptor col = new HColumnDescriptor(fam);
|
||||||
|
desc.addFamily(col);
|
||||||
|
TEST_UTIL.getHBaseAdmin().createTable(desc);
|
||||||
|
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
|
||||||
|
try {
|
||||||
|
Put p1 = new Put(row1);
|
||||||
|
p1.add(fam, qual, value);
|
||||||
|
p1.setCellVisibility(new CellVisibility(CONFIDENTIAL));
|
||||||
|
|
||||||
|
Put p2 = new Put(row1);
|
||||||
|
p2.add(fam, qual2, value);
|
||||||
|
p2.setCellVisibility(new CellVisibility(SECRET));
|
||||||
|
|
||||||
|
RowMutations rm = new RowMutations(row1);
|
||||||
|
rm.add(p1);
|
||||||
|
rm.add(p2);
|
||||||
|
|
||||||
|
table.mutateRow(rm);
|
||||||
|
|
||||||
|
Get get = new Get(row1);
|
||||||
|
get.setAuthorizations(new Authorizations(CONFIDENTIAL));
|
||||||
|
Result result = table.get(get);
|
||||||
|
assertTrue(result.containsColumn(fam, qual));
|
||||||
|
assertFalse(result.containsColumn(fam, qual2));
|
||||||
|
|
||||||
|
get.setAuthorizations(new Authorizations(SECRET));
|
||||||
|
result = table.get(get);
|
||||||
|
assertFalse(result.containsColumn(fam, qual));
|
||||||
|
assertTrue(result.containsColumn(fam, qual2));
|
||||||
|
} finally {
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
|
private static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
HTable table = null;
|
HTable table = null;
|
||||||
|
|
Loading…
Reference in New Issue