HBASE-10823 Resolve LATEST_TIMESTAMP to current server time before scanning for ACLs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1587648 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2014-04-15 17:48:30 +00:00
parent 1a89a3fede
commit 6af961c0d3
4 changed files with 192 additions and 4 deletions

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.Permission.Action; import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SimpleByteRange; import org.apache.hadoop.hbase.util.SimpleByteRange;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -476,7 +477,7 @@ public class AccessController extends BaseRegionObserver
} }
private void requireCoveringPermission(String request, RegionCoprocessorEnvironment e, private void requireCoveringPermission(String request, RegionCoprocessorEnvironment e,
byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long timestamp, byte[] row, Map<byte[], ? extends Collection<?>> familyMap, long opTs,
boolean allVersions, Action...actions) throws IOException { boolean allVersions, Action...actions) throws IOException {
User user = getActiveUser(); User user = getActiveUser();
@ -511,9 +512,10 @@ public class AccessController extends BaseRegionObserver
// Table or CF permissions do not allow, enumerate the covered KVs. We // Table or CF permissions do not allow, enumerate the covered KVs. We
// can stop at the first which does not grant access. // can stop at the first which does not grant access.
int cellsChecked = 0; int cellsChecked = 0;
opTs = opTs != HConstants.LATEST_TIMESTAMP ? opTs : 0;
long latestCellTs = 0;
if (canPersistCellACLs) { if (canPersistCellACLs) {
Get get = new Get(row); Get get = new Get(row);
if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp);
if (allVersions) { if (allVersions) {
get.setMaxVersions(); get.setMaxVersions();
} else { } else {
@ -546,6 +548,9 @@ public class AccessController extends BaseRegionObserver
} else { } else {
get.addColumn(col, CellUtil.cloneQualifier(cell)); get.addColumn(col, CellUtil.cloneQualifier(cell));
} }
if (cell.getTimestamp() != HConstants.LATEST_TIMESTAMP) {
latestCellTs = Math.max(latestCellTs, cell.getTimestamp());
}
} }
} }
} else { } else {
@ -553,6 +558,17 @@ public class AccessController extends BaseRegionObserver
entry.getValue().getClass().getName()); entry.getValue().getClass().getName());
} }
} }
// We want to avoid looking into the future. So, if the cells of the
// operation specify a timestamp, or the operation itself specifies a
// timestamp, then we use the maximum ts found. Otherwise, we bound
// the Get to the current server time. We add 1 to the timerange since
// the upper bound of a timerange is exclusive yet we need to examine
// any cells found there inclusively.
long latestTs = Math.max(opTs, latestCellTs);
if (latestTs == 0) {
latestTs = EnvironmentEdgeManager.currentTimeMillis();
}
get.setTimeRange(0, latestTs + 1);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Scanning for cells with " + get); LOG.trace("Scanning for cells with " + get);
} }

View File

