HBASE-22859 [HBCK2] Fix the orphan regions on filesystem

Add a section to the bulk load complete tool documentation on how it can be used for 'adopting' stray 'orphan' data turned up by hbck2 or the new reporting facility in the Master UI. Also a cleanup of BulkLoadHFilesTool, mostly around usage text, pointing back to this new documentation.

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
parent ac8fe1627a
commit 018396d84c
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -112,6 +112,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
 
   public static final String NAME = "completebulkload";
+  /**
+   * Whether to run validation on hfiles before loading.
+   */
+  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -175,8 +179,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    */
   private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
     Deque<LoadQueueItem> queue, boolean silence) throws IOException {
-    Set<String> familyNames = Arrays.asList(tableDesc.getColumnFamilies()).stream()
-      .map(f -> f.getNameAsString()).collect(Collectors.toSet());
+    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
+      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
     List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
       .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
     if (unmatchedFamilies.size() > 0) {
@@ -207,8 +211,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   /**
    * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
-   * skip non-valid hfiles by default, or skip this validation by setting
-   * 'hbase.loadincremental.validate.hfile' to false.
+   * skip non-valid hfiles by default, or skip this validation by setting {@link #VALIDATE_HFILES}
+   * to false.
    */
   private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
     BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
@@ -281,7 +285,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       }
 
       @Override
-      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+      public void bulkHFile(final byte[] family, final FileStatus hfile) {
         long length = hfile.getLen();
         if (length > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
           HConstants.DEFAULT_MAX_FILE_SIZE)) {
@@ -479,14 +483,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       final LoadQueueItem item = queue.remove();
 
       final Callable<Pair<List<LoadQueueItem>, String>> call =
-        new Callable<Pair<List<LoadQueueItem>, String>>() {
-          @Override
-          public Pair<List<LoadQueueItem>, String> call() throws Exception {
-            Pair<List<LoadQueueItem>, String> splits =
-              groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
-            return splits;
-          }
-        };
+        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
       splittingFutures.add(pool.submit(call));
     }
     // get all the results. All grouping and splitting must finish before
@@ -523,7 +520,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
-    byte[] startKey, byte[] splitKey) throws IOException {
+    byte[] splitKey) throws IOException {
     Path hfilePath = item.getFilePath();
     byte[] family = item.getFamily();
     Path tmpDir = hfilePath.getParent();
@@ -611,7 +608,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       }
       int indexForCallable = idx;
 
-      /**
+      /*
        * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
        * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
        * region. 3) if the endkey of the last region is not empty.
@@ -637,7 +634,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       Pair<byte[], byte[]> startEndKey = startEndKeys.get(indexForCallable);
       List<LoadQueueItem> lqis =
         splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)),
-          startEndKey.getFirst(), startEndKey.getSecond());
+          startEndKey.getSecond());
       return new Pair<>(lqis, null);
     }
 
@@ -793,7 +790,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
           Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
 
         // To eventually infer start key-end key boundaries
-        Integer value = map.containsKey(first) ? map.get(first) : 0;
+        Integer value = map.getOrDefault(first, 0);
         map.put(first, value + 1);
 
         value = map.containsKey(last) ? map.get(last) : 0;
@@ -856,10 +853,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       // that we can atomically pull out the groups we want to retry.
     }
 
-    if (!queue.isEmpty()) {
-      throw new RuntimeException(
-        "Bulk load aborted with some files not yet loaded." + "Please check log for more details.");
-    }
     return item2RegionMap;
   }
 
@@ -885,20 +878,17 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   /**
-   * Perform a bulk load of the given directory into the given pre-existing table. This method is
-   * not threadsafe.
+   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
+   * This method is not threadsafe.
    * @param map map of family to List of hfiles
    * @param tableName table to load the hfiles
    * @param silence true to ignore unmatched column families
    * @param copyFile always copy hfiles if true
-   * @throws TableNotFoundException if table does not yet exist
    */
   private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
     TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
-    throws TableNotFoundException, IOException {
-    if (!FutureUtils.get(conn.getAdmin().isTableAvailable(tableName))) {
-      throw new TableNotFoundException("Table " + tableName + " is not currently available.");
-    }
+    throws IOException {
+    tableExists(conn, tableName);
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new ArrayDeque<>();
@@ -924,20 +914,17 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    * HFileOutputFormat
    * @param silence true to ignore unmatched column families
    * @param copyFile always copy hfiles if true
-   * @throws TableNotFoundException if table does not yet exist
    */
   private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
     TableName tableName, Path hfofDir, boolean silence, boolean copyFile)
-    throws TableNotFoundException, IOException {
-    if (!FutureUtils.get(conn.getAdmin().isTableAvailable(tableName))) {
-      throw new TableNotFoundException("Table " + tableName + " is not currently available.");
-    }
+    throws IOException {
+    tableExists(conn, tableName);
 
     /*
      * Checking hfile format is a time-consuming operation, we should have an option to skip this
      * step when bulkloading millions of HFiles. See HBASE-13985.
      */
-    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
     if (!validateHFile) {
       LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
         "are not correct. If you fail to read data from your table after using this " +
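
As a usage illustration of the `VALIDATE_HFILES` switch referenced in the hunk above, here is a minimal, hedged sketch of a caller that disables hfile validation before a programmatic bulk load (per the HBASE-13985 note about bulk loading millions of HFiles). The table name and staging directory are hypothetical, and the `BulkLoadHFiles.create(Configuration)` factory from HBase 2.2+ is assumed as the public entry point; only the property key comes from the code above.

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class SkipHFileValidationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Skip the per-file format check; only sensible when the hfiles are known to be well formed.
    conf.setBoolean("hbase.loadincremental.validate.hfile", false);
    // Hypothetical table and staging directory.
    BulkLoadHFiles.create(conf)
      .bulkLoad(TableName.valueOf("TestTable"), new Path("/user/todd/myoutput"));
  }
}
----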
@@ -967,21 +954,16 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   @Override
   public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
-    Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
-    try (AsyncClusterConnection conn = ClusterConnectionFactory
-      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
-      if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
-        String errorMsg = format("Table '%s' does not exist.", tableName);
-        LOG.error(errorMsg);
-        throw new TableNotFoundException(errorMsg);
-      }
+    Map<byte[], List<Path>> family2Files) throws IOException {
+    try (AsyncClusterConnection conn = ClusterConnectionFactory.
+      createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
       return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
     }
   }
 
   @Override
   public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
-    throws TableNotFoundException, IOException {
+    throws IOException {
     try (AsyncClusterConnection conn = ClusterConnectionFactory
       .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
       AsyncAdmin admin = conn.getAdmin();
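
For reference, a minimal sketch of calling the map-based `bulkLoad` overload retained above, useful when you want to hand the loader specific hfiles per column family rather than a directory. The family name and paths here are hypothetical, and `BulkLoadHFiles.create` is assumed as the public factory for this tool.

[source,java]
----
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class Family2FilesBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Map each column family to the hfiles that should be loaded into it.
    Map<byte[], List<Path>> family2Files = new HashMap<>();
    family2Files.put(Bytes.toBytes("info"),
      Arrays.asList(new Path("hdfs://server.example.org:9000/staging/info/hfile-0001")));
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("TestTable"), family2Files);
  }
}
----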
@@ -989,30 +971,46 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
         if (isCreateTable()) {
           createTable(tableName, dir, admin);
         } else {
-          String errorMsg = format("Table '%s' does not exist.", tableName);
-          LOG.error(errorMsg);
-          throw new TableNotFoundException(errorMsg);
+          throwAndLogTableNotFoundException(tableName);
         }
       }
       return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
     }
   }
 
+  /**
+   * @throws TableNotFoundException if table does not exist.
+   */
+  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
+    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
+      throwAndLogTableNotFoundException(tableName);
+    }
+  }
+
+  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
+    String errorMsg = format("Table '%s' does not exist.", tn);
+    LOG.error(errorMsg);
+    throw new TableNotFoundException(errorMsg);
+  }
+
   public void setBulkToken(String bulkToken) {
     this.bulkToken = bulkToken;
   }
 
   private void usage() {
-    System.err.println("usage: " + "bin/hbase completebulkload <-Dargs> "
-      + "</path/to/hfileoutputformat-output> <tablename>\n"
-      + "\t-D" + CREATE_TABLE_CONF_KEY + "=no can be used to avoid creation "
-      + "of a table by this tool.\n"
-      + "\t Note: if you set this to 'no', then target table must already exist.\n"
-      + "\t-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes can be used to ignore "
-      + "unmatched column families.\n"
-      + "\t-loadTable switch implies your baseDirectory to store file has a "
-      + "depth of 3, table must exist\n"
-      + "\t and -loadTable switch is the last option on the command line.\n\n");
+    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
+      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
+      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
+      + "into an hbase table.\n"
+      + "OPTIONS (for other -D options, see source code):\n"
+      + " -D" + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
+      + "table must exist.\n"
+      + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes to ignore unmatched column families.\n"
+      + " -loadTable for when directory of files to load has a depth of 3; target table must "
+      + "exist;\n"
+      + " must be last of the options on command line.\n"
+      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
+      + "documentation.\n");
   }
 
   @Override
@@ -1023,7 +1021,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     }
     Path dirPath = new Path(args[0]);
     TableName tableName = TableName.valueOf(args[1]);
-
     if (args.length == 2) {
       return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
     } else {
@@ -1049,5 +1046,4 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
     System.exit(ret);
   }
-
 }
@@ -2430,8 +2430,8 @@ To control these for stripe compactions, use `hbase.store.stripe.compaction.minF
 HBase includes several methods of loading data into tables.
 The most straightforward method is to either use the `TableOutputFormat` class from a MapReduce job, or use the normal client APIs; however, these are not always the most efficient methods.
 
-The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the generated StoreFiles into a running cluster.
-Using bulk load will use less CPU and network resources than simply using the HBase API.
+The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly load the generated StoreFiles into a running cluster.
+Using bulk load will use less CPU and network resources than loading via the HBase API.
 
 [[arch.bulk.load.arch]]
 === Bulk Load Architecture
@@ -2442,7 +2442,7 @@ The HBase bulk load process consists of two main steps.
 ==== Preparing data via a MapReduce job
 
 The first step of a bulk load is to generate HBase data files (StoreFiles) from a MapReduce job using `HFileOutputFormat2`.
-This output format writes out data in HBase's internal storage format so that they can be later loaded very efficiently into the cluster.
+This output format writes out data in HBase's internal storage format so that they can be later loaded efficiently into the cluster.
 
 In order to function efficiently, `HFileOutputFormat2` must be configured such that each output HFile fits within a single region.
 In order to do this, jobs whose output will be bulk loaded into HBase use Hadoop's `TotalOrderPartitioner` class to partition the map output into disjoint ranges of the key space, corresponding to the key ranges of the regions in the table.
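
To make the preparation step described in the hunk above concrete, here is a hedged sketch of a MapReduce driver wiring in `HFileOutputFormat2`. The table name, output path, and mapper are placeholders; `configureIncrementalLoad` is the standard helper that also installs the `TotalOrderPartitioner` keyed on the table's current region boundaries.

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadPrepareDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = Job.getInstance(conf, "prepare-bulkload");
    job.setJarByClass(BulkLoadPrepareDriver.class);
    // Plug in your own mapper here, emitting ImmutableBytesWritable row keys
    // and Put/KeyValue values:
    // job.setMapperClass(MyHFileMapper.class);
    TableName tableName = TableName.valueOf("TestTable");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      // Configures compression, bloom filters, block size, and a TotalOrderPartitioner
      // keyed on the table's region boundaries so each output HFile fits one region.
      HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    }
    FileOutputFormat.setOutputPath(job, new Path("/user/todd/myoutput"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
----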
@@ -2459,6 +2459,7 @@ It then contacts the appropriate RegionServer which adopts the HFile, moving it
 If the region boundaries have changed during the course of bulk load preparation, or between the preparation and completion steps, the `completebulkload` utility will automatically split the data files into pieces corresponding to the new boundaries.
 This process is not optimally efficient, so users should take care to minimize the delay between preparing a bulk load and importing it into the cluster, especially if other clients are simultaneously loading data through other means.
 
+[[arch.bulk.load.complete.help]]
 [source,bash]
 ----
 $ hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
@@ -2466,15 +2467,12 @@ $ hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config
 
 The `-c config-file` option can be used to specify a file containing the appropriate hbase parameters (e.g., hbase-site.xml) if not supplied already on the CLASSPATH (In addition, the CLASSPATH must contain the directory that has the zookeeper configuration file if zookeeper is NOT managed by HBase).
 
-NOTE: If the target table does not already exist in HBase, this tool will create the table automatically.
-
-
 [[arch.bulk.load.also]]
 === See Also
 
 For more information about the referenced utilities, see <<importtsv>> and <<completebulkload>>.
 
-See link:http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/[How-to: Use HBase Bulk Loading, and Why] for a recent blog on current state of bulk loading.
+See link:http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/[How-to: Use HBase Bulk Loading, and Why] for an old blog post on loading.
 
 [[arch.bulk.load.adv]]
 === Advanced Usage
@@ -2485,6 +2483,53 @@ To get started doing so, dig into `ImportTsv.java` and check the JavaDoc for HFi
 The import step of the bulk load can also be done programmatically.
 See the `LoadIncrementalHFiles` class for more information.
 
+[[arch.bulk.load.complete.strays]]
+==== 'Adopting' Stray Data
+Should an HBase cluster lose account of regions or files during an outage or error, you can use
+the `completebulkload` tool to add back the dropped data. HBase operator tooling such as
+link:https://github.com/apache/hbase-operator-tools/tree/master/hbase-hbck2[HBCK2] or
+the reporting added to the Master's UI under the `HBCK Report` (Since HBase 2.0.6/2.1.6/2.2.1)
+can identify such 'orphan' directories.
+
+Before you begin the 'adoption', ensure the `hbase:meta` table is in a healthy state.
+Run the `CatalogJanitor` by executing the `catalogjanitor_run` command on the HBase shell.
+When finished, check the `HBCK Report` page on the Master UI. Work on fixing any
+inconsistencies, holes, or overlaps found before proceeding. The `hbase:meta` table
+is the authority on where all data is to be found and must be consistent for
+the `completebulkload` tool to work properly.
+
+The `completebulkload` tool takes a directory and a `tablename`.
+The directory has subdirectories named for column families of the targeted `tablename`.
+In these subdirectories are `hfiles` to load. Given this structure, you can pass
+errant region directories (and the table name to which the region directory belongs)
+and the tool will bring the data files back into the fold by moving them under the
+appropriate serving directory. If you have stray files, you will need to mock up this
+structure before invoking the `completebulkload` tool; you may have to look at the
+file content using the <<hfile.tool>> to see which column family to use.
+When the tool completes its run, you will notice that the
+source errant directory has had its storefiles moved/removed. It is now desiccated
+since its data has been drained, and the pointed-to directory can be safely
+removed. It may still have `.regioninfo` files and other
+subdirectories but they are of no relevance now (There may be content still
+under the _recovered_edits_ directory; a TODO is tooling to replay the
+content of _recovered_edits_ if needed; see
+link:https://issues.apache.org/jira/browse/HBASE-22976[Add RecoveredEditsPlayer]).
+If you pass `completebulkload` a directory without store files, it will run and
+note the directory is storefile-free. Just remove such 'empty' directories.
+
+For example, presuming a directory at the top level in HDFS named
+`eb3352fb5c9c9a05feeb2caba101e1cc` has data we need to re-add to the
+HBase `TestTable`:
+
+[source,bash]
+----
+$ ${HBASE_HOME}/bin/hbase --config ~/hbase-conf completebulkload hdfs://server.example.org:9000/eb3352fb5c9c9a05feeb2caba101e1cc TestTable
+----
+
+After it successfully completes, any files that were in `eb3352fb5c9c9a05feeb2caba101e1cc` have been moved
+under hbase and the `eb3352fb5c9c9a05feeb2caba101e1cc` directory can be deleted (Check content
+before and after by running `ls -r` on the HDFS directory).
+
 [[arch.bulk.load.replication]]
 === Bulk Loading Replication
 HBASE-13153 adds replication support for bulk loaded HFiles, available since HBase 1.3/2.0. This feature is enabled by setting `hbase.replication.bulkload.enabled` to `true` (default is `false`).
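
Tying together the "done programmatically" note and the new 'Adopting' Stray Data section in the hunk above: a hedged sketch of driving the same `completebulkload` logic from Java via `ToolRunner`, mirroring what `BulkLoadHFilesTool.main` does. The region directory and table name reuse the example values from the documentation above.

[source,java]
----
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.util.ToolRunner;

public class AdoptStrayRegionDir {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Same arguments as the CLI: <directory-with-family-subdirs> <tablename>.
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), new String[] {
      "hdfs://server.example.org:9000/eb3352fb5c9c9a05feeb2caba101e1cc", "TestTable" });
    System.exit(ret);
  }
}
----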