Revert "HBASE-25891 Remove dependence on storing WAL filenames for backup (#3359)"
This reverts commit df57b1ca6b
.
This commit is contained in:
parent
df57b1ca6b
commit
fc5a8fd3bf
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos.BackupInfo.Builder;
|
||||
|
@ -134,13 +136,7 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
* New region server log timestamps for table set after distributed log roll key - table name,
|
||||
* value - map of RegionServer hostname -> last log rolled timestamp
|
||||
*/
|
||||
private Map<TableName, Map<String, Long>> tableSetTimestampMap;
|
||||
|
||||
/**
|
||||
* Previous Region server log timestamps for table set after distributed log roll key -
|
||||
* table name, value - map of RegionServer hostname -> last log rolled timestamp
|
||||
*/
|
||||
private Map<TableName, Map<String, Long>> incrTimestampMap;
|
||||
private HashMap<TableName, HashMap<String, Long>> tableSetTimestampMap;
|
||||
|
||||
/**
|
||||
* Backup progress in %% (0-100)
|
||||
|
@ -194,12 +190,12 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
this.backupTableInfoMap = backupTableInfoMap;
|
||||
}
|
||||
|
||||
public Map<TableName, Map<String, Long>> getTableSetTimestampMap() {
|
||||
public HashMap<TableName, HashMap<String, Long>> getTableSetTimestampMap() {
|
||||
return tableSetTimestampMap;
|
||||
}
|
||||
|
||||
public void setTableSetTimestampMap(Map<TableName,
|
||||
Map<String, Long>> tableSetTimestampMap) {
|
||||
public void setTableSetTimestampMap(HashMap<TableName,
|
||||
HashMap<String, Long>> tableSetTimestampMap) {
|
||||
this.tableSetTimestampMap = tableSetTimestampMap;
|
||||
}
|
||||
|
||||
|
@ -355,19 +351,19 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
|
||||
/**
|
||||
* Set the new region server log timestamps after distributed log roll
|
||||
* @param prevTableSetTimestampMap table timestamp map
|
||||
* @param newTableSetTimestampMap table timestamp map
|
||||
*/
|
||||
public void setIncrTimestampMap(Map<TableName,
|
||||
Map<String, Long>> prevTableSetTimestampMap) {
|
||||
this.incrTimestampMap = prevTableSetTimestampMap;
|
||||
public void setIncrTimestampMap(HashMap<TableName,
|
||||
HashMap<String, Long>> newTableSetTimestampMap) {
|
||||
this.tableSetTimestampMap = newTableSetTimestampMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get new region server log timestamps after distributed log roll
|
||||
* @return new region server log timestamps
|
||||
*/
|
||||
public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
|
||||
return this.incrTimestampMap;
|
||||
public HashMap<TableName, HashMap<String, Long>> getIncrTimestampMap() {
|
||||
return this.tableSetTimestampMap;
|
||||
}
|
||||
|
||||
public TableName getTableBySnapshot(String snapshotName) {
|
||||
|
@ -383,7 +379,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
BackupProtos.BackupInfo.Builder builder = BackupProtos.BackupInfo.newBuilder();
|
||||
builder.setBackupId(getBackupId());
|
||||
setBackupTableInfoMap(builder);
|
||||
setTableSetTimestampMap(builder);
|
||||
builder.setCompleteTs(getCompleteTs());
|
||||
if (getFailedMsg() != null) {
|
||||
builder.setFailedMessage(getFailedMsg());
|
||||
|
@ -451,16 +446,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
}
|
||||
}
|
||||
|
||||
private void setTableSetTimestampMap(Builder builder) {
|
||||
if (this.getTableSetTimestampMap() != null) {
|
||||
for (Entry<TableName, Map<String, Long>> entry : this.getTableSetTimestampMap().entrySet()) {
|
||||
builder.putTableSetTimestamp(entry.getKey().getNameAsString(),
|
||||
BackupProtos.BackupInfo.RSTimestampMap.newBuilder().putAllRsTimestamp(entry.getValue())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static BackupInfo fromByteArray(byte[] data) throws IOException {
|
||||
return fromProto(BackupProtos.BackupInfo.parseFrom(data));
|
||||
}
|
||||
|
@ -473,7 +458,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
BackupInfo context = new BackupInfo();
|
||||
context.setBackupId(proto.getBackupId());
|
||||
context.setBackupTableInfoMap(toMap(proto.getBackupTableInfoList()));
|
||||
context.setTableSetTimestampMap(getTableSetTimestampMap(proto.getTableSetTimestampMap()));
|
||||
context.setCompleteTs(proto.getCompleteTs());
|
||||
if (proto.hasFailedMessage()) {
|
||||
context.setFailedMsg(proto.getFailedMessage());
|
||||
|
@ -507,17 +491,6 @@ public class BackupInfo implements Comparable<BackupInfo> {
|
|||
return map;
|
||||
}
|
||||
|
||||
private static Map<TableName, Map<String, Long>> getTableSetTimestampMap(
|
||||
Map<String, BackupProtos.BackupInfo.RSTimestampMap> map) {
|
||||
Map<TableName, Map<String, Long>> tableSetTimestampMap = new HashMap<>();
|
||||
for (Entry<String, BackupProtos.BackupInfo.RSTimestampMap> entry : map.entrySet()) {
|
||||
tableSetTimestampMap
|
||||
.put(TableName.valueOf(entry.getKey()), entry.getValue().getRsTimestampMap());
|
||||
}
|
||||
|
||||
return tableSetTimestampMap;
|
||||
}
|
||||
|
||||
public String getShortDescription() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("{");
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -481,7 +482,7 @@ public class BackupManager implements Closeable {
|
|||
* @throws IOException exception
|
||||
*/
|
||||
public void writeRegionServerLogTimestamp(Set<TableName> tables,
|
||||
Map<String, Long> newTimestamps) throws IOException {
|
||||
HashMap<String, Long> newTimestamps) throws IOException {
|
||||
systemTable.writeRegionServerLogTimestamp(tables, newTimestamps, backupInfo.getBackupRootDir());
|
||||
}
|
||||
|
||||
|
@ -492,7 +493,7 @@ public class BackupManager implements Closeable {
|
|||
* RegionServer,PreviousTimeStamp
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Map<TableName, Map<String, Long>> readLogTimestampMap() throws IOException {
|
||||
public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap() throws IOException {
|
||||
return systemTable.readLogTimestampMap(backupInfo.getBackupRootDir());
|
||||
}
|
||||
|
||||
|
@ -514,6 +515,24 @@ public class BackupManager implements Closeable {
|
|||
systemTable.addIncrementalBackupTableSet(tables, backupInfo.getBackupRootDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves list of WAL files after incremental backup operation. These files will be stored until
|
||||
* TTL expiration and are used by Backup Log Cleaner plug-in to determine which WAL files can be
|
||||
* safely purged.
|
||||
*/
|
||||
public void recordWALFiles(List<String> files) throws IOException {
|
||||
systemTable.addWALFiles(files, backupInfo.getBackupId(), backupInfo.getBackupRootDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get WAL files iterator.
|
||||
* @return WAL files iterator from backup system table
|
||||
* @throws IOException if getting the WAL files iterator fails
|
||||
*/
|
||||
public Iterator<BackupSystemTable.WALItem> getWALFilesFromBackupSystem() throws IOException {
|
||||
return systemTable.getWALFilesIterator(backupInfo.getBackupRootDir());
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return conn;
|
||||
}
|
||||
|
|
|
@ -116,7 +116,7 @@ public class BackupManifest {
|
|||
private long startTs;
|
||||
private long completeTs;
|
||||
private ArrayList<BackupImage> ancestors;
|
||||
private Map<TableName, Map<String, Long>> incrTimeRanges;
|
||||
private HashMap<TableName, HashMap<String, Long>> incrTimeRanges;
|
||||
|
||||
static Builder newBuilder() {
|
||||
return new Builder();
|
||||
|
@ -187,11 +187,11 @@ public class BackupManifest {
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
private static Map<TableName, Map<String, Long>> loadIncrementalTimestampMap(
|
||||
private static HashMap<TableName, HashMap<String, Long>> loadIncrementalTimestampMap(
|
||||
BackupProtos.BackupImage proto) {
|
||||
List<BackupProtos.TableServerTimestamp> list = proto.getTstMapList();
|
||||
|
||||
Map<TableName, Map<String, Long>> incrTimeRanges = new HashMap<>();
|
||||
HashMap<TableName, HashMap<String, Long>> incrTimeRanges = new HashMap<>();
|
||||
|
||||
if (list == null || list.size() == 0) {
|
||||
return incrTimeRanges;
|
||||
|
@ -199,7 +199,7 @@ public class BackupManifest {
|
|||
|
||||
for (BackupProtos.TableServerTimestamp tst : list) {
|
||||
TableName tn = ProtobufUtil.toTableName(tst.getTableName());
|
||||
Map<String, Long> map = incrTimeRanges.get(tn);
|
||||
HashMap<String, Long> map = incrTimeRanges.get(tn);
|
||||
if (map == null) {
|
||||
map = new HashMap<>();
|
||||
incrTimeRanges.put(tn, map);
|
||||
|
@ -217,9 +217,9 @@ public class BackupManifest {
|
|||
if (this.incrTimeRanges == null) {
|
||||
return;
|
||||
}
|
||||
for (Entry<TableName, Map<String, Long>> entry : this.incrTimeRanges.entrySet()) {
|
||||
for (Entry<TableName, HashMap<String, Long>> entry : this.incrTimeRanges.entrySet()) {
|
||||
TableName key = entry.getKey();
|
||||
Map<String, Long> value = entry.getValue();
|
||||
HashMap<String, Long> value = entry.getValue();
|
||||
BackupProtos.TableServerTimestamp.Builder tstBuilder =
|
||||
BackupProtos.TableServerTimestamp.newBuilder();
|
||||
tstBuilder.setTableName(ProtobufUtil.toProtoTableName(key));
|
||||
|
@ -359,11 +359,11 @@ public class BackupManifest {
|
|||
return hash;
|
||||
}
|
||||
|
||||
public Map<TableName, Map<String, Long>> getIncrTimeRanges() {
|
||||
public HashMap<TableName, HashMap<String, Long>> getIncrTimeRanges() {
|
||||
return incrTimeRanges;
|
||||
}
|
||||
|
||||
private void setIncrTimeRanges(Map<TableName, Map<String, Long>> incrTimeRanges) {
|
||||
private void setIncrTimeRanges(HashMap<TableName, HashMap<String, Long>> incrTimeRanges) {
|
||||
this.incrTimeRanges = incrTimeRanges;
|
||||
}
|
||||
}
|
||||
|
@ -512,11 +512,11 @@ public class BackupManifest {
|
|||
* Set the incremental timestamp map directly.
|
||||
* @param incrTimestampMap timestamp map
|
||||
*/
|
||||
public void setIncrTimestampMap(Map<TableName, Map<String, Long>> incrTimestampMap) {
|
||||
public void setIncrTimestampMap(HashMap<TableName, HashMap<String, Long>> incrTimestampMap) {
|
||||
this.backupImage.setIncrTimeRanges(incrTimestampMap);
|
||||
}
|
||||
|
||||
public Map<TableName, Map<String, Long>> getIncrTimestampMap() {
|
||||
public Map<TableName, HashMap<String, Long>> getIncrTimestampMap() {
|
||||
return backupImage.getIncrTimeRanges();
|
||||
}
|
||||
|
||||
|
|
|
@ -34,9 +34,11 @@ import java.util.Set;
|
|||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.ArrayUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
|
@ -63,12 +65,14 @@ import org.apache.hadoop.hbase.client.SnapshotDescription;
|
|||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.BackupProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
|
@ -174,10 +178,11 @@ public final class BackupSystemTable implements Closeable {
|
|||
final static byte[] BL_PREPARE = Bytes.toBytes("R");
|
||||
final static byte[] BL_COMMIT = Bytes.toBytes("D");
|
||||
|
||||
private final static String WALS_PREFIX = "wals:";
|
||||
private final static String SET_KEY_PREFIX = "backupset:";
|
||||
|
||||
// separator between BULK_LOAD_PREFIX and ordinals
|
||||
private final static String BLK_LD_DELIM = ":";
|
||||
protected final static String BLK_LD_DELIM = ":";
|
||||
private final static byte[] EMPTY_VALUE = new byte[] {};
|
||||
|
||||
// Safe delimiter in a string
|
||||
|
@ -853,7 +858,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
* @throws IOException exception
|
||||
*/
|
||||
public void writeRegionServerLogTimestamp(Set<TableName> tables,
|
||||
Map<String, Long> newTimestamps, String backupRoot) throws IOException {
|
||||
HashMap<String, Long> newTimestamps, String backupRoot) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("write RS log time stamps to backup system table for tables ["
|
||||
+ StringUtils.join(tables, ",") + "]");
|
||||
|
@ -878,13 +883,13 @@ public final class BackupSystemTable implements Closeable {
|
|||
* RegionServer,PreviousTimeStamp
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Map<TableName, Map<String, Long>> readLogTimestampMap(String backupRoot)
|
||||
public HashMap<TableName, HashMap<String, Long>> readLogTimestampMap(String backupRoot)
|
||||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("read RS log ts from backup system table for root=" + backupRoot);
|
||||
}
|
||||
|
||||
Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
|
||||
HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
|
||||
|
||||
Scan scan = createScanForReadLogTimestampMap(backupRoot);
|
||||
try (Table table = connection.getTable(tableName);
|
||||
|
@ -1006,6 +1011,148 @@ public final class BackupSystemTable implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register WAL files as eligible for deletion
|
||||
* @param files files
|
||||
* @param backupId backup id
|
||||
* @param backupRoot root directory path to backup destination
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public void addWALFiles(List<String> files, String backupId, String backupRoot)
|
||||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("add WAL files to backup system table: " + backupId + " " + backupRoot + " files ["
|
||||
+ StringUtils.join(files, ",") + "]");
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
files.forEach(file -> LOG.debug("add :" + file));
|
||||
}
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
List<Put> puts = createPutsForAddWALFiles(files, backupId, backupRoot);
|
||||
table.put(puts);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register WAL files as eligible for deletion
|
||||
* @param backupRoot root directory path to backup
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Iterator<WALItem> getWALFilesIterator(String backupRoot) throws IOException {
|
||||
LOG.trace("get WAL files from backup system table");
|
||||
|
||||
final Table table = connection.getTable(tableName);
|
||||
Scan scan = createScanForGetWALs(backupRoot);
|
||||
final ResultScanner scanner = table.getScanner(scan);
|
||||
final Iterator<Result> it = scanner.iterator();
|
||||
return new Iterator<WALItem>() {
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
boolean next = it.hasNext();
|
||||
if (!next) {
|
||||
// close all
|
||||
try {
|
||||
scanner.close();
|
||||
table.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Close WAL Iterator", e);
|
||||
}
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WALItem next() {
|
||||
Result next = it.next();
|
||||
List<Cell> cells = next.listCells();
|
||||
byte[] buf = cells.get(0).getValueArray();
|
||||
int len = cells.get(0).getValueLength();
|
||||
int offset = cells.get(0).getValueOffset();
|
||||
String backupId = new String(buf, offset, len);
|
||||
buf = cells.get(1).getValueArray();
|
||||
len = cells.get(1).getValueLength();
|
||||
offset = cells.get(1).getValueOffset();
|
||||
String walFile = new String(buf, offset, len);
|
||||
buf = cells.get(2).getValueArray();
|
||||
len = cells.get(2).getValueLength();
|
||||
offset = cells.get(2).getValueOffset();
|
||||
String backupRoot = new String(buf, offset, len);
|
||||
return new WALItem(backupId, walFile, backupRoot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
// not implemented
|
||||
throw new RuntimeException("remove is not supported");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if WAL file is eligible for deletion Future: to support all backup destinations
|
||||
* @param file name of a file to check
|
||||
* @return true, if deletable, false otherwise.
|
||||
* @throws IOException exception
|
||||
*/
|
||||
// TODO: multiple backup destination support
|
||||
public boolean isWALFileDeletable(String file) throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Check if WAL file has been already backed up in backup system table " + file);
|
||||
}
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
Get get = createGetForCheckWALFile(file);
|
||||
Result res = table.get(get);
|
||||
return (!res.isEmpty());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if WAL file is eligible for deletion using multi-get
|
||||
* @param files names of a file to check
|
||||
* @return map of results (key: FileStatus object. value: true if the file is deletable, false
|
||||
* otherwise)
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Map<FileStatus, Boolean> areWALFilesDeletable(Iterable<FileStatus> files)
|
||||
throws IOException {
|
||||
final int BUF_SIZE = 100;
|
||||
|
||||
Map<FileStatus, Boolean> ret = new HashMap<>();
|
||||
try (Table table = connection.getTable(tableName)) {
|
||||
List<Get> getBuffer = new ArrayList<>();
|
||||
List<FileStatus> fileStatuses = new ArrayList<>();
|
||||
|
||||
for (FileStatus file : files) {
|
||||
String fn = file.getPath().getName();
|
||||
if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
|
||||
ret.put(file, true);
|
||||
continue;
|
||||
}
|
||||
String wal = file.getPath().toString();
|
||||
Get get = createGetForCheckWALFile(wal);
|
||||
getBuffer.add(get);
|
||||
fileStatuses.add(file);
|
||||
if (getBuffer.size() >= BUF_SIZE) {
|
||||
Result[] results = table.get(getBuffer);
|
||||
for (int i = 0; i < results.length; i++) {
|
||||
ret.put(fileStatuses.get(i), !results[i].isEmpty());
|
||||
}
|
||||
getBuffer.clear();
|
||||
fileStatuses.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if (!getBuffer.isEmpty()) {
|
||||
Result[] results = table.get(getBuffer);
|
||||
for (int i = 0; i < results.length; i++) {
|
||||
ret.put(fileStatuses.get(i), !results[i].isEmpty());
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if we have at least one backup session in backup system table This API is used by
|
||||
* BackupLogCleaner
|
||||
|
@ -1506,7 +1653,7 @@ public final class BackupSystemTable implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
|
||||
protected static boolean snapshotExists(Admin admin, String snapshotName) throws IOException {
|
||||
List<SnapshotDescription> list = admin.listSnapshots();
|
||||
for (SnapshotDescription desc : list) {
|
||||
if (desc.getName().equals(snapshotName)) {
|
||||
|
@ -1760,6 +1907,56 @@ public final class BackupSystemTable implements Closeable {
|
|||
return put;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates put list for list of WAL files
|
||||
* @param files list of WAL file paths
|
||||
* @param backupId backup id
|
||||
* @return put list
|
||||
*/
|
||||
private List<Put> createPutsForAddWALFiles(List<String> files, String backupId,
|
||||
String backupRoot) {
|
||||
List<Put> puts = new ArrayList<>(files.size());
|
||||
for (String file : files) {
|
||||
Put put = new Put(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("backupId"),
|
||||
Bytes.toBytes(backupId));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("file"), Bytes.toBytes(file));
|
||||
put.addColumn(BackupSystemTable.META_FAMILY, Bytes.toBytes("root"),
|
||||
Bytes.toBytes(backupRoot));
|
||||
puts.add(put);
|
||||
}
|
||||
return puts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates Scan operation to load WALs
|
||||
* @param backupRoot path to backup destination
|
||||
* @return scan operation
|
||||
*/
|
||||
private Scan createScanForGetWALs(String backupRoot) {
|
||||
// TODO: support for backupRoot
|
||||
Scan scan = new Scan();
|
||||
byte[] startRow = Bytes.toBytes(WALS_PREFIX);
|
||||
byte[] stopRow = Arrays.copyOf(startRow, startRow.length);
|
||||
stopRow[stopRow.length - 1] = (byte) (stopRow[stopRow.length - 1] + 1);
|
||||
scan.withStartRow(startRow);
|
||||
scan.withStopRow(stopRow);
|
||||
scan.addFamily(BackupSystemTable.META_FAMILY);
|
||||
return scan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates Get operation for a given wal file name TODO: support for backup destination
|
||||
* @param file file
|
||||
* @return get operation
|
||||
*/
|
||||
private Get createGetForCheckWALFile(String file) {
|
||||
Get get = new Get(rowkey(WALS_PREFIX, BackupUtils.getUniqueWALFileNamePart(file)));
|
||||
// add backup root column
|
||||
get.addFamily(BackupSystemTable.META_FAMILY);
|
||||
return get;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates Scan operation to load backup set list
|
||||
* @return scan operation
|
||||
|
|
|
@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CON
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -159,6 +160,14 @@ public class FullTableBackupClient extends TableBackupClient {
|
|||
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
|
||||
|
||||
newTimestamps = backupManager.readRegionServerLastLogRollResult();
|
||||
if (firstBackup) {
|
||||
// Updates registered log files
|
||||
// We record ALL old WAL files as registered, because
|
||||
// this is a first full backup in the system and these
|
||||
// files are not needed for next incremental backup
|
||||
List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
|
||||
backupManager.recordWALFiles(logFiles);
|
||||
}
|
||||
|
||||
// SNAPSHOT_TABLES:
|
||||
backupInfo.setPhase(BackupPhase.SNAPSHOT);
|
||||
|
@ -186,10 +195,9 @@ public class FullTableBackupClient extends TableBackupClient {
|
|||
// For incremental backup, it contains the incremental backup table set.
|
||||
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
|
||||
|
||||
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
|
||||
backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
|
||||
Long newStartCode =
|
||||
BackupUtils.getMinValue(BackupUtils
|
||||
.getRSLogTimestampMins(newTableSetTimestampMap));
|
||||
|
|
|
@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.backup.impl;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -30,6 +33,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable.WALItem;
|
||||
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
|
@ -60,16 +64,16 @@ public class IncrementalBackupManager extends BackupManager {
|
|||
* @return The new HashMap of RS log time stamps after the log roll for this incremental backup.
|
||||
* @throws IOException exception
|
||||
*/
|
||||
public Map<String, Long> getIncrBackupLogFileMap() throws IOException {
|
||||
public HashMap<String, Long> getIncrBackupLogFileMap() throws IOException {
|
||||
List<String> logList;
|
||||
Map<String, Long> newTimestamps;
|
||||
Map<String, Long> previousTimestampMins;
|
||||
HashMap<String, Long> newTimestamps;
|
||||
HashMap<String, Long> previousTimestampMins;
|
||||
|
||||
String savedStartCode = readBackupStartCode();
|
||||
|
||||
// key: tableName
|
||||
// value: <RegionServer,PreviousTimeStamp>
|
||||
Map<TableName, Map<String, Long>> previousTimestampMap = readLogTimestampMap();
|
||||
HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
|
||||
|
||||
previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
|
||||
|
||||
|
@ -95,19 +99,68 @@ public class IncrementalBackupManager extends BackupManager {
|
|||
newTimestamps = readRegionServerLastLogRollResult();
|
||||
|
||||
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
|
||||
logList = excludeProcV2WALs(logList);
|
||||
List<WALItem> logFromSystemTable =
|
||||
getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
|
||||
.getBackupRootDir());
|
||||
logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
|
||||
backupInfo.setIncrBackupFileList(logList);
|
||||
|
||||
return newTimestamps;
|
||||
}
|
||||
|
||||
private List<String> excludeProcV2WALs(List<String> logList) {
|
||||
/**
|
||||
* Get list of WAL files eligible for incremental backup.
|
||||
*
|
||||
* @return list of WAL files
|
||||
* @throws IOException if getting the list of WAL files fails
|
||||
*/
|
||||
public List<String> getIncrBackupLogFileList() throws IOException {
|
||||
List<String> logList;
|
||||
HashMap<String, Long> newTimestamps;
|
||||
HashMap<String, Long> previousTimestampMins;
|
||||
|
||||
String savedStartCode = readBackupStartCode();
|
||||
|
||||
// key: tableName
|
||||
// value: <RegionServer,PreviousTimeStamp>
|
||||
HashMap<TableName, HashMap<String, Long>> previousTimestampMap = readLogTimestampMap();
|
||||
|
||||
previousTimestampMins = BackupUtils.getRSLogTimestampMins(previousTimestampMap);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("StartCode " + savedStartCode + "for backupID " + backupInfo.getBackupId());
|
||||
}
|
||||
// get all new log files from .logs and .oldlogs after last TS and before new timestamp
|
||||
if (savedStartCode == null || previousTimestampMins == null
|
||||
|| previousTimestampMins.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Cannot read any previous back up timestamps from backup system table. "
|
||||
+ "In order to create an incremental backup, at least one full backup is needed.");
|
||||
}
|
||||
|
||||
newTimestamps = readRegionServerLastLogRollResult();
|
||||
|
||||
logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode);
|
||||
List<WALItem> logFromSystemTable =
|
||||
getLogFilesFromBackupSystem(previousTimestampMins, newTimestamps, getBackupInfo()
|
||||
.getBackupRootDir());
|
||||
|
||||
logList = excludeAlreadyBackedUpAndProcV2WALs(logList, logFromSystemTable);
|
||||
backupInfo.setIncrBackupFileList(logList);
|
||||
|
||||
return logList;
|
||||
}
|
||||
|
||||
private List<String> excludeAlreadyBackedUpAndProcV2WALs(List<String> logList,
|
||||
List<WALItem> logFromSystemTable) {
|
||||
Set<String> walFileNameSet = convertToSet(logFromSystemTable);
|
||||
|
||||
List<String> list = new ArrayList<>();
|
||||
for (int i=0; i < logList.size(); i++) {
|
||||
Path p = new Path(logList.get(i));
|
||||
String name = p.getName();
|
||||
|
||||
if (name.startsWith(WALProcedureStore.LOG_PREFIX)) {
|
||||
if (walFileNameSet.contains(name) || name.startsWith(WALProcedureStore.LOG_PREFIX)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -116,6 +169,65 @@ public class IncrementalBackupManager extends BackupManager {
|
|||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Set of WAL file names (not full path names)
|
||||
* @param logFromSystemTable the logs from the system table to convert
|
||||
* @return set of WAL file names
|
||||
*/
|
||||
private Set<String> convertToSet(List<WALItem> logFromSystemTable) {
|
||||
Set<String> set = new HashSet<>();
|
||||
for (int i=0; i < logFromSystemTable.size(); i++) {
|
||||
WALItem item = logFromSystemTable.get(i);
|
||||
set.add((new Path(item.walFile)).getName());
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
/**
|
||||
* For each region server: get all log files newer than the last timestamps, but not newer than
|
||||
* the newest timestamps.
|
||||
* @param olderTimestamps timestamp map for each region server of the last backup.
|
||||
* @param newestTimestamps timestamp map for each region server that the backup should lead to.
|
||||
* @return list of log files which needs to be added to this backup
|
||||
* @throws IOException if getting the WAL files from the backup system fails
|
||||
*/
|
||||
private List<WALItem> getLogFilesFromBackupSystem(HashMap<String, Long> olderTimestamps,
|
||||
HashMap<String, Long> newestTimestamps, String backupRoot) throws IOException {
|
||||
List<WALItem> logFiles = new ArrayList<>();
|
||||
Iterator<WALItem> it = getWALFilesFromBackupSystem();
|
||||
while (it.hasNext()) {
|
||||
WALItem item = it.next();
|
||||
String rootDir = item.getBackupRoot();
|
||||
if (!rootDir.equals(backupRoot)) {
|
||||
continue;
|
||||
}
|
||||
String walFileName = item.getWalFile();
|
||||
String server = BackupUtils.parseHostNameFromLogFile(new Path(walFileName));
|
||||
if (server == null) {
|
||||
continue;
|
||||
}
|
||||
Long tss = getTimestamp(walFileName);
|
||||
Long oldTss = olderTimestamps.get(server);
|
||||
Long newTss = newestTimestamps.get(server);
|
||||
if (oldTss == null) {
|
||||
logFiles.add(item);
|
||||
continue;
|
||||
}
|
||||
if (newTss == null) {
|
||||
newTss = Long.MAX_VALUE;
|
||||
}
|
||||
if (tss > oldTss && tss < newTss) {
|
||||
logFiles.add(item);
|
||||
}
|
||||
}
|
||||
return logFiles;
|
||||
}
|
||||
|
||||
private Long getTimestamp(String walFileName) {
|
||||
int index = walFileName.lastIndexOf(BackupUtils.LOGNAME_SEPARATOR);
|
||||
return Long.parseLong(walFileName.substring(index + 1));
|
||||
}
|
||||
|
||||
/**
|
||||
* For each region server: get all log files newer than the last timestamps but not newer than the
|
||||
* newest timestamps.
|
||||
|
@ -126,8 +238,8 @@ public class IncrementalBackupManager extends BackupManager {
|
|||
* @return a list of log files to be backed up
|
||||
* @throws IOException exception
|
||||
*/
|
||||
private List<String> getLogFilesForNewBackup(Map<String, Long> olderTimestamps,
|
||||
Map<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
|
||||
private List<String> getLogFilesForNewBackup(HashMap<String, Long> olderTimestamps,
|
||||
HashMap<String, Long> newestTimestamps, Configuration conf, String savedStartCode)
|
||||
throws IOException {
|
||||
LOG.debug("In getLogFilesForNewBackup()\n" + "olderTimestamps: " + olderTimestamps
|
||||
+ "\n newestTimestamps: " + newestTimestamps);
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -287,6 +288,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
|||
convertWALsToHFiles();
|
||||
incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
|
||||
backupInfo.getBackupRootDir());
|
||||
// Save list of WAL files copied
|
||||
backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
|
||||
} catch (Exception e) {
|
||||
String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId;
|
||||
// fail the overall backup and return
|
||||
|
@ -298,7 +301,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
|||
// After this checkpoint, even if entering cancel process, will let the backup finished
|
||||
try {
|
||||
// Set the previousTimestampMap which is before this current log roll to the manifest.
|
||||
Map<TableName, Map<String, Long>> previousTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
backupInfo.setIncrTimestampMap(previousTimestampMap);
|
||||
|
||||
|
@ -306,10 +309,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
|
|||
// For incremental backup, it contains the incremental backup table set.
|
||||
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
|
||||
|
||||
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
|
||||
backupInfo.setTableSetTimestampMap(newTableSetTimestampMap);
|
||||
Long newStartCode =
|
||||
BackupUtils.getMinValue(BackupUtils.getRSLogTimestampMins(newTableSetTimestampMap));
|
||||
backupManager.writeBackupStartCode(newStartCode);
|
||||
|
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -62,7 +61,7 @@ public abstract class TableBackupClient {
|
|||
protected Connection conn;
|
||||
protected String backupId;
|
||||
protected List<TableName> tableList;
|
||||
protected Map<String, Long> newTimestamps = null;
|
||||
protected HashMap<String, Long> newTimestamps = null;
|
||||
|
||||
protected BackupManager backupManager;
|
||||
protected BackupInfo backupInfo;
|
||||
|
@ -295,7 +294,7 @@ public abstract class TableBackupClient {
|
|||
|
||||
if (type == BackupType.INCREMENTAL) {
|
||||
// We'll store the log timestamps for this table only in its manifest.
|
||||
Map<TableName, Map<String, Long>> tableTimestampMap = new HashMap<>();
|
||||
HashMap<TableName, HashMap<String, Long>> tableTimestampMap = new HashMap<>();
|
||||
tableTimestampMap.put(table, backupInfo.getIncrTimestampMap().get(table));
|
||||
manifest.setIncrTimestampMap(tableTimestampMap);
|
||||
ArrayList<BackupImage> ancestorss = backupManager.getAncestors(backupInfo);
|
||||
|
|
|
@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.backup.master;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.BackupInfo;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupManager;
|
||||
import org.apache.hadoop.hbase.backup.util.BackupUtils;
|
||||
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
|
||||
import org.apache.hadoop.hbase.net.Address;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
|
||||
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;
|
||||
|
||||
/**
|
||||
|
@ -79,77 +76,46 @@ public class BackupLogCleaner extends BaseLogCleanerDelegate {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private Map<Address, Long> getServersToOldestBackupMapping(List<BackupInfo> backups)
|
||||
throws IOException {
|
||||
Map<Address, Long> serverAddressToLastBackupMap = new HashMap<>();
|
||||
|
||||
Map<TableName, Long> tableNameBackupInfoMap = new HashMap<>();
|
||||
for (BackupInfo backupInfo : backups) {
|
||||
for (TableName table : backupInfo.getTables()) {
|
||||
tableNameBackupInfoMap.putIfAbsent(table, backupInfo.getStartTs());
|
||||
if (tableNameBackupInfoMap.get(table) <= backupInfo.getStartTs()) {
|
||||
tableNameBackupInfoMap.put(table, backupInfo.getStartTs());
|
||||
for (Map.Entry<String, Long> entry : backupInfo.getTableSetTimestampMap().get(table)
|
||||
.entrySet()) {
|
||||
serverAddressToLastBackupMap.put(Address.fromString(entry.getKey()), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return serverAddressToLastBackupMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
|
||||
List<FileStatus> filteredFiles = new ArrayList<>();
|
||||
|
||||
// all members of this class are null if backup is disabled,
|
||||
// so we cannot filter the files
|
||||
if (this.getConf() == null || !BackupManager.isBackupEnabled(getConf())) {
|
||||
LOG.debug("Backup is not enabled. Check your {} setting",
|
||||
BackupRestoreConstants.BACKUP_ENABLE_KEY);
|
||||
BackupRestoreConstants.BACKUP_ENABLE_KEY);
|
||||
return files;
|
||||
}
|
||||
|
||||
Map<Address, Long> addressToLastBackupMap;
|
||||
try {
|
||||
try (BackupManager backupManager = new BackupManager(conn, getConf())) {
|
||||
addressToLastBackupMap =
|
||||
getServersToOldestBackupMapping(backupManager.getBackupHistory(true));
|
||||
try (final BackupSystemTable table = new BackupSystemTable(conn)) {
|
||||
// If we do not have recorded backup sessions
|
||||
try {
|
||||
if (!table.hasBackupSessions()) {
|
||||
LOG.trace("BackupLogCleaner has no backup sessions");
|
||||
return files;
|
||||
}
|
||||
} catch (TableNotFoundException tnfe) {
|
||||
LOG.warn("Backup system table is not available: {}", tnfe.getMessage());
|
||||
return files;
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Failed to analyse backup history with exception: {}. Retaining all logs",
|
||||
ex.getMessage(), ex);
|
||||
List<FileStatus> list = new ArrayList<>();
|
||||
Map<FileStatus, Boolean> walFilesDeletableMap = table.areWALFilesDeletable(files);
|
||||
for (Map.Entry<FileStatus, Boolean> entry: walFilesDeletableMap.entrySet()) {
|
||||
FileStatus file = entry.getKey();
|
||||
String wal = file.getPath().toString();
|
||||
boolean deletable = entry.getValue();
|
||||
if (deletable) {
|
||||
LOG.debug("Found log file in backup system table, deleting: {}", wal);
|
||||
list.add(file);
|
||||
} else {
|
||||
LOG.debug("Did not find this log in backup system table, keeping: {}", wal);
|
||||
}
|
||||
}
|
||||
return list;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to get backup system table table, therefore will keep all files", e);
|
||||
// nothing to delete
|
||||
return Collections.emptyList();
|
||||
}
|
||||
for (FileStatus file : files) {
|
||||
String fn = file.getPath().getName();
|
||||
if (fn.startsWith(WALProcedureStore.LOG_PREFIX)) {
|
||||
filteredFiles.add(file);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
Address walServerAddress =
|
||||
Address.fromString(BackupUtils.parseHostNameFromLogFile(file.getPath()));
|
||||
long walTimestamp = AbstractFSWALProvider.getTimestamp(file.getPath().getName());
|
||||
|
||||
if (!addressToLastBackupMap.containsKey(walServerAddress)
|
||||
|| addressToLastBackupMap.get(walServerAddress) >= walTimestamp) {
|
||||
filteredFiles.add(file);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(
|
||||
"Error occurred while filtering file: {} with error: {}. Ignoring cleanup of this log",
|
||||
file.getPath(), ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
LOG
|
||||
.info("Total files: {}, Filtered Files: {}", IterableUtils.size(files), filteredFiles.size());
|
||||
return filteredFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Comparator;
|
|||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
@ -83,8 +82,8 @@ public final class BackupUtils {
|
|||
* @param rsLogTimestampMap timestamp map
|
||||
* @return the min timestamp of each RS
|
||||
*/
|
||||
public static Map<String, Long> getRSLogTimestampMins(
|
||||
Map<TableName, Map<String, Long>> rsLogTimestampMap) {
|
||||
public static HashMap<String, Long> getRSLogTimestampMins(
|
||||
HashMap<TableName, HashMap<String, Long>> rsLogTimestampMap) {
|
||||
if (rsLogTimestampMap == null || rsLogTimestampMap.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
@ -92,14 +91,18 @@ public final class BackupUtils {
|
|||
HashMap<String, Long> rsLogTimestampMins = new HashMap<>();
|
||||
HashMap<String, HashMap<TableName, Long>> rsLogTimestampMapByRS = new HashMap<>();
|
||||
|
||||
for (Entry<TableName, Map<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
|
||||
for (Entry<TableName, HashMap<String, Long>> tableEntry : rsLogTimestampMap.entrySet()) {
|
||||
TableName table = tableEntry.getKey();
|
||||
Map<String, Long> rsLogTimestamp = tableEntry.getValue();
|
||||
HashMap<String, Long> rsLogTimestamp = tableEntry.getValue();
|
||||
for (Entry<String, Long> rsEntry : rsLogTimestamp.entrySet()) {
|
||||
String rs = rsEntry.getKey();
|
||||
Long ts = rsEntry.getValue();
|
||||
rsLogTimestampMapByRS.putIfAbsent(rs, new HashMap<>());
|
||||
rsLogTimestampMapByRS.get(rs).put(table, ts);
|
||||
if (!rsLogTimestampMapByRS.containsKey(rs)) {
|
||||
rsLogTimestampMapByRS.put(rs, new HashMap<>());
|
||||
rsLogTimestampMapByRS.get(rs).put(table, ts);
|
||||
} else {
|
||||
rsLogTimestampMapByRS.get(rs).put(table, ts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -346,7 +349,7 @@ public final class BackupUtils {
|
|||
* @param map map
|
||||
* @return the min value
|
||||
*/
|
||||
public static <T> Long getMinValue(Map<T, Long> map) {
|
||||
public static <T> Long getMinValue(HashMap<T, Long> map) {
|
||||
Long minTimestamp = null;
|
||||
if (map != null) {
|
||||
ArrayList<Long> timestampList = new ArrayList<>(map.values());
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -94,6 +92,7 @@ public class TestBackupBase {
|
|||
protected static TableName table1_restore = TableName.valueOf("default:table1");
|
||||
protected static TableName table2_restore = TableName.valueOf("ns2:table2");
|
||||
protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");
|
||||
protected static TableName table4_restore = TableName.valueOf("ns4:table4_restore");
|
||||
|
||||
protected static final int NB_ROWS_IN_BATCH = 99;
|
||||
protected static final byte[] qualName = Bytes.toBytes("q1");
|
||||
|
@ -136,12 +135,14 @@ public class TestBackupBase {
|
|||
incrementalCopyHFiles(new String[] {getBulkOutputDir().toString()},
|
||||
backupInfo.getBackupRootDir());
|
||||
failStageIf(Stage.stage_2);
|
||||
// Save list of WAL files copied
|
||||
backupManager.recordWALFiles(backupInfo.getIncrBackupFileList());
|
||||
|
||||
// case INCR_BACKUP_COMPLETE:
|
||||
// set overall backup status: complete. Here we make sure to complete the backup.
|
||||
// After this checkpoint, even if entering cancel process, will let the backup finished
|
||||
// Set the previousTimestampMap which is before this current log roll to the manifest.
|
||||
Map<TableName, Map<String, Long>> previousTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> previousTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
backupInfo.setIncrTimestampMap(previousTimestampMap);
|
||||
|
||||
|
@ -150,7 +151,7 @@ public class TestBackupBase {
|
|||
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
|
||||
failStageIf(Stage.stage_3);
|
||||
|
||||
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
|
||||
Long newStartCode =
|
||||
|
@ -211,6 +212,14 @@ public class TestBackupBase {
|
|||
LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props);
|
||||
failStageIf(Stage.stage_2);
|
||||
newTimestamps = backupManager.readRegionServerLastLogRollResult();
|
||||
if (firstBackup) {
|
||||
// Updates registered log files
|
||||
// We record ALL old WAL files as registered, because
|
||||
// this is a first full backup in the system and these
|
||||
// files are not needed for next incremental backup
|
||||
List<String> logFiles = BackupUtils.getWALFilesOlderThan(conf, newTimestamps);
|
||||
backupManager.recordWALFiles(logFiles);
|
||||
}
|
||||
|
||||
// SNAPSHOT_TABLES:
|
||||
backupInfo.setPhase(BackupPhase.SNAPSHOT);
|
||||
|
@ -238,7 +247,7 @@ public class TestBackupBase {
|
|||
// For incremental backup, it contains the incremental backup table set.
|
||||
backupManager.writeRegionServerLogTimestamp(backupInfo.getTables(), newTimestamps);
|
||||
|
||||
Map<TableName, Map<String, Long>> newTableSetTimestampMap =
|
||||
HashMap<TableName, HashMap<String, Long>> newTableSetTimestampMap =
|
||||
backupManager.readLogTimestampMap();
|
||||
|
||||
Long newStartCode =
|
||||
|
@ -421,10 +430,15 @@ public class TestBackupBase {
|
|||
Admin ha = TEST_UTIL.getAdmin();
|
||||
|
||||
// Create namespaces
|
||||
ha.createNamespace(NamespaceDescriptor.create("ns1").build());
|
||||
ha.createNamespace(NamespaceDescriptor.create("ns2").build());
|
||||
ha.createNamespace(NamespaceDescriptor.create("ns3").build());
|
||||
ha.createNamespace(NamespaceDescriptor.create("ns4").build());
|
||||
NamespaceDescriptor desc1 = NamespaceDescriptor.create("ns1").build();
|
||||
NamespaceDescriptor desc2 = NamespaceDescriptor.create("ns2").build();
|
||||
NamespaceDescriptor desc3 = NamespaceDescriptor.create("ns3").build();
|
||||
NamespaceDescriptor desc4 = NamespaceDescriptor.create("ns4").build();
|
||||
|
||||
ha.createNamespace(desc1);
|
||||
ha.createNamespace(desc2);
|
||||
ha.createNamespace(desc3);
|
||||
ha.createNamespace(desc4);
|
||||
|
||||
TableDescriptor desc = TableDescriptorBuilder.newBuilder(table1)
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(famName)).build();
|
||||
|
@ -493,21 +507,6 @@ public class TestBackupBase {
|
|||
return ret;
|
||||
}
|
||||
|
||||
protected List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
|
||||
Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
|
||||
FileSystem fs = logRoot.getFileSystem(c);
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
|
||||
List<FileStatus> logFiles = new ArrayList<FileStatus>();
|
||||
while (it.hasNext()) {
|
||||
LocatedFileStatus lfs = it.next();
|
||||
if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
|
||||
logFiles.add(lfs);
|
||||
LOG.info(Objects.toString(lfs));
|
||||
}
|
||||
}
|
||||
return logFiles;
|
||||
}
|
||||
|
||||
protected void dumpBackupDir() throws IOException {
|
||||
// Dump Backup Dir
|
||||
FileSystem fs = FileSystem.get(conf1);
|
||||
|
|
|
@ -18,9 +18,11 @@
|
|||
package org.apache.hadoop.hbase.backup;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -33,6 +35,8 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
|
||||
|
@ -266,12 +270,12 @@ public class TestBackupSystemTable {
|
|||
|
||||
table.writeRegionServerLogTimestamp(tables, rsTimestampMap, "root");
|
||||
|
||||
Map<TableName, Map<String, Long>> result = table.readLogTimestampMap("root");
|
||||
HashMap<TableName, HashMap<String, Long>> result = table.readLogTimestampMap("root");
|
||||
|
||||
assertTrue(tables.size() == result.size());
|
||||
|
||||
for (TableName t : tables) {
|
||||
Map<String, Long> rstm = result.get(t);
|
||||
HashMap<String, Long> rstm = result.get(t);
|
||||
assertNotNull(rstm);
|
||||
assertEquals(rstm.get("rs1:100"), new Long(100L));
|
||||
assertEquals(rstm.get("rs2:100"), new Long(101L));
|
||||
|
@ -297,7 +301,7 @@ public class TestBackupSystemTable {
|
|||
assertTrue(5 == result.size());
|
||||
|
||||
for (TableName t : tables) {
|
||||
Map<String, Long> rstm = result.get(t);
|
||||
HashMap<String, Long> rstm = result.get(t);
|
||||
assertNotNull(rstm);
|
||||
if (t.equals(TableName.valueOf("t3")) == false) {
|
||||
assertEquals(rstm.get("rs1:100"), new Long(100L));
|
||||
|
@ -311,7 +315,7 @@ public class TestBackupSystemTable {
|
|||
}
|
||||
|
||||
for (TableName t : tables1) {
|
||||
Map<String, Long> rstm = result.get(t);
|
||||
HashMap<String, Long> rstm = result.get(t);
|
||||
assertNotNull(rstm);
|
||||
assertEquals(rstm.get("rs1:100"), new Long(200L));
|
||||
assertEquals(rstm.get("rs2:100"), new Long(201L));
|
||||
|
@ -322,6 +326,43 @@ public class TestBackupSystemTable {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddWALFiles() throws IOException {
|
||||
List<String> files =
|
||||
Arrays.asList("hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.1",
|
||||
"hdfs://server/WALs/srv2,102,16666/srv2,102,16666.default.2",
|
||||
"hdfs://server/WALs/srv3,103,17777/srv3,103,17777.default.3");
|
||||
String newFile = "hdfs://server/WALs/srv1,101,15555/srv1,101,15555.default.5";
|
||||
|
||||
table.addWALFiles(files, "backup", "root");
|
||||
|
||||
assertTrue(table.isWALFileDeletable(files.get(0)));
|
||||
assertTrue(table.isWALFileDeletable(files.get(1)));
|
||||
assertTrue(table.isWALFileDeletable(files.get(2)));
|
||||
assertFalse(table.isWALFileDeletable(newFile));
|
||||
|
||||
// test for isWALFilesDeletable
|
||||
List<FileStatus> fileStatues = new ArrayList<>();
|
||||
for (String file : files) {
|
||||
FileStatus fileStatus = new FileStatus();
|
||||
fileStatus.setPath(new Path(file));
|
||||
fileStatues.add(fileStatus);
|
||||
}
|
||||
|
||||
FileStatus newFileStatus = new FileStatus();
|
||||
newFileStatus.setPath(new Path(newFile));
|
||||
fileStatues.add(newFileStatus);
|
||||
|
||||
Map<FileStatus, Boolean> walFilesDeletableMap = table.areWALFilesDeletable(fileStatues);
|
||||
|
||||
assertTrue(walFilesDeletableMap.get(fileStatues.get(0)));
|
||||
assertTrue(walFilesDeletableMap.get(fileStatues.get(1)));
|
||||
assertTrue(walFilesDeletableMap.get(fileStatues.get(2)));
|
||||
assertFalse(walFilesDeletableMap.get(newFileStatus));
|
||||
|
||||
cleanBackupTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Backup set tests
|
||||
*/
|
||||
|
|
|
@ -19,11 +19,19 @@ package org.apache.hadoop.hbase.backup.master;
|
|||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.util.HashMap;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.backup.BackupType;
|
||||
import org.apache.hadoop.hbase.backup.TestBackupBase;
|
||||
|
@ -32,14 +40,16 @@ import org.apache.hadoop.hbase.client.Connection;
|
|||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
|
@ -54,7 +64,6 @@ public class TestBackupLogCleaner extends TestBackupBase {
|
|||
|
||||
// implements all test cases in 1 test since incremental full backup/
|
||||
// incremental backup has dependencies
|
||||
|
||||
@Test
|
||||
public void testBackupLogCleaner() throws Exception {
|
||||
|
||||
|
@ -68,11 +77,10 @@ public class TestBackupLogCleaner extends TestBackupBase {
|
|||
assertFalse(systemTable.hasBackupSessions());
|
||||
|
||||
List<FileStatus> walFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
|
||||
List<String> swalFiles = convert(walFiles);
|
||||
BackupLogCleaner cleaner = new BackupLogCleaner();
|
||||
cleaner.setConf(TEST_UTIL.getConfiguration());
|
||||
Map<String, Object> params = new HashMap<>();
|
||||
params.put(HMaster.MASTER, TEST_UTIL.getHBaseCluster().getMaster());
|
||||
cleaner.init(params);
|
||||
cleaner.init(null);
|
||||
cleaner.setConf(TEST_UTIL.getConfiguration());
|
||||
|
||||
Iterable<FileStatus> deletable = cleaner.getDeletableFiles(walFiles);
|
||||
|
@ -81,6 +89,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
|
|||
// We can delete all files because we do not have yet recorded backup sessions
|
||||
assertTrue(size == walFiles.size());
|
||||
|
||||
systemTable.addWALFiles(swalFiles, "backup", "root");
|
||||
String backupIdFull = fullTableBackup(tableSetFullList);
|
||||
assertTrue(checkSucceeded(backupIdFull));
|
||||
// Check one more time
|
||||
|
@ -91,6 +100,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
|
|||
|
||||
List<FileStatus> newWalFiles = getListOfWALFiles(TEST_UTIL.getConfiguration());
|
||||
LOG.debug("WAL list after full backup");
|
||||
convert(newWalFiles);
|
||||
|
||||
// New list of wal files is greater than the previous one,
|
||||
// because new wal per RS have been opened after full backup
|
||||
|
@ -130,4 +140,29 @@ public class TestBackupLogCleaner extends TestBackupBase {
|
|||
conn.close();
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> convert(List<FileStatus> walFiles) {
|
||||
List<String> result = new ArrayList<String>();
|
||||
for (FileStatus fs : walFiles) {
|
||||
LOG.debug("+++WAL: " + fs.getPath().toString());
|
||||
result.add(fs.getPath().toString());
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private List<FileStatus> getListOfWALFiles(Configuration c) throws IOException {
|
||||
Path logRoot = new Path(CommonFSUtils.getWALRootDir(c), HConstants.HREGION_LOGDIR_NAME);
|
||||
FileSystem fs = logRoot.getFileSystem(c);
|
||||
RemoteIterator<LocatedFileStatus> it = fs.listFiles(logRoot, true);
|
||||
List<FileStatus> logFiles = new ArrayList<FileStatus>();
|
||||
while (it.hasNext()) {
|
||||
LocatedFileStatus lfs = it.next();
|
||||
if (lfs.isFile() && !AbstractFSWALProvider.isMetaFile(lfs.getPath())) {
|
||||
logFiles.add(lfs);
|
||||
LOG.info(Objects.toString(lfs));
|
||||
}
|
||||
}
|
||||
return logFiles;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -91,11 +91,7 @@ message BackupInfo {
|
|||
optional uint32 progress = 10;
|
||||
optional uint32 workers_number = 11;
|
||||
optional uint64 bandwidth = 12;
|
||||
map<string, RSTimestampMap> table_set_timestamp = 13;
|
||||
|
||||
message RSTimestampMap {
|
||||
map<string, uint64> rs_timestamp = 1;
|
||||
}
|
||||
/**
|
||||
* Backup session states
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue