HBASE-21356 bulkLoadHFile API should ensure that rs has the source hfile's write permissions

This commit is contained in:
huzheng 2018-10-22 18:04:38 +08:00
parent 65d698439f
commit 2173770305
2 changed files with 83 additions and 35 deletions

View File

@@ -54,6 +54,7 @@ import java.util.stream.LongStream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
@@ -780,8 +781,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
try { try {
LOG.info("Validating hfile at " + srcPath + " for inclusion in " LOG.info("Validating hfile at " + srcPath + " for inclusion in "
+ "store " + this + " region " + this.getRegionInfo().getRegionNameAsString()); + "store " + this + " region " + this.getRegionInfo().getRegionNameAsString());
reader = HFile.createReader(srcPath.getFileSystem(conf), srcPath, cacheConf, FileSystem srcFs = srcPath.getFileSystem(conf);
isPrimaryReplicaStore(), conf); srcFs.access(srcPath, FsAction.READ_WRITE);
reader = HFile.createReader(srcFs, srcPath, cacheConf, isPrimaryReplicaStore(), conf);
reader.loadFileInfo(); reader.loadFileInfo();
Optional<byte[]> firstKey = reader.getFirstRowKey(); Optional<byte[]> firstKey = reader.getFirstRowKey();

View File

@@ -150,6 +150,7 @@ public class TestAccessController extends SecureTestUtil {
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAccessController.class); HBaseClassTestRule.forClass(TestAccessController.class);
private static final FsPermission FS_PERMISSION_ALL = FsPermission.valueOf("-rwxrwxrwx");
private static final Logger LOG = LoggerFactory.getLogger(TestAccessController.class); private static final Logger LOG = LoggerFactory.getLogger(TestAccessController.class);
private static TableName TEST_TABLE = TableName.valueOf("testtable1"); private static TableName TEST_TABLE = TableName.valueOf("testtable1");
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -971,7 +972,7 @@ public class TestAccessController extends SecureTestUtil {
fs.mkdirs(dir); fs.mkdirs(dir);
// need to make it globally writable // need to make it globally writable
// so users creating HFiles have write permissions // so users creating HFiles have write permissions
fs.setPermission(dir, FsPermission.valueOf("-rwxrwxrwx")); fs.setPermission(dir, FS_PERMISSION_ALL);
AccessTestAction bulkLoadAction = new AccessTestAction() { AccessTestAction bulkLoadAction = new AccessTestAction() {
@Override @Override
@@ -982,9 +983,8 @@ public class TestAccessController extends SecureTestUtil {
byte[][][] hfileRanges = { { { (byte) 0 }, { (byte) 9 } } }; byte[][][] hfileRanges = { { { (byte) 0 }, { (byte) 9 } } };
Path bulkLoadBasePath = new Path(dir, new Path(User.getCurrent().getName())); Path bulkLoadBasePath = new Path(dir, new Path(User.getCurrent().getName()));
new BulkLoadHelper(bulkLoadBasePath).bulkLoadHFile(TEST_TABLE, TEST_FAMILY, new BulkLoadHelper(bulkLoadBasePath).initHFileData(TEST_FAMILY, TEST_QUALIFIER,
TEST_QUALIFIER, hfileRanges, numRows); hfileRanges, numRows, FS_PERMISSION_ALL).bulkLoadHFile(TEST_TABLE);
return null; return null;
} }
}; };
@@ -1002,6 +1002,51 @@ public class TestAccessController extends SecureTestUtil {
} }
} }
/**
 * An {@link AccessTestAction} that generates HFiles under {@code testDataDir} with a chosen
 * file permission and then attempts a bulk load into {@code TEST_TABLE}. Callers use it to
 * verify that the region server rejects source HFiles it has no write permission on
 * (HBASE-21356) and accepts fully-writable ones.
 */
private class BulkLoadAccessTestAction implements AccessTestAction {
  // Permission applied to every generated HFile before the bulk load is attempted.
  // Made final: the action is configured once at construction and never mutated.
  private final FsPermission filePermission;
  // Scratch directory on the test filesystem that receives the generated HFiles.
  private final Path testDataDir;

  public BulkLoadAccessTestAction(FsPermission perm, Path testDataDir) {
    this.filePermission = perm;
    this.testDataDir = testDataDir;
  }

  @Override
  public Object run() throws Exception {
    FileSystem fs = TEST_UTIL.getTestFileSystem();
    fs.mkdirs(testDataDir);
    // Globally writable so the user creating the HFiles can write under this directory.
    fs.setPermission(testDataDir, FS_PERMISSION_ALL);
    // Making the assumption that the test table won't split between the range
    byte[][][] hfileRanges = { { { (byte) 0 }, { (byte) 9 } } };
    Path bulkLoadBasePath = new Path(testDataDir, new Path(User.getCurrent().getName()));
    new BulkLoadHelper(bulkLoadBasePath)
        .initHFileData(TEST_FAMILY, TEST_QUALIFIER, hfileRanges, 3, filePermission)
        .bulkLoadHFile(TEST_TABLE);
    return null;
  }
}
/**
 * HBASE-21356: bulk load must fail when the region server lacks write permission on the
 * source HFiles, and succeed when the HFiles are globally writable.
 */
@Test
public void testBulkLoadWithoutWritePermission() throws Exception {
// Use the USER_CREATE to initialize the source directory.
Path testDataDir0 = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadWithoutWritePermission0");
Path testDataDir1 = TEST_UTIL.getDataTestDirOnTestFS("testBulkLoadWithoutWritePermission1");
// Action 1: HFiles readable/executable but NOT writable -> load must be rejected.
AccessTestAction bulkLoadAction1 =
new BulkLoadAccessTestAction(FsPermission.valueOf("-r-xr-xr-x"), testDataDir0);
// Action 2: fully-permissive HFiles -> load must succeed.
AccessTestAction bulkLoadAction2 =
new BulkLoadAccessTestAction(FS_PERMISSION_ALL, testDataDir1);
// Test the incorrect case.
BulkLoadHelper.setPermission(TEST_UTIL.getTestFileSystem(),
TEST_UTIL.getTestFileSystem().getWorkingDirectory(), FS_PERMISSION_ALL);
try {
USER_CREATE.runAs(bulkLoadAction1);
fail("Should fail because the hbase user has no write permission on hfiles.");
} catch (IOException e) {
// Expected: the server-side permission check rejects the non-writable HFiles.
// NOTE(review): any IOException satisfies this catch — consider asserting on the
// exception type/message so an unrelated I/O failure can't mask a regression.
}
// Ensure the correct case.
USER_CREATE.runAs(bulkLoadAction2);
}
public static class BulkLoadHelper { public static class BulkLoadHelper {
private final FileSystem fs; private final FileSystem fs;
private final Path loadPath; private final Path loadPath;
@@ -1017,64 +1062,65 @@ public class TestAccessController extends SecureTestUtil {
private void createHFile(Path path, private void createHFile(Path path,
byte[] family, byte[] qualifier, byte[] family, byte[] qualifier,
byte[] startKey, byte[] endKey, int numRows) throws IOException { byte[] startKey, byte[] endKey, int numRows) throws IOException {
HFile.Writer writer = null; HFile.Writer writer = null;
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
try { try {
HFileContext context = new HFileContextBuilder().build(); HFileContext context = new HFileContextBuilder().build();
writer = HFile.getWriterFactory(conf, new CacheConfig(conf)) writer = HFile.getWriterFactory(conf, new CacheConfig(conf)).withPath(fs, path)
.withPath(fs, path) .withFileContext(context).create();
.withFileContext(context)
.create();
// subtract 2 since numRows doesn't include boundary keys // subtract 2 since numRows doesn't include boundary keys
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, true, numRows-2)) { for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, true, numRows - 2)) {
KeyValue kv = new KeyValue(key, family, qualifier, now, key); KeyValue kv = new KeyValue(key, family, qualifier, now, key);
writer.append(kv); writer.append(kv);
} }
} finally { } finally {
if(writer != null) if (writer != null) {
writer.close(); writer.close();
}
} }
} }
private void bulkLoadHFile( private BulkLoadHelper initHFileData(byte[] family, byte[] qualifier, byte[][][] hfileRanges,
TableName tableName, int numRowsPerRange, FsPermission filePermission) throws Exception {
byte[] family,
byte[] qualifier,
byte[][][] hfileRanges,
int numRowsPerRange) throws Exception {
Path familyDir = new Path(loadPath, Bytes.toString(family)); Path familyDir = new Path(loadPath, Bytes.toString(family));
fs.mkdirs(familyDir); fs.mkdirs(familyDir);
int hfileIdx = 0; int hfileIdx = 0;
List<Path> hfiles = new ArrayList<>();
for (byte[][] range : hfileRanges) { for (byte[][] range : hfileRanges) {
byte[] from = range[0]; byte[] from = range[0];
byte[] to = range[1]; byte[] to = range[1];
createHFile(new Path(familyDir, "hfile_"+(hfileIdx++)), Path hfile = new Path(familyDir, "hfile_" + (hfileIdx++));
family, qualifier, from, to, numRowsPerRange); hfiles.add(hfile);
createHFile(hfile, family, qualifier, from, to, numRowsPerRange);
} }
//set global read so RegionServer can move it // set global read so RegionServer can move it
setPermission(loadPath, FsPermission.valueOf("-rwxrwxrwx")); setPermission(fs, loadPath, FS_PERMISSION_ALL);
// Ensure the file permission as requested.
for (Path hfile : hfiles) {
setPermission(fs, hfile, filePermission);
}
return this;
}
private void bulkLoadHFile(TableName tableName) throws Exception {
try (Connection conn = ConnectionFactory.createConnection(conf); try (Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin(); Admin admin = conn.getAdmin();
RegionLocator locator = conn.getRegionLocator(tableName); RegionLocator locator = conn.getRegionLocator(tableName);
Table table = conn.getTable(tableName)) { Table table = conn.getTable(tableName)) {
TEST_UTIL.waitUntilAllRegionsAssigned(tableName); TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(loadPath, admin, table, locator); loader.doBulkLoad(loadPath, admin, table, locator);
} }
} }
public void setPermission(Path dir, FsPermission perm) throws IOException { private static void setPermission(FileSystem fs, Path dir, FsPermission perm)
if(!fs.getFileStatus(dir).isDirectory()) { throws IOException {
fs.setPermission(dir,perm); if (!fs.getFileStatus(dir).isDirectory()) {
} fs.setPermission(dir, perm);
else { } else {
for(FileStatus el : fs.listStatus(dir)) { for (FileStatus el : fs.listStatus(dir)) {
fs.setPermission(el.getPath(), perm); fs.setPermission(el.getPath(), perm);
setPermission(el.getPath() , perm); setPermission(fs, el.getPath(), perm);
} }
} }
} }