@ -352,7 +352,7 @@ public class TableAuthManager {
try { try {
List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell); List<Permission> perms = AccessControlLists.getCellPermissionsForUser(user, cell);
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Found perms for user " + user.getShortName() + " in cell " + LOG.trace("Perms for user " + user.getShortName() + " in cell " +
cell + ": " + perms); cell + ": " + perms);
} }
for (Permission p: perms) { for (Permission p: perms) {

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.TestTableName; import org.apache.hadoop.hbase.util.TestTableName;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -71,12 +72,16 @@ public class TestCellACLWithMultipleVersions extends SecureTestUtil {
private static final byte[] TEST_FAMILY = Bytes.toBytes("f1"); private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
private static final byte[] TEST_ROW = Bytes.toBytes("cellpermtest"); private static final byte[] TEST_ROW = Bytes.toBytes("cellpermtest");
private static final byte[] TEST_Q1 = Bytes.toBytes("q1"); private static final byte[] TEST_Q1 = Bytes.toBytes("q1");
private static final byte[] TEST_Q2 = Bytes.toBytes("q2");
private static final byte[] ZERO = Bytes.toBytes(0L); private static final byte[] ZERO = Bytes.toBytes(0L);
private static final byte[] ONE = Bytes.toBytes(1L);
private static final byte[] TWO = Bytes.toBytes(2L);
private static Configuration conf; private static Configuration conf;
private static User USER_OWNER; private static User USER_OWNER;
private static User USER_OTHER; private static User USER_OTHER;
private static User USER_OTHER2;
@BeforeClass @BeforeClass
public static void setupBeforeClass() throws Exception { public static void setupBeforeClass() throws Exception {
@ -112,6 +117,7 @@ public class TestCellACLWithMultipleVersions extends SecureTestUtil {
// create a set of test users // create a set of test users
USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]); USER_OWNER = User.createUserForTesting(conf, "owner", new String[0]);
USER_OTHER = User.createUserForTesting(conf, "other", new String[0]); USER_OTHER = User.createUserForTesting(conf, "other", new String[0]);
USER_OTHER2 = User.createUserForTesting(conf, "other2", new String[0]);
} }
@AfterClass @AfterClass
@ -349,6 +355,173 @@ public class TestCellACLWithMultipleVersions extends SecureTestUtil {
}); });
} }
@Test
public void testDeleteWithFutureTimestamp() throws Exception {
// Store two values, one in the future
verifyAllowed(new AccessTestAction() {
@Override
public Object run() throws Exception {
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
// Store read only ACL at a future time
Put p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q1,
EnvironmentEdgeManager.currentTimeMillis() + 1000000,
ZERO);
p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ));
t.put(p);
// Store a read write ACL without a timestamp, server will use current time
p = new Put(TEST_ROW).add(TEST_FAMILY, TEST_Q2, ONE);
p.setACL(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
Permission.Action.WRITE));
t.put(p);
} finally {
t.close();
}
return null;
}
}, USER_OWNER);
// Confirm stores are visible
AccessTestAction getQ1 = new AccessTestAction() {
@Override
public Object run() throws Exception {
Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY, TEST_Q1);
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
return t.get(get).listCells();
} finally {
t.close();
}
}
};
AccessTestAction getQ2 = new AccessTestAction() {
@Override
public Object run() throws Exception {
Get get = new Get(TEST_ROW).addColumn(TEST_FAMILY, TEST_Q2);
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
return t.get(get).listCells();
} finally {
t.close();
}
}
};
verifyAllowed(getQ1, USER_OWNER, USER_OTHER);
verifyAllowed(getQ2, USER_OWNER, USER_OTHER);
// Issue a DELETE for the family, should succeed because the future ACL is
// not considered
AccessTestAction deleteFamily = new AccessTestAction() {
@Override
public Object run() throws Exception {
Delete delete = new Delete(TEST_ROW).deleteFamily(TEST_FAMILY);
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
t.delete(delete);
} finally {
t.close();
}
return null;
}
};
verifyAllowed(deleteFamily, USER_OTHER);
// The future put should still exist
verifyAllowed(getQ1, USER_OWNER, USER_OTHER);
// The other put should be covered by the tombstone
verifyDenied(getQ2, USER_OTHER);
}
@Test
public void testCellPermissionsWithDeleteWithUserTs() throws Exception {
USER_OWNER.runAs(new AccessTestAction() {
@Override
public Object run() throws Exception {
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
// This version (TS = 123) with rw ACL for USER_OTHER and USER_OTHER2
Put p = new Put(TEST_ROW);
p.add(TEST_FAMILY, TEST_Q1, 123L, ZERO);
p.add(TEST_FAMILY, TEST_Q2, 123L, ZERO);
Map<String, Permission> perms = new HashMap<String, Permission>();
perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
Permission.Action.WRITE));
perms.put(USER_OTHER2.getShortName(), new Permission(Permission.Action.READ,
Permission.Action.WRITE));
p.setACL(perms);
t.put(p);
// This version (TS = 125) with rw ACL for USER_OTHER
p = new Put(TEST_ROW);
p.add(TEST_FAMILY, TEST_Q1, 125L, ONE);
p.add(TEST_FAMILY, TEST_Q2, 125L, ONE);
perms = new HashMap<String, Permission>();
perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
Permission.Action.WRITE));
p.setACL(perms);
t.put(p);
// This version (TS = 127) with rw ACL for USER_OTHER
p = new Put(TEST_ROW);
p.add(TEST_FAMILY, TEST_Q1, 127L, TWO);
p.add(TEST_FAMILY, TEST_Q2, 127L, TWO);
perms = new HashMap<String, Permission>();
perms.put(USER_OTHER.getShortName(), new Permission(Permission.Action.READ,
Permission.Action.WRITE));
p.setACL(perms);
t.put(p);
return null;
} finally {
t.close();
}
}
});
// USER_OTHER2 should be allowed to delete the column f1:q1 versions older than TS 124L
USER_OTHER2.runAs(new AccessTestAction() {
@Override
public Object run() throws Exception {
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
Delete d = new Delete(TEST_ROW, 124L);
d.deleteColumns(TEST_FAMILY, TEST_Q1);
t.delete(d);
} finally {
t.close();
}
return null;
}
});
// USER_OTHER2 should be allowed to delete the column f1:q2 versions older than TS 124L
USER_OTHER2.runAs(new AccessTestAction() {
@Override
public Object run() throws Exception {
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
Delete d = new Delete(TEST_ROW);
d.deleteColumns(TEST_FAMILY, TEST_Q2, 124L);
t.delete(d);
} finally {
t.close();
}
return null;
}
});
}
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
// Clean the _acl_ table // Clean the _acl_ table

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.TestTableName; import org.apache.hadoop.hbase.util.TestTableName;
import org.apache.log4j.Level; import org.apache.log4j.Level;