HBASE-11423 Visibility label and per cell ACL feature not working with HTable#mutateRow() and MultiRowMutationEndpoint. (Anoop)

anoopsjohn 2014-07-11 12:15:48 +05:30
parent d289eecd2f
commit 77554df881
6 changed files with 181 additions and 47 deletions
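For context, the client-side scenario this commit fixes is sketched below. It mirrors the testMutateRow case added at the end of this commit; the table name, family/qualifier names and the CONFIDENTIAL/SECRET labels are illustrative and assumed to be set up already. Before the fix, visibility expressions (and per-cell ACLs) attached to mutations submitted through HTable#mutateRow() did not take effect, because that code path never reached the coprocessor batch hooks.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;

public class MutateRowWithVisibility {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumed: table "vis_table" with family "f" exists and both labels are already defined.
    HTable table = new HTable(conf, TableName.valueOf("vis_table"));
    try {
      byte[] row = Bytes.toBytes("row1");
      Put p1 = new Put(row);
      p1.add(Bytes.toBytes("f"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
      p1.setCellVisibility(new CellVisibility("CONFIDENTIAL"));
      Put p2 = new Put(row);
      p2.add(Bytes.toBytes("f"), Bytes.toBytes("q2"), Bytes.toBytes("v2"));
      p2.setCellVisibility(new CellVisibility("SECRET"));

      // Both Puts are applied atomically; on the region server this goes through
      // MultiRowMutationProcessor, the path patched in this commit.
      RowMutations rm = new RowMutations(row);
      rm.add(p1);
      rm.add(p2);
      table.mutateRow(rm);

      // With the fix, a reader only sees the cells its authorizations allow.
      Get get = new Get(row);
      get.setAuthorizations(new Authorizations("CONFIDENTIAL"));
      Result result = table.get(get);
      // result contains f:q1 but not f:q2
    } finally {
      table.close();
    }
  }
}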

BaseRowProcessor.java

@@ -38,7 +38,15 @@ implements RowProcessor<S,T> {
}
@Override
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
}
@Override
public void postBatchMutate(HRegion region) throws IOException {
}
@Override
public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
}
@Override

HRegion.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -4902,7 +4903,7 @@ public class HRegion implements HeapSize { // , Writable{
long now = EnvironmentEdgeManager.currentTimeMillis();
doProcessRowWithTimeout(
processor, now, this, null, null, timeout);
processor.postProcess(this, walEdit);
processor.postProcess(this, walEdit, true);
} catch (IOException e) {
throw e;
} finally {
@@ -4916,7 +4917,7 @@ public class HRegion implements HeapSize { // , Writable{
boolean walSyncSuccessful = false;
List<RowLock> acquiredRowLocks;
long addedSize = 0;
List<KeyValue> mutations = new ArrayList<KeyValue>();
List<Mutation> mutations = new ArrayList<Mutation>();
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0;
@@ -4931,6 +4932,7 @@ public class HRegion implements HeapSize { // , Writable{
// 3. Region lock
lock(this.updatesLock.readLock(), acquiredRowLocks.size());
locked = true;
// Get a mvcc write number
mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId);
long now = EnvironmentEdgeManager.currentTimeMillis();
@@ -4941,23 +4943,28 @@ public class HRegion implements HeapSize { // , Writable{
processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
// 5. Get a mvcc write number
// 5. Start mvcc transaction
writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum);
// 6. Apply to memstore
for (KeyValue kv : mutations) {
kv.setSequenceId(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
// unreachable
// 6. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
for (Mutation m : mutations) {
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
kv.setSequenceId(mvccNum);
Store store = getStore(kv);
if (store == null) {
checkFamily(CellUtil.cloneFamily(kv));
// unreachable
}
Pair<Long, Cell> ret = store.add(kv);
addedSize += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
Pair<Long, Cell> ret = store.add(kv);
addedSize += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
}
long txid = 0;
// 7. Append no sync
// 8. Append no sync
if (!walEdit.isEmpty()) {
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now,
@@ -4971,31 +4978,36 @@ public class HRegion implements HeapSize { // , Writable{
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
}
// 8. Release region lock
// 9. Release region lock
if (locked) {
this.updatesLock.readLock().unlock();
locked = false;
}
// 9. Release row lock(s)
// 10. Release row lock(s)
releaseRowLocks(acquiredRowLocks);
// 10. Sync edit log
// 11. Sync edit log
if (txid != 0) {
syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
}
walSyncSuccessful = true;
// 12. call postBatchMutate hook
processor.postBatchMutate(this);
}
} finally {
if (!mutations.isEmpty() && !walSyncSuccessful) {
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
" memstore keyvalues for row(s):" + StringUtils.byteToHexString(
processor.getRowsToLock().iterator().next()) + "...");
for (KeyValue kv : mutations) {
getStore(kv).rollback(kv);
for (Mutation m : mutations) {
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
getStore(kv).rollback(kv);
}
}
}
// 11. Roll mvcc forward
// 13. Roll mvcc forward
if (writeEntry != null) {
mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
}
@@ -5006,8 +5018,8 @@ public class HRegion implements HeapSize { // , Writable{
releaseRowLocks(acquiredRowLocks);
}
// 12. Run post-process hook
processor.postProcess(this, walEdit);
// 14. Run post-process hook
processor.postProcess(this, walEdit, walSyncSuccessful);
} catch (IOException e) {
throw e;
@@ -5023,7 +5035,7 @@ public class HRegion implements HeapSize { // , Writable{
private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
final long now,
final HRegion region,
final List<KeyValue> mutations,
final List<Mutation> mutations,
final WALEdit walEdit,
final long timeout) throws IOException {
// Short circuit the no time bound case.

MultiRowMutationProcessor.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@ class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcess
MultiRowMutationProcessorResponse> {
Collection<byte[]> rowsToLock;
Collection<Mutation> mutations;
MiniBatchOperationInProgress<Mutation> miniBatch;
MultiRowMutationProcessor(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock) {
@@ -67,11 +69,11 @@ MultiRowMutationProcessorResponse> {
@Override
public void process(long now,
HRegion region,
List<KeyValue> mutationKvs,
List<Mutation> mutationsToApply,
WALEdit walEdit) throws IOException {
byte[] byteNow = Bytes.toBytes(now);
// Check mutations and apply edits to a single WALEdit
for (Mutation m : mutations) {
// Check mutations
for (Mutation m : this.mutations) {
if (m instanceof Put) {
Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
region.checkFamilies(familyMap.keySet());
@@ -82,18 +84,18 @@ MultiRowMutationProcessorResponse> {
region.prepareDelete(d);
region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow);
} else {
throw new DoNotRetryIOException(
"Action must be Put or Delete. But was: "
throw new DoNotRetryIOException("Action must be Put or Delete. But was: "
+ m.getClass().getName());
}
for (List<Cell> cells: m.getFamilyCellMap().values()) {
mutationsToApply.add(m);
}
// Apply edits to a single WALEdit
for (Mutation m : mutations) {
for (List<Cell> cells : m.getFamilyCellMap().values()) {
boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
for (Cell cell : cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
mutationKvs.add(kv);
if (writeToWAL) {
walEdit.add(kv);
}
if (writeToWAL) walEdit.add(kv);
}
}
}
@@ -122,7 +124,46 @@ MultiRowMutationProcessorResponse> {
}
@Override
public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
// TODO we should return the status of this hook run to HRegion so that Mutations whose
// OperationStatus is already SUCCESS or FAILURE do not get applied to the memstore.
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
OperationStatus[] opStatus = new OperationStatus[mutations.size()];
Arrays.fill(opStatus, OperationStatus.NOT_RUN);
WALEdit[] walEditsFromCP = new WALEdit[mutations.size()];
if (coprocessorHost != null) {
miniBatch = new MiniBatchOperationInProgress<Mutation>(
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
mutations.size());
coprocessorHost.preBatchMutate(miniBatch);
}
// Apply edits to a single WALEdit
for (int i = 0; i < mutations.size(); i++) {
if (opStatus[i] == OperationStatus.NOT_RUN) {
// Any other OperationStatusCode means the Mutation already succeeded or failed in the CP hook
// itself, so there is no need to apply it to the region again.
if (walEditsFromCP[i] != null) {
// Add the WALEdit created by CP hook
for (KeyValue walKv : walEditsFromCP[i].getKeyValues()) {
walEdit.add(walKv);
}
}
}
}
}
@Override
public void postBatchMutate(HRegion region) throws IOException {
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
if (coprocessorHost != null) {
assert miniBatch != null;
// Use the same miniBatch state that was used to call preBatchMutate()
coprocessorHost.postBatchMutate(miniBatch);
}
}
@Override
public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
if (coprocessorHost != null) {
for (Mutation m : mutations) {
@@ -132,6 +173,12 @@ MultiRowMutationProcessorResponse> {
coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
}
}
// At the end call the CP hook postBatchMutateIndispensably
if (miniBatch != null) {
// When the Processor does a read-only process, pre/postBatchMutate() are not called and
// miniBatch stays null; in that case this batch-based CP hook is skipped as well.
coprocessorHost.postBatchMutateIndispensably(miniBatch, success);
}
}
}
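The preBatchMutate()/postBatchMutate()/postBatchMutateIndispensably() calls added above are what let RegionObserver-based security coprocessors (VisibilityController, AccessController) see batches coming from mutateRow() and the MultiRowMutationEndpoint. A minimal observer sketch, with an illustrative class name, showing the hooks that now fire for those batches:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

public class AuditingRegionObserver extends BaseRegionObserver {

  @Override
  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> ctx,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // Runs before the batch is applied to the memstore; after this commit it also
    // runs for RowMutations submitted via HTable#mutateRow().
    for (int i = 0; i < miniBatchOp.size(); i++) {
      Mutation m = miniBatchOp.getOperation(i);
      // e.g. inspect the visibility/ACL attributes carried on m
    }
  }

  @Override
  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> ctx,
      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    // Runs after the batch is applied to the memstore and the WAL is synced.
  }

  @Override
  public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> ctx,
      MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
    // Invoked from the processor's postProcess(); success reflects the WAL sync outcome.
  }
}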

RowProcessor.java

@@ -23,8 +23,8 @@ import java.util.List;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.protobuf.Message;
@@ -83,7 +83,7 @@ public interface RowProcessor<S extends Message, T extends Message> {
*/
void process(long now,
HRegion region,
List<KeyValue> mutations,
List<Mutation> mutations,
WALEdit walEdit) throws IOException;
/**
@@ -95,13 +95,31 @@ public interface RowProcessor<S extends Message, T extends Message> {
void preProcess(HRegion region, WALEdit walEdit) throws IOException;
/**
* The hook to be executed after process().
* The hook to be executed after process() but before applying the Mutations to the region. By
* the time this hook is called, the mvcc transaction has already been started.
* @param region
* @param walEdit the output WAL edits to apply to write ahead log
* @throws IOException
*/
void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException;
/**
* The hook to be executed after process() and after the Mutations are applied to the region. The
* difference from {@link #postProcess(HRegion, WALEdit, boolean)} is that this hook is executed
* before the mvcc transaction completes.
* @param region
* @throws IOException
*/
void postBatchMutate(HRegion region) throws IOException;
/**
* The hook to be executed after process() and after the Mutations are applied to the region.
*
* @param region the HRegion
* @param walEdit the output WAL edits to apply to write ahead log
* @param success true if the batch operation was successful, otherwise false.
*/
void postProcess(HRegion region, WALEdit walEdit) throws IOException;
void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException;
/**
* @return The cluster ids that have the change.
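Taken together, the new interface methods give a RowProcessor this hook order inside HRegion#processRowsWithLocks(): preProcess() → process() → preBatchMutate() (mvcc transaction started, nothing applied to the memstore yet) → memstore apply and WAL sync → postBatchMutate() → postProcess(success). A minimal sketch of a processor written against this 0.98-era interface follows; raw protobuf Message stands in for the request/response generics (real processors use generated protos), and the exact set of methods BaseRowProcessor leaves abstract may vary slightly between releases, so treat this as illustrative only:

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import com.google.protobuf.Message;

public class LoggingRowProcessor extends BaseRowProcessor<Message, Message> {
  private final Collection<byte[]> rows;

  public LoggingRowProcessor(Collection<byte[]> rows) {
    this.rows = rows;
  }

  @Override
  public Collection<byte[]> getRowsToLock() {
    return rows;                        // rows HRegion locks before driving the hooks
  }

  @Override
  public boolean readOnly() {
    return false;                       // read-only processors skip the batch hooks entirely
  }

  @Override
  public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit)
      throws IOException {
    // After this commit the processor hands whole Mutations (not KeyValues) back to HRegion.
  }

  @Override
  public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException {
    // Runs after process(): the mvcc transaction is started, nothing is in the memstore yet.
  }

  @Override
  public void postBatchMutate(HRegion region) throws IOException {
    // Runs after the memstore apply and WAL sync, before the mvcc transaction completes.
  }

  @Override
  public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
    // Final hook; success tells whether the WAL sync went through.
  }

  @Override
  public Message getResult() { return null; }       // sketch: no result to return

  @Override
  public Message getRequestData() { return null; }  // sketch: no request payload

  @Override
  public void initialize(Message msg) { }           // sketch: nothing to deserialize
}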

TestRowProcessorEndpoint.java

@@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
@@ -328,7 +330,7 @@ public class TestRowProcessorEndpoint {
@Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
List<Mutation> mutations, WALEdit walEdit) throws IOException {
// Scan current counter
List<Cell> kvs = new ArrayList<Cell>();
Scan scan = new Scan(row, row);
@@ -345,9 +347,11 @@ public class TestRowProcessorEndpoint {
expectedCounter += 1;
Put p = new Put(row);
KeyValue kv =
new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
mutations.add(kv);
p.add(kv);
mutations.add(p);
walEdit.add(kv);
// We can also inject some meta data to the walEdit
@@ -410,7 +414,7 @@ public class TestRowProcessorEndpoint {
@Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
List<Mutation> mutations, WALEdit walEdit) throws IOException {
List<Cell> kvs = new ArrayList<Cell>();
{ // First scan to get friends of the person
Scan scan = new Scan(row, row);
@@ -494,7 +498,7 @@ public class TestRowProcessorEndpoint {
@Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
List<Mutation> mutations, WALEdit walEdit) throws IOException {
// Override the time to avoid a race condition in the unit test caused by an
// inaccurate timer on some machines
@@ -524,15 +528,19 @@ public class TestRowProcessorEndpoint {
for (int i = 0; i < kvs.size(); ++i) {
for (Cell kv : kvs.get(i)) {
// Delete from the current row and add to the other row
Delete d = new Delete(rows[i]);
KeyValue kvDelete =
new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp(), KeyValue.Type.Delete);
d.addDeleteMarker(kvDelete);
Put p = new Put(rows[1 - i]);
KeyValue kvAdd =
new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
now, CellUtil.cloneValue(kv));
mutations.add(kvDelete);
p.add(kvAdd);
mutations.add(d);
walEdit.add(kvDelete);
mutations.add(kvAdd);
mutations.add(p);
walEdit.add(kvAdd);
}
}
@@ -584,7 +592,7 @@ public class TestRowProcessorEndpoint {
@Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
List<Mutation> mutations, WALEdit walEdit) throws IOException {
try {
// Sleep for a long time so it timeout
Thread.sleep(100 * 1000L);

TestVisibilityLabels.java

@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LA
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
@@ -862,6 +864,45 @@ public class TestVisibilityLabels {
}
}
@Test
public void testMutateRow() throws Exception {
final byte[] qual2 = Bytes.toBytes("qual2");
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor col = new HColumnDescriptor(fam);
desc.addFamily(col);
TEST_UTIL.getHBaseAdmin().createTable(desc);
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
try {
Put p1 = new Put(row1);
p1.add(fam, qual, value);
p1.setCellVisibility(new CellVisibility(CONFIDENTIAL));
Put p2 = new Put(row1);
p2.add(fam, qual2, value);
p2.setCellVisibility(new CellVisibility(SECRET));
RowMutations rm = new RowMutations(row1);
rm.add(p1);
rm.add(p2);
table.mutateRow(rm);
Get get = new Get(row1);
get.setAuthorizations(new Authorizations(CONFIDENTIAL));
Result result = table.get(get);
assertTrue(result.containsColumn(fam, qual));
assertFalse(result.containsColumn(fam, qual2));
get.setAuthorizations(new Authorizations(SECRET));
result = table.get(get);
assertFalse(result.containsColumn(fam, qual));
assertTrue(result.containsColumn(fam, qual2));
} finally {
table.close();
}
}
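The new test above covers the HTable#mutateRow() path. The MultiRowMutationEndpoint path named in the commit title goes through the same MultiRowMutationProcessor; a hedged sketch of invoking it from the client is shown below (the table name, family, labels, the endpoint being loaded on the table, and both rows living in one region are assumptions):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiRowMutationWithVisibility {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, TableName.valueOf("vis_table"));
    try {
      Put p1 = new Put(Bytes.toBytes("row1"));
      p1.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v1"));
      p1.setCellVisibility(new CellVisibility("CONFIDENTIAL"));
      Put p2 = new Put(Bytes.toBytes("row2"));
      p2.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v2"));
      p2.setCellVisibility(new CellVisibility("SECRET"));

      MutateRowsRequest.Builder mrm = MutateRowsRequest.newBuilder();
      mrm.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, p1));
      mrm.addMutationRequest(ProtobufUtil.toMutation(MutationType.PUT, p2));

      // The endpoint hands the batch to MultiRowMutationProcessor on the region server,
      // which after this fix also routes it through the coprocessor batch hooks.
      CoprocessorRpcChannel channel = table.coprocessorService(Bytes.toBytes("row1"));
      MultiRowMutationService.BlockingInterface service =
          MultiRowMutationService.newBlockingStub(channel);
      service.mutateRows(null, mrm.build());
    } finally {
      table.close();
    }
  }
}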
private static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
throws Exception {
HTable table = null;