HBASE-14417 Incremental backup and bulk loading

This commit is contained in:
tedyu 2017-03-28 16:23:36 -07:00
parent cb4fac1d18
commit 0345fc8775
14 changed files with 1275 additions and 53 deletions

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
/**
* Implementation of a file cleaner that checks if an hfile is still referenced by backup before
* deleting it from hfile archive directory.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
private static final Log LOG = LogFactory.getLog(BackupHFileCleaner.class);
private boolean stopped = false;
private boolean aborted;
private Configuration conf;
private Connection connection;
private long prevReadFromBackupTbl = 0, // timestamp of most recent read from hbase:backup table
secondPrevReadFromBackupTbl = 0; // timestamp of 2nd most recent read from hbase:backup table
//used by unit test to skip reading hbase:backup
private boolean checkForFullyBackedUpTables = true;
private List<TableName> fullyBackedUpTables = null;
private Set<String> getFilenameFromBulkLoad(Map<byte[], List<Path>>[] maps) {
Set<String> filenames = new HashSet<String>();
for (Map<byte[], List<Path>> map : maps) {
if (map == null) continue;
for (List<Path> paths : map.values()) {
for (Path p : paths) {
filenames.add(p.getName());
}
}
}
return filenames;
}
private Set<String> loadHFileRefs(List<TableName> tableList) throws IOException {
if (connection == null) {
connection = ConnectionFactory.createConnection(conf);
}
try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
Map<byte[], List<Path>>[] res =
tbl.readBulkLoadedFiles(null, tableList);
secondPrevReadFromBackupTbl = prevReadFromBackupTbl;
prevReadFromBackupTbl = EnvironmentEdgeManager.currentTime();
return getFilenameFromBulkLoad(res);
}
}
@VisibleForTesting
void setCheckForFullyBackedUpTables(boolean b) {
checkForFullyBackedUpTables = b;
}
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
if (conf == null) {
return files;
}
// obtain the Set of TableName's which have been fully backed up
// so that we filter BulkLoad to be returned from server
if (checkForFullyBackedUpTables) {
if (connection == null) return files;
try (BackupSystemTable tbl = new BackupSystemTable(connection)) {
fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up, skipping checking", ioe);
return Collections.emptyList();
}
Collections.sort(fullyBackedUpTables);
}
final Set<String> hfileRefs;
try {
hfileRefs = loadHFileRefs(fullyBackedUpTables);
} catch (IOException ioe) {
LOG.error("Failed to read hfile references, skipping checking deletable files", ioe);
return Collections.emptyList();
}
Iterable<FileStatus> deletables = Iterables.filter(files, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus file) {
// If the file is recent, be conservative and wait for one more scan of hbase:backup table
if (file.getModificationTime() > secondPrevReadFromBackupTbl) {
return false;
}
String hfile = file.getPath().getName();
boolean foundHFileRef = hfileRefs.contains(hfile);
return !foundHFileRef;
}
});
return deletables;
}
@Override
public boolean isFileDeletable(FileStatus fStat) {
// work is done in getDeletableFiles()
return true;
}
@Override
public void setConf(Configuration config) {
this.conf = config;
this.connection = null;
try {
this.connection = ConnectionFactory.createConnection(conf);
} catch (IOException ioe) {
LOG.error("Couldn't establish connection", ioe);
}
}
@Override
public void stop(String why) {
if (this.stopped) {
return;
}
if (this.connection != null) {
try {
this.connection.close();
} catch (IOException ioe) {
LOG.debug("Got " + ioe + " when closing connection");
}
}
this.stopped = true;
}
@Override
public boolean isStopped() {
return this.stopped;
}
@Override
public void abort(String why, Throwable e) {
LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
this.aborted = true;
stop(why);
}
@Override
public boolean isAborted() {
return this.aborted;
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.backup;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Pair;
/**
* An Observer to facilitate backup operations
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class BackupObserver implements RegionObserver {
private static final Log LOG = LogFactory.getLog(BackupObserver.class);
@Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths,
boolean hasLoaded) throws IOException {
Configuration cfg = ctx.getEnvironment().getConfiguration();
if (!hasLoaded) {
// there is no need to record state
return hasLoaded;
}
if (finalPaths == null || !BackupManager.isBackupEnabled(cfg)) {
LOG.debug("skipping recording bulk load in postBulkLoadHFile since backup is disabled");
return hasLoaded;
}
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
HRegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
}
return hasLoaded;
}
tbl.writePathsPostBulkLoad(tableName, info.getEncodedNameAsBytes(), finalPaths);
return hasLoaded;
} catch (IOException ioe) {
LOG.error("Failed to get tables which have been fully backed up", ioe);
return false;
}
}
@Override
public void preCommitStoreFile(final ObserverContext<RegionCoprocessorEnvironment> ctx,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
Configuration cfg = ctx.getEnvironment().getConfiguration();
if (pairs == null || pairs.isEmpty() || !BackupManager.isBackupEnabled(cfg)) {
LOG.debug("skipping recording bulk load in preCommitStoreFile since backup is disabled");
return;
}
try (Connection connection = ConnectionFactory.createConnection(cfg);
BackupSystemTable tbl = new BackupSystemTable(connection)) {
List<TableName> fullyBackedUpTables = tbl.getTablesForBackupType(BackupType.FULL);
HRegionInfo info = ctx.getEnvironment().getRegionInfo();
TableName tableName = info.getTable();
if (!fullyBackedUpTables.contains(tableName)) {
if (LOG.isTraceEnabled()) {
LOG.trace(tableName + " has not gone thru full backup");
}
return;
}
tbl.writeFilesForBulkLoadPreCommit(tableName, info.getEncodedNameAsBytes(), family, pairs);
return;
}
}
}

View File

@ -188,7 +188,33 @@ public class BackupAdminImpl implements BackupAdmin {
removeTableFromBackupImage(info, tn, sysTable); removeTableFromBackupImage(info, tn, sysTable);
} }
} }
LOG.debug("Delete backup info " + backupInfo.getBackupId()); Map<byte[], String> map = sysTable.readBulkLoadedFiles(backupId);
FileSystem fs = FileSystem.get(conn.getConfiguration());
boolean succ = true;
int numDeleted = 0;
for (String f : map.values()) {
Path p = new Path(f);
try {
LOG.debug("Delete backup info " + p + " for " + backupInfo.getBackupId());
if (!fs.delete(p)) {
if (fs.exists(p)) {
LOG.warn(f + " was not deleted");
succ = false;
}
} else {
numDeleted++;
}
} catch (IOException ioe) {
LOG.warn(f + " was not deleted", ioe);
succ = false;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(numDeleted + " bulk loaded files out of " + map.size() + " were deleted");
}
if (succ) {
sysTable.deleteBulkLoadedFiles(map);
}
sysTable.deleteBackupInfo(backupInfo.getBackupId()); sysTable.deleteBackupInfo(backupInfo.getBackupId());
LOG.info("Delete backup " + backupInfo.getBackupId() + " completed."); LOG.info("Delete backup " + backupInfo.getBackupId() + " completed.");

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.procedure.ProcedureManagerHost; import org.apache.hadoop.hbase.procedure.ProcedureManagerHost;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -393,6 +395,20 @@ public class BackupManager implements Closeable {
return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir()); return systemTable.readRegionServerLastLogRollResult(backupInfo.getBackupRootDir());
} }
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
return systemTable.readBulkloadRows(tableList);
}
public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
systemTable.removeBulkLoadedRows(lst, rows);
}
public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps)
throws IOException {
systemTable.writeBulkLoadedFiles(sTableList, maps, backupInfo.getBackupId());
}
/** /**
* Get all completed backup information (in desc order by time) * Get all completed backup information (in desc order by time)
* @return history info of BackupCompleteData * @return history info of BackupCompleteData

View File

@ -22,11 +22,13 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -35,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
@ -44,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
@ -59,6 +63,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* This class provides API to access backup system table<br> * This class provides API to access backup system table<br>
@ -77,6 +82,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public final class BackupSystemTable implements Closeable { public final class BackupSystemTable implements Closeable {
private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
static class WALItem { static class WALItem {
String backupId; String backupId;
@ -108,8 +114,6 @@ public final class BackupSystemTable implements Closeable {
} }
private static final Log LOG = LogFactory.getLog(BackupSystemTable.class);
private TableName tableName; private TableName tableName;
/** /**
* Stores backup sessions (contexts) * Stores backup sessions (contexts)
@ -119,6 +123,7 @@ public final class BackupSystemTable implements Closeable {
* Stores other meta * Stores other meta
*/ */
final static byte[] META_FAMILY = "meta".getBytes(); final static byte[] META_FAMILY = "meta".getBytes();
final static byte[] BULK_LOAD_FAMILY = "bulk".getBytes();
/** /**
* Connection to HBase cluster, shared among all instances * Connection to HBase cluster, shared among all instances
*/ */
@ -130,9 +135,22 @@ public final class BackupSystemTable implements Closeable {
private final static String INCR_BACKUP_SET = "incrbackupset:"; private final static String INCR_BACKUP_SET = "incrbackupset:";
private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:"; private final static String TABLE_RS_LOG_MAP_PREFIX = "trslm:";
private final static String RS_LOG_TS_PREFIX = "rslogts:"; private final static String RS_LOG_TS_PREFIX = "rslogts:";
private final static String BULK_LOAD_PREFIX = "bulk:";
private final static byte[] BULK_LOAD_PREFIX_BYTES = BULK_LOAD_PREFIX.getBytes();
final static byte[] TBL_COL = Bytes.toBytes("tbl");
final static byte[] FAM_COL = Bytes.toBytes("fam");
final static byte[] PATH_COL = Bytes.toBytes("path");
final static byte[] STATE_COL = Bytes.toBytes("state");
// the two states a bulk loaded file can be
final static byte[] BL_PREPARE = Bytes.toBytes("R");
final static byte[] BL_COMMIT = Bytes.toBytes("D");
private final static String WALS_PREFIX = "wals:"; private final static String WALS_PREFIX = "wals:";
private final static String SET_KEY_PREFIX = "backupset:"; private final static String SET_KEY_PREFIX = "backupset:";
// separator between BULK_LOAD_PREFIX and ordinals
protected final static String BLK_LD_DELIM = ":";
private final static byte[] EMPTY_VALUE = new byte[] {}; private final static byte[] EMPTY_VALUE = new byte[] {};
// Safe delimiter in a string // Safe delimiter in a string
@ -196,6 +214,97 @@ public final class BackupSystemTable implements Closeable {
} }
} }
/*
* @param backupId the backup Id
* @return Map of rows to path of bulk loaded hfile
*/
Map<byte[], String> readBulkLoadedFiles(String backupId) throws IOException {
Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
try (Table table = connection.getTable(tableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
Map<byte[], String> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
while ((res = scanner.next()) != null) {
res.advance();
byte[] row = CellUtil.cloneRow(res.listCells().get(0));
for (Cell cell : res.listCells()) {
if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
BackupSystemTable.PATH_COL.length) == 0) {
map.put(row, Bytes.toString(CellUtil.cloneValue(cell)));
}
}
}
return map;
}
}
/*
* Used during restore
* @param backupId the backup Id
* @param sTableList List of tables
* @return array of Map of family to List of Paths
*/
public Map<byte[], List<Path>>[] readBulkLoadedFiles(String backupId, List<TableName> sTableList)
throws IOException {
Scan scan = BackupSystemTable.createScanForBulkLoadedFiles(backupId);
Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList == null ? 1 : sTableList.size()];
try (Table table = connection.getTable(tableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
while ((res = scanner.next()) != null) {
res.advance();
TableName tbl = null;
byte[] fam = null;
String path = null;
for (Cell cell : res.listCells()) {
if (CellComparator.compareQualifiers(cell, BackupSystemTable.TBL_COL, 0,
BackupSystemTable.TBL_COL.length) == 0) {
tbl = TableName.valueOf(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
BackupSystemTable.FAM_COL.length) == 0) {
fam = CellUtil.cloneValue(cell);
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
BackupSystemTable.PATH_COL.length) == 0) {
path = Bytes.toString(CellUtil.cloneValue(cell));
}
}
int srcIdx = IncrementalTableBackupClient.getIndex(tbl, sTableList);
if (srcIdx == -1) {
// the table is not among the query
continue;
}
if (mapForSrc[srcIdx] == null) {
mapForSrc[srcIdx] = new TreeMap<>(Bytes.BYTES_COMPARATOR);
}
List<Path> files;
if (!mapForSrc[srcIdx].containsKey(fam)) {
files = new ArrayList<Path>();
mapForSrc[srcIdx].put(fam, files);
} else {
files = mapForSrc[srcIdx].get(fam);
}
files.add(new Path(path));
if (LOG.isDebugEnabled()) {
LOG.debug("found bulk loaded file : " + tbl + " " + Bytes.toString(fam) + " " + path);
}
};
return mapForSrc;
}
}
/*
* @param map Map of row keys to path of bulk loaded hfile
*/
void deleteBulkLoadedFiles(Map<byte[], String> map) throws IOException {
try (Table table = connection.getTable(tableName)) {
List<Delete> dels = new ArrayList<>();
for (byte[] row : map.keySet()) {
dels.add(new Delete(row).addFamily(BackupSystemTable.META_FAMILY));
}
table.delete(dels);
}
}
/** /**
* Deletes backup status from backup system table table * Deletes backup status from backup system table table
* @param backupId backup id * @param backupId backup id
@ -213,6 +322,156 @@ public final class BackupSystemTable implements Closeable {
} }
} }
/*
* For postBulkLoadHFile() hook.
* @param tabName table name
* @param region the region receiving hfile
* @param finalPaths family and associated hfiles
*/
public void writePathsPostBulkLoad(TableName tabName, byte[] region,
Map<byte[], List<Path>> finalPaths) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
finalPaths.size() + " entries");
}
try (Table table = connection.getTable(tableName)) {
List<Put> puts = BackupSystemTable.createPutForCommittedBulkload(tabName, region,
finalPaths);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}
/*
* For preCommitStoreFile() hook
* @param tabName table name
* @param region the region receiving hfile
* @param family column family
* @param pairs list of paths for hfiles
*/
public void writeFilesForBulkLoadPreCommit(TableName tabName, byte[] region,
final byte[] family, final List<Pair<Path, Path>> pairs) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("write bulk load descriptor to backup " + tabName + " with " +
pairs.size() + " entries");
}
try (Table table = connection.getTable(tableName)) {
List<Put> puts = BackupSystemTable.createPutForPreparedBulkload(tabName, region,
family, pairs);
table.put(puts);
LOG.debug("written " + puts.size() + " rows for bulk load of " + tabName);
}
}
/*
* Removes rows recording bulk loaded hfiles from backup table
* @param lst list of table names
* @param rows the rows to be deleted
*/
public void removeBulkLoadedRows(List<TableName> lst, List<byte[]> rows) throws IOException {
try (Table table = connection.getTable(tableName)) {
List<Delete> lstDels = new ArrayList<>();
for (byte[] row : rows) {
Delete del = new Delete(row);
lstDels.add(del);
LOG.debug("orig deleting the row: " + Bytes.toString(row));
}
table.delete(lstDels);
LOG.debug("deleted " + rows.size() + " original bulkload rows for " + lst.size() + " tables");
}
}
/*
* Reads the rows from backup table recording bulk loaded hfiles
* @param tableList list of table names
* @return The keys of the Map are table, region and column family.
* Value of the map reflects whether the hfile was recorded by preCommitStoreFile hook (true)
*/
public Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>>
readBulkloadRows(List<TableName> tableList) throws IOException {
Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = new HashMap<>();
List<byte[]> rows = new ArrayList<>();
for (TableName tTable : tableList) {
Scan scan = BackupSystemTable.createScanForOrigBulkLoadedFiles(tTable);
Map<String, Map<String, List<Pair<String, Boolean>>>> tblMap = map.get(tTable);
try (Table table = connection.getTable(tableName);
ResultScanner scanner = table.getScanner(scan)) {
Result res = null;
while ((res = scanner.next()) != null) {
res.advance();
String fam = null;
String path = null;
boolean raw = false;
byte[] row = null;
String region = null;
for (Cell cell : res.listCells()) {
row = CellUtil.cloneRow(cell);
rows.add(row);
String rowStr = Bytes.toString(row);
region = BackupSystemTable.getRegionNameFromOrigBulkLoadRow(rowStr);
if (CellComparator.compareQualifiers(cell, BackupSystemTable.FAM_COL, 0,
BackupSystemTable.FAM_COL.length) == 0) {
fam = Bytes.toString(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.PATH_COL, 0,
BackupSystemTable.PATH_COL.length) == 0) {
path = Bytes.toString(CellUtil.cloneValue(cell));
} else if (CellComparator.compareQualifiers(cell, BackupSystemTable.STATE_COL, 0,
BackupSystemTable.STATE_COL.length) == 0) {
byte[] state = CellUtil.cloneValue(cell);
if (Bytes.equals(BackupSystemTable.BL_PREPARE, state)) {
raw = true;
} else raw = false;
}
}
if (map.get(tTable) == null) {
map.put(tTable, new HashMap<String, Map<String, List<Pair<String, Boolean>>>>());
tblMap = map.get(tTable);
}
if (tblMap.get(region) == null) {
tblMap.put(region, new HashMap<String, List<Pair<String, Boolean>>>());
}
Map<String, List<Pair<String, Boolean>>> famMap = tblMap.get(region);
if (famMap.get(fam) == null) {
famMap.put(fam, new ArrayList<Pair<String, Boolean>>());
}
famMap.get(fam).add(new Pair<>(path, raw));
LOG.debug("found orig " + path + " for " + fam + " of table " + region);
}
}
}
return new Pair<>(map, rows);
}
/*
* @param sTableList List of tables
* @param maps array of Map of family to List of Paths
* @param backupId the backup Id
*/
public void writeBulkLoadedFiles(List<TableName> sTableList, Map<byte[], List<Path>>[] maps,
String backupId) throws IOException {
try (Table table = connection.getTable(tableName)) {
long ts = EnvironmentEdgeManager.currentTime();
int cnt = 0;
List<Put> puts = new ArrayList<>();
for (int idx = 0; idx < maps.length; idx++) {
Map<byte[], List<Path>> map = maps[idx];
TableName tn = sTableList.get(idx);
if (map == null) continue;
for (Map.Entry<byte[], List<Path>> entry: map.entrySet()) {
byte[] fam = entry.getKey();
List<Path> paths = entry.getValue();
for (Path p : paths) {
Put put = BackupSystemTable.createPutForBulkLoadedFile(tn, fam, p.toString(),
backupId, ts, cnt++);
puts.add(put);
}
}
}
if (!puts.isEmpty()) {
table.put(puts);
}
}
}
/** /**
* Reads backup status object (instance of backup info) from backup system table table * Reads backup status object (instance of backup info) from backup system table table
* @param backupId backup id * @param backupId backup id
@ -399,6 +658,21 @@ public final class BackupSystemTable implements Closeable {
} }
/*
* Retrieve TableName's for completed backup of given type
* @param type backup type
* @return List of table names
*/
public List<TableName> getTablesForBackupType(BackupType type) throws IOException {
Set<TableName> names = new HashSet<>();
List<BackupInfo> infos = getBackupHistory(true);
for (BackupInfo info : infos) {
if (info.getType() != type) continue;
names.addAll(info.getTableNames());
}
return new ArrayList(names);
}
/** /**
* Get history for backup destination * Get history for backup destination
* @param backupRoot backup destination path * @param backupRoot backup destination path
@ -1233,6 +1507,119 @@ public final class BackupSystemTable implements Closeable {
return s.substring(index + 1); return s.substring(index + 1);
} }
/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
*/
static List<Put> createPutForCommittedBulkload(TableName table, byte[] region,
Map<byte[], List<Path>> finalPaths) {
List<Put> puts = new ArrayList<>();
for (Map.Entry<byte[], List<Path>> entry : finalPaths.entrySet()) {
for (Path path : entry.getValue()) {
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
String filename = file.substring(lastSlash+1);
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
Bytes.toString(region), BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, entry.getKey());
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
file.getBytes());
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_COMMIT);
puts.add(put);
LOG.debug("writing done bulk path " + file + " for " + table + " " +
Bytes.toString(region));
}
}
return puts;
}
/*
* Creates Put's for bulk load resulting from running LoadIncrementalHFiles
*/
static List<Put> createPutForPreparedBulkload(TableName table, byte[] region,
final byte[] family, final List<Pair<Path, Path>> pairs) {
List<Put> puts = new ArrayList<>();
for (Pair<Path, Path> pair : pairs) {
Path path = pair.getSecond();
String file = path.toString();
int lastSlash = file.lastIndexOf("/");
String filename = file.substring(lastSlash+1);
Put put = new Put(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM,
Bytes.toString(region), BLK_LD_DELIM, filename));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, table.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, family);
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL,
file.getBytes());
put.addColumn(BackupSystemTable.META_FAMILY, STATE_COL, BL_PREPARE);
puts.add(put);
LOG.debug("writing raw bulk path " + file + " for " + table + " " +
Bytes.toString(region));
}
return puts;
}
public static List<Delete> createDeleteForOrigBulkLoad(List<TableName> lst) {
List<Delete> lstDels = new ArrayList<>();
for (TableName table : lst) {
Delete del = new Delete(rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM));
del.addFamily(BackupSystemTable.META_FAMILY);
lstDels.add(del);
}
return lstDels;
}
static Scan createScanForOrigBulkLoadedFiles(TableName table) throws IOException {
Scan scan = new Scan();
byte[] startRow = rowkey(BULK_LOAD_PREFIX, table.toString(), BLK_LD_DELIM);
byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
scan.withStartRow(startRow);
scan.withStopRow(stopRow);
scan.addFamily(BackupSystemTable.META_FAMILY);
scan.setMaxVersions(1);
return scan;
}
static String getTableNameFromOrigBulkLoadRow(String rowStr) {
String[] parts = rowStr.split(BLK_LD_DELIM);
return parts[1];
}
static String getRegionNameFromOrigBulkLoadRow(String rowStr) {
// format is bulk : namespace : table : region : file
String[] parts = rowStr.split(BLK_LD_DELIM);
int idx = 3;
if (parts.length == 4) {
// the table is in default namespace
idx = 2;
}
LOG.debug("bulk row string " + rowStr + " region " + parts[idx]);
return parts[idx];
}
/*
* Used to query bulk loaded hfiles which have been copied by incremental backup
* @param backupId the backup Id. It can be null when querying for all tables
* @return the Scan object
*/
static Scan createScanForBulkLoadedFiles(String backupId) throws IOException {
Scan scan = new Scan();
byte[] startRow = backupId == null ? BULK_LOAD_PREFIX_BYTES :
rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM);
byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
scan.setStartRow(startRow);
scan.setStopRow(stopRow);
//scan.setTimeRange(lower, Long.MAX_VALUE);
scan.addFamily(BackupSystemTable.META_FAMILY);
scan.setMaxVersions(1);
return scan;
}
static Put createPutForBulkLoadedFile(TableName tn, byte[] fam, String p, String backupId,
long ts, int idx) {
Put put = new Put(rowkey(BULK_LOAD_PREFIX, backupId+BLK_LD_DELIM+ts+BLK_LD_DELIM+idx));
put.addColumn(BackupSystemTable.META_FAMILY, TBL_COL, tn.getName());
put.addColumn(BackupSystemTable.META_FAMILY, FAM_COL, fam);
put.addColumn(BackupSystemTable.META_FAMILY, PATH_COL, p.getBytes());
return put;
}
/** /**
* Creates put list for list of WAL files * Creates put list for list of WAL files
* @param files list of WAL file paths * @param files list of WAL file paths
@ -1364,7 +1751,7 @@ public final class BackupSystemTable implements Closeable {
return Bytes.toString(data).substring(SET_KEY_PREFIX.length()); return Bytes.toString(data).substring(SET_KEY_PREFIX.length());
} }
private byte[] rowkey(String s, String... other) { private static byte[] rowkey(String s, String... other) {
StringBuilder sb = new StringBuilder(s); StringBuilder sb = new StringBuilder(s);
for (String ss : other) { for (String ss : other) {
sb.append(ss); sb.append(ss);

View File

@ -18,15 +18,21 @@
package org.apache.hadoop.hbase.backup.impl; package org.apache.hadoop.hbase.backup.impl;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -40,6 +46,10 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Incremental backup implementation. * Incremental backup implementation.
@ -154,6 +164,118 @@ public class IncrementalTableBackupClient extends TableBackupClient {
return list; return list;
} }
static int getIndex(TableName tbl, List<TableName> sTableList) {
if (sTableList == null) return 0;
for (int i = 0; i < sTableList.size(); i++) {
if (tbl.equals(sTableList.get(i))) {
return i;
}
}
return -1;
}
/*
* Reads bulk load records from backup table, iterates through the records and forms the paths
* for bulk loaded hfiles. Copies the bulk loaded hfiles to backup destination
* @param sTableList list of tables to be backed up
* @return map of table to List of files
*/
Map<byte[], List<Path>>[] handleBulkLoad(List<TableName> sTableList) throws IOException {
Map<byte[], List<Path>>[] mapForSrc = new Map[sTableList.size()];
Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair =
backupManager.readBulkloadRows(sTableList);
Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> map = pair.getFirst();
FileSystem fs = FileSystem.get(conf);
FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
} catch (URISyntaxException use) {
throw new IOException("Unable to get FileSystem", use);
}
Path rootdir = FSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);
for (Map.Entry<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>> tblEntry :
map.entrySet()) {
TableName srcTable = tblEntry.getKey();
int srcIdx = getIndex(srcTable, sTableList);
if (srcIdx < 0) {
LOG.warn("Couldn't find " + srcTable + " in source table List");
continue;
}
if (mapForSrc[srcIdx] == null) {
mapForSrc[srcIdx] = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
}
Path tblDir = FSUtils.getTableDir(rootdir, srcTable);
Path tgtTable = new Path(new Path(tgtRoot, srcTable.getNamespaceAsString()),
srcTable.getQualifierAsString());
for (Map.Entry<String,Map<String,List<Pair<String, Boolean>>>> regionEntry :
tblEntry.getValue().entrySet()){
String regionName = regionEntry.getKey();
Path regionDir = new Path(tblDir, regionName);
// map from family to List of hfiles
for (Map.Entry<String,List<Pair<String, Boolean>>> famEntry :
regionEntry.getValue().entrySet()) {
String fam = famEntry.getKey();
Path famDir = new Path(regionDir, fam);
List<Path> files;
if (!mapForSrc[srcIdx].containsKey(fam.getBytes())) {
files = new ArrayList<Path>();
mapForSrc[srcIdx].put(fam.getBytes(), files);
} else {
files = mapForSrc[srcIdx].get(fam.getBytes());
}
Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
String tblName = srcTable.getQualifierAsString();
Path tgtFam = new Path(new Path(tgtTable, regionName), fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
for (Pair<String, Boolean> fileWithState : famEntry.getValue()) {
String file = fileWithState.getFirst();
boolean raw = fileWithState.getSecond();
int idx = file.lastIndexOf("/");
String filename = file;
if (idx > 0) {
filename = file.substring(idx+1);
}
Path p = new Path(famDir, filename);
Path tgt = new Path(tgtFam, filename);
Path archive = new Path(archiveDir, filename);
if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile " + file + " in " + famDir + " for " + tblName);
}
try {
if (LOG.isTraceEnabled()) {
LOG.trace("copying " + p + " to " + tgt);
}
FileUtil.copy(fs, p, tgtFs, tgt, false,conf);
} catch (FileNotFoundException e) {
LOG.debug("copying archive " + archive + " to " + tgt);
try {
FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
} catch (FileNotFoundException fnfe) {
if (!raw) throw fnfe;
}
}
} else {
LOG.debug("copying archive " + archive + " to " + tgt);
try {
FileUtil.copy(fs, archive, tgtFs, tgt, false, conf);
} catch (FileNotFoundException fnfe) {
if (!raw) throw fnfe;
}
}
files.add(tgt);
}
}
}
}
backupManager.writeBulkLoadedFiles(sTableList, mapForSrc);
backupManager.removeBulkLoadedRows(sTableList, pair.getSecond());
return mapForSrc;
}
@Override @Override
public void execute() throws IOException { public void execute() throws IOException {
@ -204,6 +326,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
BackupUtils.getMinValue(BackupUtils BackupUtils.getMinValue(BackupUtils
.getRSLogTimestampMins(newTableSetTimestampMap)); .getRSLogTimestampMins(newTableSetTimestampMap));
backupManager.writeBackupStartCode(newStartCode); backupManager.writeBackupStartCode(newStartCode);
handleBulkLoad(backupInfo.getTableNames());
// backup complete // backup complete
completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf); completeBackup(conn, backupInfo, backupManager, BackupType.INCREMENTAL, conf);

View File

@ -19,9 +19,14 @@
package org.apache.hadoop.hbase.backup.impl; package org.apache.hadoop.hbase.backup.impl;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
@ -34,10 +39,13 @@ import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.RestoreRequest;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceRestoreJob;
import org.apache.hadoop.hbase.backup.util.RestoreTool; import org.apache.hadoop.hbase.backup.util.RestoreTool;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
/** /**
* Restore table implementation * Restore table implementation
@ -50,6 +58,7 @@ public class RestoreTablesClient {
private Configuration conf; private Configuration conf;
private Connection conn; private Connection conn;
private String backupId; private String backupId;
private String fullBackupId;
private TableName[] sTableArray; private TableName[] sTableArray;
private TableName[] tTableArray; private TableName[] tTableArray;
private String targetRootDir; private String targetRootDir;
@ -141,6 +150,7 @@ public class RestoreTablesClient {
// We need hFS only for full restore (see the code) // We need hFS only for full restore (see the code)
BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId);
if (manifest.getType() == BackupType.FULL) { if (manifest.getType() == BackupType.FULL) {
fullBackupId = manifest.getBackupImage().getBackupId();
LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image " LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + " backup image "
+ tableBackupPath.toString()); + tableBackupPath.toString());
restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists, restoreTool.fullRestoreTable(conn, tableBackupPath, sTable, tTable, truncateIfExists,
@ -170,7 +180,6 @@ public class RestoreTablesClient {
restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable }, restoreTool.incrementalRestoreTable(conn, tableBackupPath, paths, new TableName[] { sTable },
new TableName[] { tTable }, lastIncrBackupId); new TableName[] { tTable }, lastIncrBackupId);
LOG.info(sTable + " has been successfully restored to " + tTable); LOG.info(sTable + " has been successfully restored to " + tTable);
} }
/** /**
@ -185,7 +194,7 @@ public class RestoreTablesClient {
TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException {
TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>(); TreeSet<BackupImage> restoreImageSet = new TreeSet<BackupImage>();
boolean truncateIfExists = isOverwrite; boolean truncateIfExists = isOverwrite;
try { Set<String> backupIdSet = new HashSet<>();
for (int i = 0; i < sTableArray.length; i++) { for (int i = 0; i < sTableArray.length; i++) {
TableName table = sTableArray[i]; TableName table = sTableArray[i];
BackupManifest manifest = backupManifestMap.get(table); BackupManifest manifest = backupManifestMap.get(table);
@ -208,16 +217,51 @@ public class RestoreTablesClient {
+ " " + " "
+ HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(),
table)); table));
if (image.getType() == BackupType.INCREMENTAL) {
backupIdSet.add(image.getBackupId());
LOG.debug("adding " + image.getBackupId() + " for bulk load");
}
}
}
}
try (BackupSystemTable table = new BackupSystemTable(conn)) {
List<TableName> sTableList = Arrays.asList(sTableArray);
for (String id : backupIdSet) {
LOG.debug("restoring bulk load for " + id);
Map<byte[], List<Path>>[] mapForSrc = table.readBulkLoadedFiles(id, sTableList);
Map<LoadQueueItem, ByteBuffer> loaderResult;
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
LoadIncrementalHFiles loader = MapReduceRestoreJob.createLoader(conf);
for (int i = 0; i < sTableList.size(); i++) {
if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
if (loaderResult.isEmpty()) {
String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " +tTableArray[i];
LOG.error(msg);
throw new IOException(msg);
}
} }
} }
} }
} catch (Exception e) {
LOG.error("Failed", e);
throw new IOException(e);
} }
LOG.debug("restoreStage finished"); LOG.debug("restoreStage finished");
} }
static long getTsFromBackupId(String backupId) {
if (backupId == null) {
return 0;
}
return Long.parseLong(backupId.substring(backupId.lastIndexOf("_")+1));
}
static boolean withinRange(long a, long lower, long upper) {
if (a < lower || a > upper) {
return false;
}
return true;
}
public void execute() throws IOException { public void execute() throws IOException {
// case VALIDATION: // case VALIDATION:

View File

@ -98,7 +98,7 @@ public class MapReduceRestoreJob implements RestoreJob {
result = player.run(playerArgs); result = player.run(playerArgs);
if (succeeded(result)) { if (succeeded(result)) {
// do bulk load // do bulk load
LoadIncrementalHFiles loader = createLoader(); LoadIncrementalHFiles loader = createLoader(getConf());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Restoring HFiles from directory " + bulkOutputPath); LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
} }
@ -134,13 +134,13 @@ public class MapReduceRestoreJob implements RestoreJob {
return result == 0; return result == 0;
} }
private LoadIncrementalHFiles createLoader() throws IOException { public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
// set configuration for restore: // set configuration for restore:
// LoadIncrementalHFile needs more time // LoadIncrementalHFile needs more time
// <name>hbase.rpc.timeout</name> <value>600000</value> // <name>hbase.rpc.timeout</name> <value>600000</value>
// calculates // calculates
Integer milliSecInHour = 3600000; Integer milliSecInHour = 3600000;
Configuration conf = new Configuration(getConf()); Configuration conf = new Configuration(config);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour);
// By default, it is 32 and loader will fail if # of files in any region exceed this // By default, it is 32 and loader will fail if # of files in any region exceed this

View File

@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Deque; import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -62,6 +63,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
@ -144,7 +148,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
initialize(); initialize();
} }
private void initialize() throws Exception { private void initialize() throws IOException {
if (initalized) { if (initalized) {
return; return;
} }
@ -282,6 +286,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public String toString() { public String toString() {
return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString(); return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
} }
public byte[] getFamily() {
return family;
}
public Path getFilePath() {
return hfilePath;
}
} }
/* /*
@ -1184,7 +1196,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* If the table is created for the first time, then "completebulkload" reads the files twice. * If the table is created for the first time, then "completebulkload" reads the files twice.
* More modifications necessary if we want to avoid doing it. * More modifications necessary if we want to avoid doing it.
*/ */
private void createTable(TableName tableName, String dirPath, Admin admin) throws Exception { private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
final Path hfofDir = new Path(dirPath); final Path hfofDir = new Path(dirPath);
final FileSystem fs = hfofDir.getFileSystem(getConf()); final FileSystem fs = hfofDir.getFileSystem(getConf());
@ -1238,7 +1250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
} }
public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map, public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
TableName tableName) throws Exception{ TableName tableName) throws IOException {
initialize(); initialize();
try (Connection connection = ConnectionFactory.createConnection(getConf()); try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) { Admin admin = connection.getAdmin()) {
@ -1261,7 +1273,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName); try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) { RegionLocator locator = connection.getRegionLocator(tableName)) {
boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
if (dirPath != null) { if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
} else { } else {

View File

@ -167,4 +167,20 @@ public class HFileArchiveUtil {
private static Path getArchivePath(final Path rootdir) { private static Path getArchivePath(final Path rootdir) {
return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY); return new Path(rootdir, HConstants.HFILE_ARCHIVE_DIRECTORY);
} }
/*
* @return table name given archive file path
*/
public static TableName getTableName(Path archivePath) {
Path p = archivePath;
String tbl = null;
// namespace is the 4th parent of file
for (int i = 0; i < 5; i++) {
if (p == null) return null;
if (i == 3) tbl = p.getName();
p = p.getParent();
}
if (p == null) return null;
return TableName.valueOf(p.getName(), tbl);
}
} }

