HBASE-17825: Backup further optimizations
Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
parent
62ee7d9502
commit
6cfa208add
|
@ -374,14 +374,17 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
|
Set<TableName> tableSet = backupManager.getIncrementalBackupTableSet();
|
||||||
// filter missing files out (they have been copied by previous backups)
|
// filter missing files out (they have been copied by previous backups)
|
||||||
incrBackupFileList = filterMissingFiles(incrBackupFileList);
|
incrBackupFileList = filterMissingFiles(incrBackupFileList);
|
||||||
|
List<String> tableList = new ArrayList<String>();
|
||||||
for (TableName table : tableSet) {
|
for (TableName table : tableSet) {
|
||||||
// Check if table exists
|
// Check if table exists
|
||||||
if (tableExists(table, conn)) {
|
if (tableExists(table, conn)) {
|
||||||
walToHFiles(incrBackupFileList, table);
|
tableList.add(table.getNameAsString());
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
|
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
walToHFiles(incrBackupFileList, tableList);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean tableExists(TableName table, Connection conn) throws IOException {
|
protected boolean tableExists(TableName table, Connection conn) throws IOException {
|
||||||
|
@ -390,20 +393,21 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void walToHFiles(List<String> dirPaths, TableName tableName) throws IOException {
|
protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
|
||||||
Tool player = new WALPlayer();
|
Tool player = new WALPlayer();
|
||||||
|
|
||||||
// Player reads all files in arbitrary directory structure and creates
|
// Player reads all files in arbitrary directory structure and creates
|
||||||
// a Map task for each file. We use ';' as separator
|
// a Map task for each file. We use ';' as separator
|
||||||
// because WAL file names contains ','
|
// because WAL file names contains ','
|
||||||
String dirs = StringUtils.join(dirPaths, ';');
|
String dirs = StringUtils.join(dirPaths, ';');
|
||||||
String jobname = "Incremental_Backup-" + backupId + "-" + tableName.getNameAsString();
|
String jobname = "Incremental_Backup-" + backupId ;
|
||||||
|
|
||||||
Path bulkOutputPath = getBulkOutputDirForTable(tableName);
|
Path bulkOutputPath = getBulkOutputDir();
|
||||||
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
|
conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
|
||||||
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
|
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
|
||||||
|
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
|
||||||
conf.set(JOB_NAME_CONF_KEY, jobname);
|
conf.set(JOB_NAME_CONF_KEY, jobname);
|
||||||
String[] playerArgs = { dirs, tableName.getNameAsString() };
|
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
|
||||||
|
|
||||||
try {
|
try {
|
||||||
player.setConf(conf);
|
player.setConf(conf);
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.mapreduce;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
|
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -29,7 +30,9 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.backup.BackupInfo;
|
import org.apache.hadoop.hbase.backup.BackupInfo;
|
||||||
import org.apache.hadoop.hbase.backup.BackupMergeJob;
|
import org.apache.hadoop.hbase.backup.BackupMergeJob;
|
||||||
|
@ -40,6 +43,8 @@ import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
|
||||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -113,6 +118,7 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||||
// Find input directories for table
|
// Find input directories for table
|
||||||
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
|
Path[] dirPaths = findInputDirectories(fs, backupRoot, tableNames[i], backupIds);
|
||||||
String dirs = StringUtils.join(dirPaths, ",");
|
String dirs = StringUtils.join(dirPaths, ",");
|
||||||
|
|
||||||
Path bulkOutputPath =
|
Path bulkOutputPath =
|
||||||
BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
|
BackupUtils.getBulkOutputDir(BackupUtils.getFileNameCompatibleString(tableNames[i]),
|
||||||
getConf(), false);
|
getConf(), false);
|
||||||
|
@ -243,21 +249,61 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||||
protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
|
protected void moveData(FileSystem fs, String backupRoot, Path bulkOutputPath,
|
||||||
TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
|
TableName tableName, String mergedBackupId) throws IllegalArgumentException, IOException {
|
||||||
Path dest =
|
Path dest =
|
||||||
new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, mergedBackupId, tableName));
|
new Path(HBackupFileSystem.getTableBackupDir(backupRoot, mergedBackupId, tableName));
|
||||||
|
|
||||||
// Delete all in dest
|
// Delete all *data* files in dest
|
||||||
if (!fs.delete(dest, true)) {
|
if (!deleteData(fs, dest)) {
|
||||||
throw new IOException("Could not delete " + dest);
|
throw new IOException("Could not delete " + dest);
|
||||||
}
|
}
|
||||||
|
|
||||||
FileStatus[] fsts = fs.listStatus(bulkOutputPath);
|
FileStatus[] fsts = fs.listStatus(bulkOutputPath);
|
||||||
for (FileStatus fst : fsts) {
|
for (FileStatus fst : fsts) {
|
||||||
if (fst.isDirectory()) {
|
if (fst.isDirectory()) {
|
||||||
fs.rename(fst.getPath().getParent(), dest);
|
String family = fst.getPath().getName();
|
||||||
|
Path newDst = new Path(dest, family);
|
||||||
|
if (fs.exists(newDst)) {
|
||||||
|
if (!fs.delete(newDst, true)) {
|
||||||
|
throw new IOException("failed to delete :"+ newDst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fs.rename(fst.getPath(), dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes only data files and keeps all META
|
||||||
|
* @param fs file system instance
|
||||||
|
* @param dest destination location
|
||||||
|
* @return true, if success, false - otherwise
|
||||||
|
* @throws FileNotFoundException exception
|
||||||
|
* @throws IOException exception
|
||||||
|
*/
|
||||||
|
private boolean deleteData(FileSystem fs, Path dest) throws FileNotFoundException, IOException {
|
||||||
|
RemoteIterator<LocatedFileStatus> it = fs.listFiles(dest, true);
|
||||||
|
List<Path> toDelete = new ArrayList<Path>();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Path p = it.next().getPath();
|
||||||
|
if (fs.isDirectory(p)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Keep meta
|
||||||
|
String fileName = p.toString();
|
||||||
|
if (fileName.indexOf(FSTableDescriptors.TABLEINFO_DIR) > 0 ||
|
||||||
|
fileName.indexOf(HRegionFileSystem.REGION_INFO_FILE) > 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
toDelete.add(p);
|
||||||
|
}
|
||||||
|
for (Path p : toDelete) {
|
||||||
|
boolean result = fs.delete(p, false);
|
||||||
|
if (!result) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
protected String findMostRecentBackupId(String[] backupIds) {
|
protected String findMostRecentBackupId(String[] backupIds) {
|
||||||
long recentTimestamp = Long.MIN_VALUE;
|
long recentTimestamp = Long.MIN_VALUE;
|
||||||
for (String backupId : backupIds) {
|
for (String backupId : backupIds) {
|
||||||
|
@ -291,12 +337,12 @@ public class MapReduceBackupMergeJob implements BackupMergeJob {
|
||||||
|
|
||||||
for (String backupId : backupIds) {
|
for (String backupId : backupIds) {
|
||||||
Path fileBackupDirPath =
|
Path fileBackupDirPath =
|
||||||
new Path(HBackupFileSystem.getTableBackupDataDir(backupRoot, backupId, tableName));
|
new Path(HBackupFileSystem.getTableBackupDir(backupRoot, backupId, tableName));
|
||||||
if (fs.exists(fileBackupDirPath)) {
|
if (fs.exists(fileBackupDirPath)) {
|
||||||
dirs.add(fileBackupDirPath);
|
dirs.add(fileBackupDirPath);
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.trace("File: " + fileBackupDirPath + " does not exist.");
|
LOG.debug("File: " + fileBackupDirPath + " does not exist.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -254,8 +254,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||||
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
|
||||||
String backupIdIncMultiple2 = client.backupTables(request);
|
String backupIdIncMultiple2 = client.backupTables(request);
|
||||||
assertTrue(checkSucceeded(backupIdIncMultiple2));
|
assertTrue(checkSucceeded(backupIdIncMultiple2));
|
||||||
|
// #4 Merge backup images with failures
|
||||||
// #4 Merge backup images with failures
|
|
||||||
|
|
||||||
for (FailurePhase phase : FailurePhase.values()) {
|
for (FailurePhase phase : FailurePhase.values()) {
|
||||||
Configuration conf = conn.getConfiguration();
|
Configuration conf = conn.getConfiguration();
|
||||||
|
@ -294,7 +293,6 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
|
||||||
LOG.debug("Expected :"+ e.getMessage());
|
LOG.debug("Expected :"+ e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now merge w/o failures
|
// Now merge w/o failures
|
||||||
Configuration conf = conn.getConfiguration();
|
Configuration conf = conn.getConfiguration();
|
||||||
conf.unset(FAILURE_PHASE_KEY);
|
conf.unset(FAILURE_PHASE_KEY);
|
||||||
|
|
|
@ -258,15 +258,19 @@ public class HFileOutputFormat2
|
||||||
} else {
|
} else {
|
||||||
tableNameBytes = Bytes.toBytes(writeTableNames);
|
tableNameBytes = Bytes.toBytes(writeTableNames);
|
||||||
}
|
}
|
||||||
|
String tableName = Bytes.toString(tableNameBytes);
|
||||||
|
Path tableRelPath = getTableRelativePath(tableNameBytes);
|
||||||
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
|
byte[] tableAndFamily = getTableNameSuffixedWithFamily(tableNameBytes, family);
|
||||||
|
|
||||||
WriterLength wl = this.writers.get(tableAndFamily);
|
WriterLength wl = this.writers.get(tableAndFamily);
|
||||||
|
|
||||||
// If this is a new column family, verify that the directory exists
|
// If this is a new column family, verify that the directory exists
|
||||||
if (wl == null) {
|
if (wl == null) {
|
||||||
Path writerPath = null;
|
Path writerPath = null;
|
||||||
if (writeMultipleTables) {
|
if (writeMultipleTables) {
|
||||||
writerPath = new Path(outputDir, new Path(Bytes.toString(tableNameBytes), Bytes
|
writerPath = new Path(outputDir,new Path(tableRelPath, Bytes
|
||||||
.toString(family)));
|
.toString(family)));
|
||||||
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
writerPath = new Path(outputDir, Bytes.toString(family));
|
writerPath = new Path(outputDir, Bytes.toString(family));
|
||||||
|
@ -289,7 +293,6 @@ public class HFileOutputFormat2
|
||||||
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
|
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
|
||||||
HRegionLocation loc = null;
|
HRegionLocation loc = null;
|
||||||
|
|
||||||
String tableName = Bytes.toString(tableNameBytes);
|
|
||||||
if (tableName != null) {
|
if (tableName != null) {
|
||||||
try (Connection connection = ConnectionFactory.createConnection(conf);
|
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||||
RegionLocator locator =
|
RegionLocator locator =
|
||||||
|
@ -341,6 +344,15 @@ public class HFileOutputFormat2
|
||||||
this.previousRow = rowKey;
|
this.previousRow = rowKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Path getTableRelativePath(byte[] tableNameBytes) {
|
||||||
|
String tableName = Bytes.toString(tableNameBytes);
|
||||||
|
String[] tableNameParts = tableName.split(":");
|
||||||
|
Path tableRelPath = new Path(tableName.split(":")[0]);
|
||||||
|
if (tableNameParts.length > 1) {
|
||||||
|
tableRelPath = new Path(tableRelPath, tableName.split(":")[1]);
|
||||||
|
}
|
||||||
|
return tableRelPath;
|
||||||
|
}
|
||||||
private void rollWriters(WriterLength writerLength) throws IOException {
|
private void rollWriters(WriterLength writerLength) throws IOException {
|
||||||
if (writerLength != null) {
|
if (writerLength != null) {
|
||||||
closeWriter(writerLength);
|
closeWriter(writerLength);
|
||||||
|
@ -376,7 +388,7 @@ public class HFileOutputFormat2
|
||||||
Path familydir = new Path(outputDir, Bytes.toString(family));
|
Path familydir = new Path(outputDir, Bytes.toString(family));
|
||||||
if (writeMultipleTables) {
|
if (writeMultipleTables) {
|
||||||
familydir = new Path(outputDir,
|
familydir = new Path(outputDir,
|
||||||
new Path(Bytes.toString(tableName), Bytes.toString(family)));
|
new Path(getTableRelativePath(tableName), Bytes.toString(family)));
|
||||||
}
|
}
|
||||||
WriterLength wl = new WriterLength();
|
WriterLength wl = new WriterLength();
|
||||||
Algorithm compression = compressionMap.get(tableAndFamily);
|
Algorithm compression = compressionMap.get(tableAndFamily);
|
||||||
|
|
|
@ -20,7 +20,11 @@ package org.apache.hadoop.hbase.mapreduce;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -38,6 +42,7 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.TableInfo;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
|
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
|
||||||
|
@ -72,7 +77,9 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
|
public final static String TABLE_MAP_KEY = "wal.input.tablesmap";
|
||||||
public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
|
public final static String INPUT_FILES_SEPARATOR_KEY = "wal.input.separator";
|
||||||
public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
|
public final static String IGNORE_MISSING_FILES = "wal.input.ignore.missing.files";
|
||||||
|
public final static String MULTI_TABLES_SUPPORT = "wal.multi.tables.support";
|
||||||
|
|
||||||
|
protected static final String tableSeparator = ";";
|
||||||
|
|
||||||
// This relies on Hadoop Configuration to handle warning about deprecated configs and
|
// This relies on Hadoop Configuration to handle warning about deprecated configs and
|
||||||
// to set the correct non-deprecated configs when an old one shows up.
|
// to set the correct non-deprecated configs when an old one shows up.
|
||||||
|
@ -84,7 +91,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
|
|
||||||
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
|
||||||
|
|
||||||
public WALPlayer(){
|
public WALPlayer() {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected WALPlayer(final Configuration c) {
|
protected WALPlayer(final Configuration c) {
|
||||||
|
@ -92,26 +99,27 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mapper that just writes out KeyValues.
|
* A mapper that just writes out KeyValues. This one can be used together with
|
||||||
* This one can be used together with {@link CellSortReducer}
|
* {@link CellSortReducer}
|
||||||
*/
|
*/
|
||||||
static class WALKeyValueMapper
|
static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
|
||||||
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
|
private Set<String> tableSet = new HashSet<String>();
|
||||||
private byte[] table;
|
private boolean multiTableSupport = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void map(WALKey key, WALEdit value,
|
public void map(WALKey key, WALEdit value, Context context) throws IOException {
|
||||||
Context context)
|
|
||||||
throws IOException {
|
|
||||||
try {
|
try {
|
||||||
// skip all other tables
|
// skip all other tables
|
||||||
if (Bytes.equals(table, key.getTableName().getName())) {
|
TableName table = key.getTableName();
|
||||||
|
if (tableSet.contains(table.getNameAsString())) {
|
||||||
for (Cell cell : value.getCells()) {
|
for (Cell cell : value.getCells()) {
|
||||||
if (WALEdit.isMetaEditFamily(cell)) {
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
context.write(new ImmutableBytesWritable(CellUtil.cloneRow(cell)),
|
byte[] outKey = multiTableSupport
|
||||||
new MapReduceExtendedCell(cell));
|
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
|
||||||
|
: CellUtil.cloneRow(cell);
|
||||||
|
context.write(new ImmutableBytesWritable(outKey), new MapReduceExtendedCell(cell));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -121,34 +129,28 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setup(Context context) throws IOException {
|
public void setup(Context context) throws IOException {
|
||||||
// only a single table is supported when HFiles are generated with HFileOutputFormat
|
Configuration conf = context.getConfiguration();
|
||||||
String[] tables = context.getConfiguration().getStrings(TABLES_KEY);
|
String[] tables = conf.getStrings(TABLES_KEY);
|
||||||
if (tables == null || tables.length != 1) {
|
this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
|
||||||
// this can only happen when WALMapper is used directly by a class other than WALPlayer
|
for (String table : tables) {
|
||||||
throw new IOException("Exactly one table must be specified for bulk HFile case.");
|
tableSet.add(table);
|
||||||
}
|
}
|
||||||
table = Bytes.toBytes(tables[0]);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A mapper that writes out {@link Mutation} to be directly applied to
|
* A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
|
||||||
* a running HBase instance.
|
|
||||||
*/
|
*/
|
||||||
protected static class WALMapper
|
protected static class WALMapper
|
||||||
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
|
extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
|
||||||
private Map<TableName, TableName> tables = new TreeMap<>();
|
private Map<TableName, TableName> tables = new TreeMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void map(WALKey key, WALEdit value, Context context)
|
public void map(WALKey key, WALEdit value, Context context) throws IOException {
|
||||||
throws IOException {
|
|
||||||
try {
|
try {
|
||||||
if (tables.isEmpty() || tables.containsKey(key.getTableName())) {
|
if (tables.isEmpty() || tables.containsKey(key.getTableName())) {
|
||||||
TableName targetTable = tables.isEmpty() ?
|
TableName targetTable =
|
||||||
key.getTableName() :
|
tables.isEmpty() ? key.getTableName() : tables.get(key.getTableName());
|
||||||
tables.get(key.getTableName());
|
|
||||||
ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
|
ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
|
||||||
Put put = null;
|
Put put = null;
|
||||||
Delete del = null;
|
Delete del = null;
|
||||||
|
@ -228,8 +230,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
if (tablesToUse != null) {
|
if (tablesToUse != null) {
|
||||||
for (String table : tablesToUse) {
|
for (String table : tablesToUse) {
|
||||||
tables.put(TableName.valueOf(table),
|
tables.put(TableName.valueOf(table), TableName.valueOf(tableMap[i++]));
|
||||||
TableName.valueOf(tableMap[i++]));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -249,9 +250,9 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
// then see if just a number of ms's was specified
|
// then see if just a number of ms's was specified
|
||||||
ms = Long.parseLong(val);
|
ms = Long.parseLong(val);
|
||||||
} catch (NumberFormatException nfe) {
|
} catch (NumberFormatException nfe) {
|
||||||
throw new IOException(option
|
throw new IOException(
|
||||||
+ " must be specified either in the form 2001-02-20T16:35:06.99 "
|
option + " must be specified either in the form 2001-02-20T16:35:06.99 "
|
||||||
+ "or as number of milliseconds");
|
+ "or as number of milliseconds");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conf.setLong(option, ms);
|
conf.setLong(option, ms);
|
||||||
|
@ -259,8 +260,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up the actual job.
|
* Sets up the actual job.
|
||||||
*
|
* @param args The command line parameters.
|
||||||
* @param args The command line parameters.
|
|
||||||
* @return The newly created job.
|
* @return The newly created job.
|
||||||
* @throws IOException When setting up the job fails.
|
* @throws IOException When setting up the job fails.
|
||||||
*/
|
*/
|
||||||
|
@ -283,7 +283,8 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
conf.setStrings(TABLES_KEY, tables);
|
conf.setStrings(TABLES_KEY, tables);
|
||||||
conf.setStrings(TABLE_MAP_KEY, tableMap);
|
conf.setStrings(TABLE_MAP_KEY, tableMap);
|
||||||
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
|
conf.set(FileInputFormat.INPUT_DIR, inputDirs);
|
||||||
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
|
Job job =
|
||||||
|
Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + System.currentTimeMillis()));
|
||||||
job.setJarByClass(WALPlayer.class);
|
job.setJarByClass(WALPlayer.class);
|
||||||
|
|
||||||
job.setInputFormatClass(WALInputFormat.class);
|
job.setInputFormatClass(WALInputFormat.class);
|
||||||
|
@ -294,22 +295,24 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
|
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
|
||||||
|
|
||||||
// the bulk HFile case
|
// the bulk HFile case
|
||||||
if (tables.length != 1) {
|
List<TableName> tableNames = getTableNameList(tables);
|
||||||
throw new IOException("Exactly one table must be specified for the bulk export option");
|
|
||||||
}
|
|
||||||
TableName tableName = TableName.valueOf(tables[0]);
|
|
||||||
job.setMapperClass(WALKeyValueMapper.class);
|
job.setMapperClass(WALKeyValueMapper.class);
|
||||||
job.setReducerClass(CellSortReducer.class);
|
job.setReducerClass(CellSortReducer.class);
|
||||||
Path outputDir = new Path(hfileOutPath);
|
Path outputDir = new Path(hfileOutPath);
|
||||||
FileOutputFormat.setOutputPath(job, outputDir);
|
FileOutputFormat.setOutputPath(job, outputDir);
|
||||||
job.setMapOutputValueClass(MapReduceExtendedCell.class);
|
job.setMapOutputValueClass(MapReduceExtendedCell.class);
|
||||||
try (Connection conn = ConnectionFactory.createConnection(conf);
|
try (Connection conn = ConnectionFactory.createConnection(conf);) {
|
||||||
|
List<TableInfo> tableInfoList = new ArrayList<TableInfo>();
|
||||||
|
for (TableName tableName : tableNames) {
|
||||||
Table table = conn.getTable(tableName);
|
Table table = conn.getTable(tableName);
|
||||||
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
|
RegionLocator regionLocator = conn.getRegionLocator(tableName);
|
||||||
HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
|
tableInfoList.add(new TableInfo(table.getDescriptor(), regionLocator));
|
||||||
|
}
|
||||||
|
MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfoList);
|
||||||
}
|
}
|
||||||
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
|
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
|
||||||
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
|
org.apache.hbase.thirdparty.com.google.common.base.Preconditions.class);
|
||||||
} else {
|
} else {
|
||||||
// output to live cluster
|
// output to live cluster
|
||||||
job.setMapperClass(WALMapper.class);
|
job.setMapperClass(WALMapper.class);
|
||||||
|
@ -321,17 +324,25 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
String codecCls = WALCellCodec.getWALCellCodecClass(conf);
|
String codecCls = WALCellCodec.getWALCellCodecClass(conf);
|
||||||
try {
|
try {
|
||||||
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(), Class.forName(codecCls));
|
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
|
||||||
|
Class.forName(codecCls));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Cannot determine wal codec class " + codecCls, e);
|
throw new IOException("Cannot determine wal codec class " + codecCls, e);
|
||||||
}
|
}
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<TableName> getTableNameList(String[] tables) {
|
||||||
|
List<TableName> list = new ArrayList<TableName>();
|
||||||
|
for (String name : tables) {
|
||||||
|
list.add(TableName.valueOf(name));
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Print usage
|
* Print usage
|
||||||
* @param errorMsg Error message. Can be null.
|
* @param errorMsg Error message. Can be null.
|
||||||
*/
|
*/
|
||||||
private void usage(final String errorMsg) {
|
private void usage(final String errorMsg) {
|
||||||
if (errorMsg != null && errorMsg.length() > 0) {
|
if (errorMsg != null && errorMsg.length() > 0) {
|
||||||
|
@ -340,8 +351,7 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
|
System.err.println("Usage: " + NAME + " [options] <wal inputdir> <tables> [<tableMappings>]");
|
||||||
System.err.println("Read all WAL entries for <tables>.");
|
System.err.println("Read all WAL entries for <tables>.");
|
||||||
System.err.println("If no tables (\"\") are specific, all tables are imported.");
|
System.err.println("If no tables (\"\") are specific, all tables are imported.");
|
||||||
System.err.println("(Careful, even hbase:meta entries will be imported"+
|
System.err.println("(Careful, even hbase:meta entries will be imported" + " in that case.)");
|
||||||
" in that case.)");
|
|
||||||
System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
|
System.err.println("Otherwise <tables> is a comma separated list of tables.\n");
|
||||||
System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
|
System.err.println("The WAL entries can be mapped to new set of tables via <tableMapping>.");
|
||||||
System.err.println("<tableMapping> is a command separated list of targettables.");
|
System.err.println("<tableMapping> is a command separated list of targettables.");
|
||||||
|
@ -354,16 +364,14 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
|
System.err.println(" -D" + WALInputFormat.START_TIME_KEY + "=[date|ms]");
|
||||||
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
|
System.err.println(" -D" + WALInputFormat.END_TIME_KEY + "=[date|ms]");
|
||||||
System.err.println(" -D " + JOB_NAME_CONF_KEY
|
System.err.println(" -D " + JOB_NAME_CONF_KEY
|
||||||
+ "=jobName - use the specified mapreduce job name for the wal player");
|
+ "=jobName - use the specified mapreduce job name for the wal player");
|
||||||
System.err.println("For performance also consider the following options:\n"
|
System.err.println("For performance also consider the following options:\n"
|
||||||
+ " -Dmapreduce.map.speculative=false\n"
|
+ " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false");
|
||||||
+ " -Dmapreduce.reduce.speculative=false");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main entry point.
|
* Main entry point.
|
||||||
*
|
* @param args The command line parameters.
|
||||||
* @param args The command line parameters.
|
|
||||||
* @throws Exception When running the job fails.
|
* @throws Exception When running the job fails.
|
||||||
*/
|
*/
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
|
@ -88,9 +88,9 @@ public class FSTableDescriptors implements TableDescriptors {
|
||||||
/**
|
/**
|
||||||
* The file name prefix used to store HTD in HDFS
|
* The file name prefix used to store HTD in HDFS
|
||||||
*/
|
*/
|
||||||
static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
|
public static final String TABLEINFO_FILE_PREFIX = ".tableinfo";
|
||||||
static final String TABLEINFO_DIR = ".tabledesc";
|
public static final String TABLEINFO_DIR = ".tabledesc";
|
||||||
static final String TMP_DIR = ".tmp";
|
public static final String TMP_DIR = ".tmp";
|
||||||
|
|
||||||
// This cache does not age out the old stuff. Thinking is that the amount
|
// This cache does not age out the old stuff. Thinking is that the amount
|
||||||
// of data we keep up in here is so small, no need to do occasional purge.
|
// of data we keep up in here is so small, no need to do occasional purge.
|
||||||
|
|
Loading…
Reference in New Issue