HBASE-3440 Clean out load_table.rb and make sure all roads lead to completebulkload tool
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1082246 13f79535-47bb-0310-9956-ffa450edef68
parent a8beafc5cf
commit 60c70403f8

@@ -113,6 +113,8 @@ Release 0.91.0 - Unreleased
   HBASE-2495  Allow record filtering with selected row key values in HBase
               Export (Subbu M Iyer via Stack)
   HBASE-3600  Update our jruby to 1.6.0
   HBASE-3440  Clean out load_table.rb and make sure all roads lead to
               completebulkload tool (Vidhyashankar Venkataraman via Stack)

  TASK
   HBASE-3559  Move report of split to master OFF the heartbeat channel

@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import java.util.TreeMap;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.

@@ -58,14 +64,21 @@ import org.apache.hadoop.util.ToolRunner;
 */
public class LoadIncrementalHFiles extends Configured implements Tool {

  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
  private static final int TABLE_CREATE_MAX_RETRIES = 20;
  private static final long TABLE_CREATE_SLEEP = 60000;
  private HBaseAdmin hbAdmin;

  public static String NAME = "completebulkload";

  public LoadIncrementalHFiles(Configuration conf) {
  public LoadIncrementalHFiles(Configuration conf) throws Exception {
    super(conf);
    this.hbAdmin = new HBaseAdmin(conf);
  }

  /* This constructor does not add HBase configuration.
   * Explicit addition is necessary. Do we need this constructor?
   */
  public LoadIncrementalHFiles() {
    super();
  }

@@ -299,6 +312,125 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean doesTableExist(String tableName) throws Exception {
    return hbAdmin.tableExists(tableName);
  }

  /*
   * Infers region boundaries for a new table.
   * Parameter:
   *   bdryMap is a map from each key to an integer in {+1, -1}:
   *     if a key is the start key of a file, it maps to +1;
   *     if a key is the end key of a file, it maps to -1.
   * Algorithm:
   *   1) Poll the keys in order:
   *      a) Keep adding the mapped values of these keys (runningSum).
   *      b) Each time runningSum returns to 0, add the start key of the span over
   *         which it had been non-zero to the boundary list.
   *   2) Return the boundary list.
   */
  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
      if (runningValue == 0) currStartKey = item.getKey();
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) keysArray.add(currStartKey);
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][0]);
  }

  /*
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications necessary if we want to avoid doing it.
   */
  private void createTable(String tableName, String dirPath) throws Exception {
    Path hfofDir = new Path(dirPath);
    FileSystem fs = hfofDir.getFileSystem(getConf());

    if (!fs.exists(hfofDir)) {
      throw new FileNotFoundException("HFileOutputFormat dir " +
          hfofDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + hfofDir);
    }

    HTableDescriptor htd = new HTableDescriptor(tableName);
    HColumnDescriptor hcd = null;

    // Add column families
    // Build a set of keys
    byte[][] keys = null;
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

    for (FileStatus stat : familyDirStatuses) {
      if (!stat.isDir()) {
        LOG.warn("Skipping non-directory " + stat.getPath());
        continue;
      }
      Path familyDir = stat.getPath();
      // Skip _logs, etc
      if (familyDir.getName().startsWith("_")) continue;
      byte[] family = familyDir.getName().getBytes();

      hcd = new HColumnDescriptor(family);
      htd.addFamily(hcd);

      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
      for (Path hfile : hfiles) {
        if (hfile.getName().startsWith("_")) continue;

        HFile.Reader reader = new HFile.Reader(fs, hfile, null, false, false);
        final byte[] first, last;
        try {
          reader.loadFileInfo();
          first = reader.getFirstRowKey();
          last = reader.getLastRowKey();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile +
              " first=" + Bytes.toStringBinary(first) +
              " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? (Integer) map.get(last) : 0;
          map.put(last, value - 1);
        } finally {
          reader.close();
        }
      }
    }

    keys = LoadIncrementalHFiles.inferBoundaries(map);
    try {
      this.hbAdmin.createTableAsync(htd, keys);
    } catch (java.net.SocketTimeoutException e) {
      System.err.println("Caught Socket timeout.. Mostly caused by a slow region assignment by master!");
    }

    HTable table = new HTable(tableName);

    HConnection conn = table.getConnection();
    int ctr = 0;
    while (!conn.isTableAvailable(table.getTableName()) && (ctr < TABLE_CREATE_MAX_RETRIES)) {
      LOG.info("Table " + tableName + " not yet available... Sleeping for 60 more seconds...");
      /* Every TABLE_CREATE_SLEEP milliseconds, wakes up and checks if the table is available */
      Thread.sleep(TABLE_CREATE_SLEEP);
      ctr++;
    }
    LOG.info("Table " + tableName + " is finally available!!");
  }

  @Override
  public int run(String[] args) throws Exception {

@@ -307,15 +439,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
      return -1;
    }

    Path hfofDir = new Path(args[0]);
    HTable table = new HTable(this.getConf(), args[1]);
    String dirPath = args[0];
    String tableName = args[1];

    boolean tableExists = this.doesTableExist(tableName);
    if (!tableExists) this.createTable(tableName, dirPath);

    Path hfofDir = new Path(dirPath);
    HTable table = new HTable(tableName);

    doBulkLoad(hfofDir, table);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new LoadIncrementalHFiles(), args);
    ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
  }

}

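For illustration only (not part of this patch), a minimal standalone sketch of how the inferBoundaries sweep added above consumes the start/end-key counts that createTable() accumulates. The class name, the sample keys, and the add() helper below are hypothetical; only LoadIncrementalHFiles.inferBoundaries and Bytes come from the patched code.

import java.util.TreeMap;

import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class InferBoundariesExample {
  public static void main(String[] args) {
    // Keys are ordered byte-wise; a start key contributes +1, an end key -1.
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

    // Two HFiles overlap on [a,e] and [c,i]; a third covers [m,q].
    add(map, "a", +1); add(map, "e", -1);
    add(map, "c", +1); add(map, "i", -1);
    add(map, "m", +1); add(map, "q", -1);

    // The running sum drops to zero after "i" and again after "q"; the first drop is
    // skipped, so the only boundary emitted is "m", the start of the second merged span.
    byte[][] boundaries = LoadIncrementalHFiles.inferBoundaries(map);
    for (byte[] b : boundaries) {
      System.out.println(Bytes.toStringBinary(b));   // prints: m
    }
  }

  // Accumulates counts the same way createTable() does for first/last row keys.
  private static void add(TreeMap<byte[], Integer> map, String key, int delta) {
    byte[] k = key.getBytes();
    Integer v = map.containsKey(k) ? map.get(k) : 0;
    map.put(k, v + delta);
  }
}
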
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.Arrays;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

@@ -186,4 +188,70 @@ public class TestLoadIncrementalHFiles {
      writer.close();
    }
  }

  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    Integer value = map.containsKey(first) ? (Integer) map.get(first) : 0;
    map.put(first, value + 1);

    value = map.containsKey(last) ? (Integer) map.get(last) : 0;
    map.put(last, value - 1);
  }

  @Test
  public void testInferBoundaries() {
    TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);

    /* Toy example
     *    c---------i           o------p          s---------t     v------x
     *  a------e   g-----k   m-------------q   r----s            u----w
     *
     * Should be inferred as:
     *  a-----------------k   m-------------q   r--------------t  u---------x
     *
     * The output should be (m, r, u)
     */

    String first;
    String last;

    first = "a"; last = "e";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "r"; last = "s";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "o"; last = "p";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "g"; last = "k";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "v"; last = "x";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "c"; last = "i";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "m"; last = "q";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "s"; last = "t";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    first = "u"; last = "w";
    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());

    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
    byte[][] compare = new byte[3][];
    compare[0] = "m".getBytes();
    compare[1] = "r".getBytes();
    compare[2] = "u".getBytes();

    assertEquals(3, keysArray.length);

    for (int row = 0; row < keysArray.length; row++) {
      assertArrayEquals(compare[row], keysArray[row]);
    }
  }

}

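With this change the tool creates the target table, pre-split at the inferred boundaries, when it does not already exist, and then runs the bulk load; on the command line the same entry point is exposed under NAME = "completebulkload", which is what the commit title points all roads to. Programmatically it is driven exactly as the new main() does; a minimal sketch, where the driver class name, the output directory, and the table name are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadDriverExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // args[0] = HFileOutputFormat output directory, args[1] = table name (created if missing)
    int exitCode = ToolRunner.run(new LoadIncrementalHFiles(conf),
        new String[] { "/user/alice/hfof-output", "mytable" });
    System.exit(exitCode);
  }
}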