HBASE-26004: port HBASE-26001 (cell level tags invisible in atomic operations when access control is on)to branch-1 (#3387)
Signed-off-by: Reid Chan <reidchan@apache.org>
This commit is contained in:
parent
2e24bad826
commit
5263b8cf40
|
@ -117,7 +117,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.util.SimpleMutableByteRange;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -1947,68 +1946,36 @@ public class AccessController extends BaseMasterAndRegionObserver
|
|||
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
|
||||
// If the HFile version is insufficient to persist tags, we won't have any
|
||||
// work to do here
|
||||
if (!cellFeaturesEnabled) {
|
||||
if (!cellFeaturesEnabled || mutation.getACL() == null) {
|
||||
return newCell;
|
||||
}
|
||||
|
||||
// Collect any ACLs from the old cell
|
||||
// As Increment and Append operations have already copied the tags of oldCell to the newCell,
|
||||
// there is no need to rewrite them again. Just extract non-acl tags of newCell if we need to
|
||||
// add a new acl tag for the cell. Actually, oldCell is useless here.
|
||||
List<Tag> tags = Lists.newArrayList();
|
||||
ListMultimap<String,Permission> perms = ArrayListMultimap.create();
|
||||
if (oldCell != null) {
|
||||
if (newCell != null) {
|
||||
// Save an object allocation where we can
|
||||
if (oldCell.getTagsLength() > 0) {
|
||||
Iterator<Tag> tagIterator = CellUtil.tagsIterator(oldCell.getTagsArray(),
|
||||
oldCell.getTagsOffset(), oldCell.getTagsLength());
|
||||
if (newCell.getTagsLength() > 0) {
|
||||
Iterator<Tag> tagIterator = CellUtil
|
||||
.tagsIterator(newCell.getTagsArray(), newCell.getTagsOffset(), newCell.getTagsLength());
|
||||
while (tagIterator.hasNext()) {
|
||||
Tag tag = tagIterator.next();
|
||||
if (tag.getType() != AccessControlLists.ACL_TAG_TYPE) {
|
||||
// Not an ACL tag, just carry it through
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Carrying forward tag from " + oldCell + ": type " + tag.getType() +
|
||||
LOG.trace("Carrying forward tag from " + newCell + ": type " + tag.getType() +
|
||||
" length " + tag.getTagLength());
|
||||
}
|
||||
tags.add(tag);
|
||||
} else {
|
||||
// Merge the perms from the older ACL into the current permission set
|
||||
// TODO: The efficiency of this can be improved. Don't build just to unpack
|
||||
// again, use the builder
|
||||
AccessControlProtos.UsersAndPermissions.Builder builder =
|
||||
AccessControlProtos.UsersAndPermissions.newBuilder();
|
||||
ProtobufUtil.mergeFrom(builder, tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
|
||||
ListMultimap<String,Permission> kvPerms =
|
||||
ProtobufUtil.toUsersAndPermissions(builder.build());
|
||||
perms.putAll(kvPerms);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Do we have an ACL on the operation?
|
||||
byte[] aclBytes = mutation.getACL();
|
||||
if (aclBytes != null) {
|
||||
// Yes, use it
|
||||
tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, aclBytes));
|
||||
} else {
|
||||
// No, use what we carried forward
|
||||
if (perms != null) {
|
||||
// TODO: If we collected ACLs from more than one tag we may have a
|
||||
// List<Permission> of size > 1, this can be collapsed into a single
|
||||
// Permission
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Carrying forward ACLs from " + oldCell + ": " + perms);
|
||||
}
|
||||
tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE,
|
||||
ProtobufUtil.toUsersAndPermissions(perms).toByteArray()));
|
||||
}
|
||||
}
|
||||
|
||||
// If we have no tags to add, just return
|
||||
if (tags.isEmpty()) {
|
||||
return newCell;
|
||||
}
|
||||
|
||||
Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
|
||||
return rewriteCell;
|
||||
// We have checked mutation ACL not null.
|
||||
tags.add(new Tag(AccessControlLists.ACL_TAG_TYPE, mutation.getACL()));
|
||||
return new TagRewriteCell(newCell, Tag.fromList(tags));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,210 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TestFromClientSide;
|
||||
import org.apache.hadoop.hbase.security.access.AccessController;
|
||||
import org.apache.hadoop.hbase.security.access.Permission;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
/**
|
||||
* Test coprocessor methods
|
||||
* {@link RegionObserver#postMutationBeforeWAL(ObserverContext, RegionObserver.MutationType,
|
||||
* Mutation, Cell, Cell)}.
|
||||
* This method will change the cells which will be applied to
|
||||
* memstore and WAL. So add unit test for the case which change the cell's tags .
|
||||
*/
|
||||
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||
public class TestPostMutationBeforeWAL {
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TestFromClientSide.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static Connection connection;
|
||||
|
||||
private static final byte [] ROW = Bytes.toBytes("row");
|
||||
private static final String CF1 = "cf1";
|
||||
private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
|
||||
private static final String CF2 = "cf2";
|
||||
private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
|
||||
private static final byte[] CQ1 = Bytes.toBytes("cq1");
|
||||
private static final byte[] CQ2 = Bytes.toBytes("cq2");
|
||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||
private static final byte[] VALUE2 = Bytes.toBytes("valuevalue");
|
||||
private static final String USER = "User";
|
||||
private static final Permission PERMS = new Permission(Permission.Action.READ);
|
||||
|
||||
@BeforeClass
|
||||
public static void setupBeforeClass() throws Exception {
|
||||
UTIL.startMiniCluster();
|
||||
connection = UTIL.getConnection();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
connection.close();
|
||||
UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private void createTableWithCoprocessor(TableName tableName, String coprocessor)
|
||||
throws IOException {
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
|
||||
tableDesc.addFamily(new HColumnDescriptor(CF1_BYTES));
|
||||
tableDesc.addFamily(new HColumnDescriptor(CF2_BYTES));
|
||||
tableDesc.addCoprocessor(coprocessor);
|
||||
connection.getAdmin().createTable(tableDesc);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementTTLWithACLTag() throws Exception {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
// Increment without TTL
|
||||
Increment firstIncrement = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1)
|
||||
.setACL(USER, PERMS);
|
||||
Result result = table.increment(firstIncrement);
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
|
||||
|
||||
// Check if the new cell can be read
|
||||
Get get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
|
||||
result = table.get(get);
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
|
||||
|
||||
// Increment with TTL
|
||||
Increment secondIncrement = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1).setTTL(1000)
|
||||
.setACL(USER, PERMS);
|
||||
result = table.increment(secondIncrement);
|
||||
|
||||
// We should get value 2 here
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(2, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
|
||||
|
||||
// Wait 4s to let the second increment expire
|
||||
Thread.sleep(4000);
|
||||
get = new Get(ROW).addColumn(CF1_BYTES, CQ1);
|
||||
result = table.get(get);
|
||||
|
||||
// The value should revert to 1
|
||||
assertEquals(1, result.size());
|
||||
assertEquals(1, Bytes.toLong(result.getValue(CF1_BYTES, CQ1)));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendTTLWithACLTag() throws Exception {
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
createTableWithCoprocessor(tableName, ChangeCellWithACLTagObserver.class.getName());
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
// Append without TTL
|
||||
Append firstAppend = new Append(ROW).add(CF1_BYTES, CQ2, VALUE).setACL(USER, PERMS);
|
||||
Result result = table.append(firstAppend);
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
|
||||
|
||||
// Check if the new cell can be read
|
||||
Get get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
|
||||
result = table.get(get);
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
|
||||
|
||||
// Append with TTL
|
||||
Append secondAppend = new Append(ROW).add(CF1_BYTES, CQ2, VALUE).setTTL(1000)
|
||||
.setACL(USER, PERMS);
|
||||
result = table.append(secondAppend);
|
||||
|
||||
// We should get "valuevalue""
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(Bytes.equals(VALUE2, result.getValue(CF1_BYTES, CQ2)));
|
||||
|
||||
// Wait 4s to let the second append expire
|
||||
Thread.sleep(4000);
|
||||
get = new Get(ROW).addColumn(CF1_BYTES, CQ2);
|
||||
result = table.get(get);
|
||||
|
||||
// The value should revert to "value"
|
||||
assertEquals(1, result.size());
|
||||
assertTrue(Bytes.equals(VALUE, result.getValue(CF1_BYTES, CQ2)));
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean checkAclTag(byte[] acl, Cell cell) {
|
||||
Iterator<Tag> iter = CellUtil.tagsIterator(cell.getTagsArray(),
|
||||
cell.getTagsOffset(), cell.getTagsLength());
|
||||
while (iter.hasNext()) {
|
||||
Tag tag = iter.next();
|
||||
if (tag.getType() == TagType.ACL_TAG_TYPE) {
|
||||
return Bytes.equals(acl, tag.getValue());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class ChangeCellWithACLTagObserver extends AccessController {
|
||||
|
||||
@Override
|
||||
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
MutationType mutationType, Mutation mutation, Cell oldCell, Cell newCell)
|
||||
throws IOException {
|
||||
Cell result = super.postMutationBeforeWAL(ctx, mutationType, mutation, oldCell, newCell);
|
||||
if (mutation.getACL() != null && !checkAclTag(mutation.getACL(), result)) {
|
||||
throw new DoNotRetryIOException("Unmatched ACL tag.");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue