HBASE-13833 LoadIncrementalHFile.doBulkLoad(Path,HTable) doesn't handle unmanaged connections when using SecureBulkLoad

Nick Dimiduk 2015-06-14 15:44:08 -07:00
parent e949f0c6b0
commit 35818ad2cd
2 changed files with 72 additions and 43 deletions
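
The fix makes doBulkLoad(Path, HTable) work regardless of whether the HTable it is handed is backed by a managed or an unmanaged connection: when the connection is managed, the loader now creates its own unmanaged connection for the duration of the load and cleans it up afterwards. For context, here is a caller-side sketch of the usage pattern the updated test exercises; the class name BulkLoadExample, the table name "mytable", and the HFile directory "/tmp/bulk_output" are illustrative placeholders, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Unmanaged connection: created, owned, and closed by the caller.
    try (Connection conn = ConnectionFactory.createConnection(conf);
         HTable table = (HTable) conn.getTable(TableName.valueOf("mytable"))) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // Load the prepared HFiles under /tmp/bulk_output into "mytable".
      // After this fix the same call also works for a managed HTable
      // (e.g. new HTable(conf, tableName)): the loader then opens and
      // closes its own unmanaged connection internally.
      loader.doBulkLoad(new Path("/tmp/bulk_output"), table);
    }
  }
}

The cast to HTable mirrors the updated test: in this HBase line, Connection.getTable() returns an HTable-backed implementation, and doBulkLoad(Path, HTable) still takes the concrete type.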

LoadIncrementalHFiles.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -291,15 +292,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     throws TableNotFoundException, IOException
   {
     Admin admin = null;
+    Table t = table;
+    Connection conn = table.getConnection();
+    boolean closeConnWhenFinished = false;
     try {
-      try {
-        admin = table.getConnection().getAdmin();
-      } catch (NeedUnmanagedConnectionException ex) {
-        admin = new HBaseAdmin(table.getConfiguration());
+      if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+        LOG.warn("managed connection cannot be used for bulkload. Creating unmanaged connection.");
+        // can only use unmanaged connections from here on out.
+        conn = ConnectionFactory.createConnection(table.getConfiguration());
+        t = conn.getTable(table.getName());
+        closeConnWhenFinished = true;
+        if (conn instanceof ClusterConnection && ((ClusterConnection) conn).isManaged()) {
+          throw new RuntimeException("Failed to create unmanaged connection.");
+        }
+        admin = conn.getAdmin();
+      } else {
+        admin = conn.getAdmin();
       }
-      doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
+      try (RegionLocator rl = conn.getRegionLocator(t.getName())) {
+        doBulkLoad(hfofDir, admin, t, rl);
+      }
     } finally {
-      admin.close();
+      if (admin != null) admin.close();
+      if (closeConnWhenFinished) {
+        t.close();
+        conn.close();
+      }
     }
   }

TestLoadIncrementalHFiles.java

@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -232,47 +234,56 @@ public class TestLoadIncrementalHFiles {
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-    int hfileIdx = 0;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
-    }
-    int expectedRows = hfileIdx * 1000;
-    if (preCreateTable) {
-      util.getHBaseAdmin().createTable(htd, tableSplitKeys);
-    }
-    final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
-    Table table = new HTable(util.getConfiguration(), tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
-      }
-    }
-    util.deleteTable(tableName);
+    for (boolean managed : new boolean[] { true, false }) {
+      Path dir = util.getDataTestDirOnTestFS(testName);
+      FileSystem fs = util.getTestFileSystem();
+      dir = dir.makeQualified(fs);
+      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+      int hfileIdx = 0;
+      for (byte[][] range : hfileRanges) {
+        byte[] from = range[0];
+        byte[] to = range[1];
+        HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+            + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
+      }
+      int expectedRows = hfileIdx * 1000;
+      if (preCreateTable) {
+        util.getHBaseAdmin().createTable(htd, tableSplitKeys);
+      }
+      final TableName tableName = htd.getTableName();
+      if (!util.getHBaseAdmin().tableExists(tableName)) {
+        util.getHBaseAdmin().createTable(htd);
+      }
+      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+      if (managed) {
+        try (HTable table = new HTable(util.getConfiguration(), tableName)) {
+          loader.doBulkLoad(dir, table);
+          assertEquals(expectedRows, util.countRows(table));
+        }
+      } else {
+        try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+            HTable table = (HTable) conn.getTable(tableName)) {
+          loader.doBulkLoad(dir, table);
+        }
+      }
+      // verify staging folder has been cleaned up
+      Path stagingBasePath = SecureBulkLoadUtil.getBaseStagingDir(util.getConfiguration());
+      if (fs.exists(stagingBasePath)) {
+        FileStatus[] files = fs.listStatus(stagingBasePath);
+        for (FileStatus file : files) {
+          assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+              file.getPath().getName() != "DONOTERASE");
+        }
+      }
+      util.deleteTable(tableName);
+    }
   }
   /**