View File

@ -49,6 +49,10 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
@ -88,6 +92,7 @@ public class TestBackupBase {
protected static String BACKUP_ROOT_DIR = "/backupUT"; protected static String BACKUP_ROOT_DIR = "/backupUT";
protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT"; protected static String BACKUP_REMOTE_ROOT_DIR = "/backupUT";
protected static String provider = "defaultProvider"; protected static String provider = "defaultProvider";
protected static boolean secure = false;
/** /**
* @throws java.lang.Exception * @throws java.lang.Exception
@ -96,6 +101,16 @@ public class TestBackupBase {
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL = new HBaseTestingUtility();
conf1 = TEST_UTIL.getConfiguration(); conf1 = TEST_UTIL.getConfiguration();
if (secure) {
// set the always on security provider
UserProvider.setUserProviderForTesting(TEST_UTIL.getConfiguration(),
HadoopSecurityEnabledUserProviderForTesting.class);
// setup configuration
SecureTestUtil.enableSecurity(TEST_UTIL.getConfiguration());
}
String coproc = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, (coproc == null ? "" : coproc + ",") +
BackupObserver.class.getName());
conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); conf1.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
BackupManager.decorateMasterConfiguration(conf1); BackupManager.decorateMasterConfiguration(conf1);
BackupManager.decorateRegionServerConfiguration(conf1); BackupManager.decorateRegionServerConfiguration(conf1);

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, SmallTests.class })
public class TestBackupHFileCleaner {
private static final Log LOG = LogFactory.getLog(TestBackupHFileCleaner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf = TEST_UTIL.getConfiguration();
private static TableName tableName = TableName.valueOf("backup.hfile.cleaner");
private static String famName = "fam";
static FileSystem fs = null;
Path root;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true);
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.startMiniCluster(1);
fs = FileSystem.get(conf);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (fs != null) {
fs.close();
}
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setup() throws IOException {
root = TEST_UTIL.getDataTestDirOnTestFS();
}
@After
public void cleanup() {
try {
fs.delete(root, true);
} catch (IOException e) {
LOG.warn("Failed to delete files recursively from path " + root);
}
}
@Test
public void testGetDeletableFiles() throws IOException {
// 1. Create a file
Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
fs.createNewFile(file);
// 2. Assert file is successfully created
assertTrue("Test file not created!", fs.exists(file));
BackupHFileCleaner cleaner = new BackupHFileCleaner();
cleaner.setConf(conf);
cleaner.setCheckForFullyBackedUpTables(false);
// 3. Assert that file as is should be deletable
List<FileStatus> stats = new ArrayList<>();
FileStatus stat = fs.getFileStatus(file);
stats.add(stat);
Iterable<FileStatus> deletable = cleaner.getDeletableFiles(stats);
deletable = cleaner.getDeletableFiles(stats);
boolean found = false;
for (FileStatus stat1 : deletable) {
if (stat.equals(stat1)) found = true;
}
assertTrue("Cleaner should allow to delete this file as there is no hfile reference "
+ "for it.", found);
// 4. Add the file as bulk load
List<Path> list = new ArrayList<>(1);
list.add(file);
try (Connection conn = ConnectionFactory.createConnection(conf);
BackupSystemTable sysTbl = new BackupSystemTable(conn)) {
List<TableName> sTableList = new ArrayList<>();
sTableList.add(tableName);
Map<byte[], List<Path>>[] maps = new Map[1];
maps[0] = new HashMap<>();
maps[0].put(famName.getBytes(), list);
sysTbl.writeBulkLoadedFiles(sTableList, maps, "1");
}
// 5. Assert file should not be deletable
deletable = cleaner.getDeletableFiles(stats);
deletable = cleaner.getDeletableFiles(stats);
found = false;
for (FileStatus stat1 : deletable) {
if (stat.equals(stat1)) found = true;
}
assertFalse("Cleaner should not allow to delete this file as there is a hfile reference "
+ "for it.", found);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.collect.Lists;
/**
* 1. Create table t1
* 2. Load data to t1
* 3 Full backup t1
* 4 Load data to t1
* 5 bulk load into t1
* 6 Incremental backup t1
*/
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
private static final Log LOG = LogFactory.getLog(TestIncrementalBackupDeleteTable.class);
@Parameterized.Parameters
public static Collection<Object[]> data() {
secure = true;
List<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[] {Boolean.TRUE});
return params;
}
public TestIncrementalBackupWithBulkLoad(Boolean b) {
}
// implement all test cases in 1 test since incremental backup/restore has dependencies
@Test
public void TestIncBackupDeleteTable() throws Exception {
String testName = "TestIncBackupDeleteTable";
// #1 - create full backup for all tables
LOG.info("create full backup image for all tables");
List<TableName> tables = Lists.newArrayList(table1);
HBaseAdmin admin = null;
Connection conn = ConnectionFactory.createConnection(conf1);
admin = (HBaseAdmin) conn.getAdmin();
BackupAdminImpl client = new BackupAdminImpl(conn);
BackupRequest request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
String backupIdFull = client.backupTables(request);
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
HTable t1 = (HTable) conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
t1.put(p1);
}
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH * 2);
t1.close();
int NB_ROWS2 = 20;
LOG.debug("bulk loading into " + testName);
int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
qualName, false, null, new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
}, true, false, true, NB_ROWS_IN_BATCH*2, NB_ROWS2);
// #3 - incremental backup for table1
tables = Lists.newArrayList(table1);
request = createBackupRequest(BackupType.INCREMENTAL, tables, BACKUP_ROOT_DIR);
String backupIdIncMultiple = client.backupTables(request);
assertTrue(checkSucceeded(backupIdIncMultiple));
// #5.1 - check tables for full restore */
HBaseAdmin hAdmin = TEST_UTIL.getHBaseAdmin();
// #6 - restore incremental backup for table1
TableName[] tablesRestoreIncMultiple = new TableName[] { table1 };
TableName[] tablesMapIncMultiple = new TableName[] { table1_restore };
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
HTable hTable = (HTable) conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2+actual);
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
backupIdFull = client.backupTables(request);
try (final BackupSystemTable table = new BackupSystemTable(conn)) {
Pair<Map<TableName, Map<String, Map<String, List<Pair<String, Boolean>>>>>, List<byte[]>> pair
= table.readBulkloadRows(tables);
assertTrue("map still has " + pair.getSecond().size() + " entries",
pair.getSecond().isEmpty());
}
assertTrue(checkSucceeded(backupIdFull));
hTable.close();
admin.close();
conn.close();
}
}

