diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java index 38e0c0f49ba..9e607c0ac22 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java @@ -38,7 +38,15 @@ implements RowProcessor { } @Override - public void postProcess(HRegion region, WALEdit walEdit) throws IOException { + public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + } + + @Override + public void postBatchMutate(HRegion region) throws IOException { + } + + @Override + public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 7c940ad562c..6bce6a857dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; @@ -4902,7 +4903,7 @@ public class HRegion implements HeapSize { // , Writable{ long now = EnvironmentEdgeManager.currentTimeMillis(); doProcessRowWithTimeout( processor, now, this, null, null, timeout); - processor.postProcess(this, walEdit); + processor.postProcess(this, walEdit, true); } catch (IOException e) { throw e; } finally { @@ -4916,7 +4917,7 @@ public class HRegion implements HeapSize { // , Writable{ boolean walSyncSuccessful = false; List acquiredRowLocks; long addedSize = 0; - List mutations = new ArrayList(); + List mutations = new ArrayList(); List memstoreCells = new ArrayList(); Collection rowsToLock = processor.getRowsToLock(); long mvccNum = 0; @@ -4931,6 +4932,7 @@ public class HRegion implements HeapSize { // , Writable{ // 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size()); locked = true; + // Get a mvcc write number mvccNum = MultiVersionConsistencyControl.getPreAssignedWriteNumber(this.sequenceId); long now = EnvironmentEdgeManager.currentTimeMillis(); @@ -4941,23 +4943,28 @@ public class HRegion implements HeapSize { // , Writable{ processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - // 5. Get a mvcc write number + // 5. Start mvcc transaction writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - // 6. Apply to memstore - for (KeyValue kv : mutations) { - kv.setSequenceId(mvccNum); - Store store = getStore(kv); - if (store == null) { - checkFamily(CellUtil.cloneFamily(kv)); - // unreachable + // 6. Call the preBatchMutate hook + processor.preBatchMutate(this, walEdit); + // 7. Apply to memstore + for (Mutation m : mutations) { + for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current()); + kv.setSequenceId(mvccNum); + Store store = getStore(kv); + if (store == null) { + checkFamily(CellUtil.cloneFamily(kv)); + // unreachable + } + Pair ret = store.add(kv); + addedSize += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); } - Pair ret = store.add(kv); - addedSize += ret.getFirst(); - memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); } long txid = 0; - // 7. Append no sync + // 8. Append no sync if (!walEdit.isEmpty()) { walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, @@ -4971,31 +4978,36 @@ public class HRegion implements HeapSize { // , Writable{ walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } - // 8. Release region lock + // 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // 9. Release row lock(s) + // 10. Release row lock(s) releaseRowLocks(acquiredRowLocks); - // 10. Sync edit log + // 11. Sync edit log if (txid != 0) { syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); } walSyncSuccessful = true; + // 12. call postBatchMutate hook + processor.postBatchMutate(this); } } finally { if (!mutations.isEmpty() && !walSyncSuccessful) { LOG.warn("Wal sync failed. Roll back " + mutations.size() + " memstore keyvalues for row(s):" + StringUtils.byteToHexString( processor.getRowsToLock().iterator().next()) + "..."); - for (KeyValue kv : mutations) { - getStore(kv).rollback(kv); + for (Mutation m : mutations) { + for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current()); + getStore(kv).rollback(kv); + } } } - // 11. Roll mvcc forward + // 13. Roll mvcc forward if (writeEntry != null) { mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey); } @@ -5006,8 +5018,8 @@ public class HRegion implements HeapSize { // , Writable{ releaseRowLocks(acquiredRowLocks); } - // 12. Run post-process hook - processor.postProcess(this, walEdit); + // 14. Run post-process hook + processor.postProcess(this, walEdit, walSyncSuccessful); } catch (IOException e) { throw e; @@ -5023,7 +5035,7 @@ public class HRegion implements HeapSize { // , Writable{ private void doProcessRowWithTimeout(final RowProcessor processor, final long now, final HRegion region, - final List mutations, + final List mutations, final WALEdit walEdit, final long timeout) throws IOException { // Short circuit the no time bound case. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java index 9cfa3265b63..a3fc21e4537 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ class MultiRowMutationProcessor extends BaseRowProcessor { Collection rowsToLock; Collection mutations; + MiniBatchOperationInProgress miniBatch; MultiRowMutationProcessor(Collection mutations, Collection rowsToLock) { @@ -67,11 +69,11 @@ MultiRowMutationProcessorResponse> { @Override public void process(long now, HRegion region, - List mutationKvs, + List mutationsToApply, WALEdit walEdit) throws IOException { byte[] byteNow = Bytes.toBytes(now); - // Check mutations and apply edits to a single WALEdit - for (Mutation m : mutations) { + // Check mutations + for (Mutation m : this.mutations) { if (m instanceof Put) { Map> familyMap = m.getFamilyCellMap(); region.checkFamilies(familyMap.keySet()); @@ -82,18 +84,18 @@ MultiRowMutationProcessorResponse> { region.prepareDelete(d); region.prepareDeleteTimestamps(d, d.getFamilyCellMap(), byteNow); } else { - throw new DoNotRetryIOException( - "Action must be Put or Delete. But was: " + throw new DoNotRetryIOException("Action must be Put or Delete. But was: " + m.getClass().getName()); } - for (List cells: m.getFamilyCellMap().values()) { + mutationsToApply.add(m); + } + // Apply edits to a single WALEdit + for (Mutation m : mutations) { + for (List cells : m.getFamilyCellMap().values()) { boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL; for (Cell cell : cells) { KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - mutationKvs.add(kv); - if (writeToWAL) { - walEdit.add(kv); - } + if (writeToWAL) walEdit.add(kv); } } } @@ -122,7 +124,46 @@ MultiRowMutationProcessorResponse> { } @Override - public void postProcess(HRegion region, WALEdit walEdit) throws IOException { + public void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException { + // TODO we should return back the status of this hook run to HRegion so that those Mutations + // with OperationStatus as SUCCESS or FAILURE should not get applied to memstore. + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + OperationStatus[] opStatus = new OperationStatus[mutations.size()]; + Arrays.fill(opStatus, OperationStatus.NOT_RUN); + WALEdit[] walEditsFromCP = new WALEdit[mutations.size()]; + if (coprocessorHost != null) { + miniBatch = new MiniBatchOperationInProgress( + mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0, + mutations.size()); + coprocessorHost.preBatchMutate(miniBatch); + } + // Apply edits to a single WALEdit + for (int i = 0; i < mutations.size(); i++) { + if (opStatus[i] == OperationStatus.NOT_RUN) { + // Other OperationStatusCode means that Mutation is already succeeded or failed in CP hook + // itself. No need to apply again to region + if (walEditsFromCP[i] != null) { + // Add the WALEdit created by CP hook + for (KeyValue walKv : walEditsFromCP[i].getKeyValues()) { + walEdit.add(walKv); + } + } + } + } + } + + @Override + public void postBatchMutate(HRegion region) throws IOException { + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + if (coprocessorHost != null) { + assert miniBatch != null; + // Use the same miniBatch state used to call the preBatchMutate() + coprocessorHost.postBatchMutate(miniBatch); + } + } + + @Override + public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException { RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); if (coprocessorHost != null) { for (Mutation m : mutations) { @@ -132,6 +173,12 @@ MultiRowMutationProcessorResponse> { coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability()); } } + // At the end call the CP hook postBatchMutateIndispensably + if (miniBatch != null) { + // Directly calling this hook, with out calling pre/postBatchMutate() when Processor do a + // read only process. Then no need to call this batch based CP hook also. + coprocessorHost.postBatchMutateIndispensably(miniBatch, success); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java index fb1bc3da467..2c4f076336a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -23,8 +23,8 @@ import java.util.List; import java.util.UUID; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import com.google.protobuf.Message; @@ -83,7 +83,7 @@ public interface RowProcessor { */ void process(long now, HRegion region, - List mutations, + List mutations, WALEdit walEdit) throws IOException; /** @@ -95,13 +95,31 @@ public interface RowProcessor { void preProcess(HRegion region, WALEdit walEdit) throws IOException; /** - * The hook to be executed after process(). + * The hook to be executed after the process() but before applying the Mutations to region. Also + * by the time this hook is been called, mvcc transaction is started. + * @param region + * @param walEdit the output WAL edits to apply to write ahead log + * @throws IOException + */ + void preBatchMutate(HRegion region, WALEdit walEdit) throws IOException; + + /** + * The hook to be executed after the process() and applying the Mutations to region. The + * difference of this one with {@link #postProcess(HRegion, WALEdit, boolean)} is this hook will + * be executed before the mvcc transaction completion. + * @param region + * @throws IOException + */ + void postBatchMutate(HRegion region) throws IOException; + + /** + * The hook to be executed after process() and applying the Mutations to region. * * @param region the HRegion * @param walEdit the output WAL edits to apply to write ahead log + * @param success true if batch operation is successful otherwise false. */ - void postProcess(HRegion region, WALEdit walEdit) throws IOException; - + void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException; /** * @return The cluster ids that have the change. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java index 28db1aaf0b7..8a76967ec94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -41,9 +41,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.IsolationLevel; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient; @@ -328,7 +330,7 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit) throws IOException { // Scan current counter List kvs = new ArrayList(); Scan scan = new Scan(row, row); @@ -345,9 +347,11 @@ public class TestRowProcessorEndpoint { expectedCounter += 1; + Put p = new Put(row); KeyValue kv = new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter)); - mutations.add(kv); + p.add(kv); + mutations.add(p); walEdit.add(kv); // We can also inject some meta data to the walEdit @@ -410,7 +414,7 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit) throws IOException { List kvs = new ArrayList(); { // First scan to get friends of the person Scan scan = new Scan(row, row); @@ -494,7 +498,7 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit) throws IOException { // Override the time to avoid race-condition in the unit test caused by // inacurate timer on some machines @@ -524,15 +528,19 @@ public class TestRowProcessorEndpoint { for (int i = 0; i < kvs.size(); ++i) { for (Cell kv : kvs.get(i)) { // Delete from the current row and add to the other row + Delete d = new Delete(rows[i]); KeyValue kvDelete = new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp(), KeyValue.Type.Delete); + d.addDeleteMarker(kvDelete); + Put p = new Put(rows[1 - i]); KeyValue kvAdd = new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), now, CellUtil.cloneValue(kv)); - mutations.add(kvDelete); + p.add(kvAdd); + mutations.add(d); walEdit.add(kvDelete); - mutations.add(kvAdd); + mutations.add(p); walEdit.add(kvAdd); } } @@ -584,7 +592,7 @@ public class TestRowProcessorEndpoint { @Override public void process(long now, HRegion region, - List mutations, WALEdit walEdit) throws IOException { + List mutations, WALEdit walEdit) throws IOException { try { // Sleep for a long time so it timeout Thread.sleep(100 * 1000L); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java index 86f5c9815c7..06f52e767e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LA import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME; import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse; @@ -862,6 +864,45 @@ public class TestVisibilityLabels { } } + @Test + public void testMutateRow() throws Exception { + final byte[] qual2 = Bytes.toBytes("qual2"); + TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor col = new HColumnDescriptor(fam); + desc.addFamily(col); + TEST_UTIL.getHBaseAdmin().createTable(desc); + HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); + try { + Put p1 = new Put(row1); + p1.add(fam, qual, value); + p1.setCellVisibility(new CellVisibility(CONFIDENTIAL)); + + Put p2 = new Put(row1); + p2.add(fam, qual2, value); + p2.setCellVisibility(new CellVisibility(SECRET)); + + RowMutations rm = new RowMutations(row1); + rm.add(p1); + rm.add(p2); + + table.mutateRow(rm); + + Get get = new Get(row1); + get.setAuthorizations(new Authorizations(CONFIDENTIAL)); + Result result = table.get(get); + assertTrue(result.containsColumn(fam, qual)); + assertFalse(result.containsColumn(fam, qual2)); + + get.setAuthorizations(new Authorizations(SECRET)); + result = table.get(get); + assertFalse(result.containsColumn(fam, qual)); + assertTrue(result.containsColumn(fam, qual2)); + } finally { + table.close(); + } + } + private static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps) throws Exception { HTable table = null;