View File

@ -308,13 +308,14 @@ public class TestLoadIncrementalHFiles {
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false); runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
} }
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
boolean copyFiles) throws Exception { byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
boolean copyFiles, int initRowCount, int factor) throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName); Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem(); FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs); dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(FAMILY)); Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0; int hfileIdx = 0;
Map<byte[], List<Path>> map = null; Map<byte[], List<Path>> map = null;
@ -324,26 +325,26 @@ public class TestLoadIncrementalHFiles {
} }
if (useMap) { if (useMap) {
map = new TreeMap<>(Bytes.BYTES_COMPARATOR); map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
map.put(FAMILY, list); map.put(fam, list);
} }
Path last = null; Path last = null;
for (byte[][] range : hfileRanges) { for (byte[][] range : hfileRanges) {
byte[] from = range[0]; byte[] from = range[0];
byte[] to = range[1]; byte[] to = range[1];
Path path = new Path(familyDir, "hfile_" + hfileIdx++); Path path = new Path(familyDir, "hfile_" + hfileIdx++);
HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
if (useMap) { if (useMap) {
last = path; last = path;
list.add(path); list.add(path);
} }
} }
int expectedRows = hfileIdx * 1000; int expectedRows = hfileIdx * factor;
if (preCreateTable || map != null) { final TableName tableName = htd.getTableName();
if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
util.getAdmin().createTable(htd, tableSplitKeys); util.getAdmin().createTable(htd, tableSplitKeys);
} }
final TableName tableName = htd.getTableName();
Configuration conf = util.getConfiguration(); Configuration conf = util.getConfiguration();
if (copyFiles) { if (copyFiles) {
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
@ -351,33 +352,46 @@ public class TestLoadIncrementalHFiles {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()}; String [] args= {dir.toString(), tableName.toString()};
if (useMap) { if (useMap) {
fs.delete(last); if (deleteFile) fs.delete(last);
Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName); Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
if (deleteFile) {
expectedRows -= 1000; expectedRows -= 1000;
for (LoadQueueItem item : loaded.keySet()) { for (LoadQueueItem item : loaded.keySet()) {
if (item.hfilePath.getName().equals(last.getName())) { if (item.hfilePath.getName().equals(last.getName())) {
fail(last + " should be missing"); fail(last + " should be missing");
} }
} }
}
} else { } else {
loader.run(args); loader.run(args);
} }
if (copyFiles) { if (copyFiles) {
for (Path p : list) { for (Path p : list) {
assertTrue(fs.exists(p)); assertTrue(p + " should exist", fs.exists(p));
} }
} }
Table table = util.getConnection().getTable(tableName); Table table = util.getConnection().getTable(tableName);
try { try {
assertEquals(expectedRows, util.countRows(table)); assertEquals(initRowCount + expectedRows, util.countRows(table));
} finally { } finally {
table.close(); table.close();
} }
return expectedRows;
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
boolean copyFiles) throws Exception {
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
hfileRanges, useMap, true, copyFiles, 0, 1000);
final TableName tableName = htd.getTableName();
// verify staging folder has been cleaned up // verify staging folder has been cleaned up
Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME); Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
FileSystem fs = util.getTestFileSystem();
if(fs.exists(stagingBasePath)) { if(fs.exists(stagingBasePath)) {
FileStatus[] files = fs.listStatus(stagingBasePath); FileStatus[] files = fs.listStatus(stagingBasePath);
for(FileStatus file : files) { for(FileStatus file : files) {