HBASE-27649 WALPlayer does not properly dedupe overridden cell versions (#5047)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Bryan Beaudreault 2023-02-25 12:10:40 -05:00 committed by GitHub
parent bc31e68e85
commit 82c7dbd488
10 changed files with 426 additions and 58 deletions

View File

@ -352,6 +352,11 @@ public class BackupManager implements Closeable {
public ArrayList<BackupImage> getAncestors(BackupInfo backupInfo, TableName table)
throws IOException {
ArrayList<BackupImage> ancestors = getAncestors(backupInfo);
return filterAncestorsForTable(ancestors, table);
}
public static ArrayList<BackupImage> filterAncestorsForTable(ArrayList<BackupImage> ancestors,
TableName table) {
ArrayList<BackupImage> tableAncestors = new ArrayList<>();
for (BackupImage image : ancestors) {
if (image.hasTable(table)) {

View File

@ -18,11 +18,13 @@
package org.apache.hadoop.hbase.backup.impl;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -40,17 +42,26 @@ import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
* Incremental backup implementation. See the {@link #execute() execute} method.
*/
@ -276,10 +287,48 @@ public class IncrementalTableBackupClient extends TableBackupClient {
// case INCREMENTAL_COPY:
try {
// todo: need to add an abstraction to encapsulate and DRY this up
ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
Map<TableName, List<RegionInfo>> regionsByTable = new HashMap<>();
List<ImmutableBytesWritable> splits = new ArrayList<>();
for (TableName table : backupInfo.getTables()) {
ArrayList<BackupImage> ancestorsForTable =
BackupManager.filterAncestorsForTable(ancestors, table);
BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1);
if (backupImage.getType() != BackupType.FULL) {
throw new RuntimeException("No full backup found in ancestors for table " + table);
}
String lastFullBackupId = backupImage.getBackupId();
Path backupRootDir = new Path(backupInfo.getBackupRootDir());
FileSystem backupFs = backupRootDir.getFileSystem(conf);
Path tableInfoPath =
BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table);
SnapshotProtos.SnapshotDescription snapshotDesc =
SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath);
SnapshotManifest manifest =
SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc);
List<RegionInfo> regionInfos = new ArrayList<>(manifest.getRegionManifests().size());
for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest.getRegionManifests()) {
HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo();
RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo);
// scanning meta doesn't return mob regions, so skip them here too to keep parity
if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) {
continue;
}
regionInfos.add(regionInfoObj);
splits.add(new ImmutableBytesWritable(HFileOutputFormat2
.combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey())));
}
regionsByTable.put(table, regionInfos);
}
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf);
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles();
convertWALsToHFiles(splits);
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());
} catch (Exception e) {
@ -359,7 +408,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
protected void convertWALsToHFiles() throws IOException {
protected void convertWALsToHFiles(List<ImmutableBytesWritable> splits) throws IOException {
// get incremental backup file list and prepare parameters for DistCp
List<String> incrBackupFileList = backupInfo.getIncrBackupFileList();
// Get list of tables in incremental backup set
@ -375,7 +424,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
LOG.warn("Table " + table + " does not exists. Skipping in WAL converter");
}
}
walToHFiles(incrBackupFileList, tableList);
walToHFiles(incrBackupFileList, tableList, splits);
}
@ -385,8 +434,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
}
protected void walToHFiles(List<String> dirPaths, List<String> tableList) throws IOException {
Tool player = new WALPlayer();
protected void walToHFiles(List<String> dirPaths, List<String> tableList,
List<ImmutableBytesWritable> splits) throws IOException {
WALPlayer player = new WALPlayer();
// Player reads all files in an arbitrary directory structure and creates
// a Map task for each file. We use ';' as a separator.
@ -401,6 +451,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
conf.set(JOB_NAME_CONF_KEY, jobname);
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
player.setSplits(splits);
try {
player.setConf(conf);
int result = player.run(playerArgs);
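Taken together, the change above threads region boundaries from the last full backup into the WAL-to-HFile conversion. A condensed sketch of that flow (an illustration, not additional code from the commit; regionsFromLastFullBackup is a hypothetical stand-in for the loop over the snapshot manifest shown above):
// Derive reducer split points from the regions captured by the last full backup.
List<ImmutableBytesWritable> splits = new ArrayList<>();
for (RegionInfo region : regionsFromLastFullBackup) {
  splits.add(new ImmutableBytesWritable(
    HFileOutputFormat2.combineTableNameSuffix(table.getName(), region.getStartKey())));
}
// Hand them to WALPlayer via the new setter; when a bulk output path is configured,
// WALPlayer forwards them to HFileOutputFormat2.configurePartitioner(job, splits, true),
// presumably so the produced HFiles partition along the backed-up region boundaries
// rather than whatever regions the live table happens to have at conversion time.
WALPlayer player = new WALPlayer();
player.setSplits(splits);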

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupInfo;
@ -54,6 +53,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -122,7 +122,8 @@ public final class BackupUtils {
* @param conf configuration
* @throws IOException exception
*/
public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo, Configuration conf)
public static void copyTableRegionInfo(Connection conn, BackupInfo backupInfo,
Map<TableName, List<RegionInfo>> lastFullBackupForTable, Configuration conf)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
FileSystem fs = rootDir.getFileSystem(conf);
@ -147,20 +148,56 @@ public final class BackupUtils {
LOG.debug("Attempting to copy table info for:" + table + " target: " + target
+ " descriptor: " + orig);
LOG.debug("Finished copying tableinfo.");
List<RegionInfo> regions = MetaTableAccessor.getTableRegions(conn, table);
// For each region, write the region info to disk
LOG.debug("Starting to write region info for table " + table);
for (RegionInfo regionInfo : regions) {
Path regionDir = FSUtils
.getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
}
copyTableRegionInfosFromParent(table, targetFs, backupInfo,
lastFullBackupForTable.get(table), conf);
LOG.debug("Finished writing region info for table " + table);
}
}
}
private static void copyTableRegionInfosFromParent(TableName table, FileSystem targetFs,
BackupInfo backupInfo, List<RegionInfo> lastFullBackupForTable, Configuration conf)
throws IOException {
for (RegionInfo regionInfo : lastFullBackupForTable) {
Path regionDir =
FSUtils.getRegionDirFromTableDir(new Path(backupInfo.getTableBackupDir(table)), regionInfo);
regionDir = new Path(backupInfo.getTableBackupDir(table), regionDir.getName());
writeRegioninfoOnFilesystem(conf, targetFs, regionDir, regionInfo);
}
}
/**
* Returns the path to a table's snapshot under the backup root:
* "/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/
* snapshot_1396650097621_namespace_table". This path contains .snapshotinfo and .tabledesc (0.96
* and 0.98), or .snapshotinfo and .data.manifest (trunk).
* @param tableName table name
* @return path to table info
* @throws IOException exception
*/
public static Path getTableInfoPath(FileSystem fs, Path backupRootPath, String backupId,
TableName tableName) throws IOException {
Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
Path tableInfoPath = null;
// can't build the path directly as the timestamp values are different
FileStatus[] snapshots = fs.listStatus(tableSnapShotPath,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
for (FileStatus snapshot : snapshots) {
tableInfoPath = snapshot.getPath();
// SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
if (tableInfoPath.getName().endsWith("data.manifest")) {
break;
}
}
return tableInfoPath;
}
static Path getTableSnapshotPath(Path backupRootPath, TableName tableName, String backupId) {
return new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId),
HConstants.SNAPSHOT_DIR_NAME);
}
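As a usage note, the relocated helper is how backup code can resolve a table's snapshot manifest inside an existing backup image. A short hedged sketch, mirroring how getTableDesc in RestoreTool uses it later in this change (fs, backupRootPath, backupId and tableName are assumed to be in scope):
// Resolves e.g. <backupRoot>/<backupId>/<ns>/<table>/.hbase-snapshot/snapshot_<ts>_<ns>_<table>
Path manifestDir = BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName);
SnapshotProtos.SnapshotDescription desc =
  SnapshotDescriptionUtils.readSnapshotInfo(fs, manifestDir);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, manifestDir, desc);
TableDescriptor descriptorAtBackupTime = manifest.getTableDescriptor();
List<SnapshotProtos.SnapshotRegionManifest> regionsAtBackupTime = manifest.getRegionManifests();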
/**
* Write the .regioninfo file on-disk.
*/

View File

@ -145,13 +145,13 @@ public class RestoreTool {
* the future
* @param conn HBase connection
* @param tableBackupPath backup path
* @param logDirs : incremental backup folders, which contains WAL
* @param tableNames : source tableNames(table names were backuped)
* @param newTableNames : target tableNames(table names to be restored to)
* @param hfileDirs incremental backup folders, which contain the HFiles to bulk load
* @param tableNames source table names (the tables that were backed up)
* @param newTableNames target table names (the tables to restore to)
* @param incrBackupId incremental backup Id
* @throws IOException exception
*/
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] logDirs,
public void incrementalRestoreTable(Connection conn, Path tableBackupPath, Path[] hfileDirs,
TableName[] tableNames, TableName[] newTableNames, String incrBackupId) throws IOException {
try (Admin admin = conn.getAdmin()) {
if (tableNames.length != newTableNames.length) {
@ -202,7 +202,7 @@ public class RestoreTool {
}
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
restoreService.run(logDirs, tableNames, restoreRootDir, newTableNames, false);
restoreService.run(hfileDirs, tableNames, restoreRootDir, newTableNames, false);
}
}
@ -225,39 +225,14 @@ public class RestoreTool {
HConstants.SNAPSHOT_DIR_NAME);
}
/**
* Returns value represent path for:
* ""/$USER/SBACKUP_ROOT/backup_id/namespace/table/.hbase-snapshot/
* snapshot_1396650097621_namespace_table" this path contains .snapshotinfo, .tabledesc (0.96 and
* 0.98) this path contains .snapshotinfo, .data.manifest (trunk)
* @param tableName table name
* @return path to table info
* @throws IOException exception
*/
Path getTableInfoPath(TableName tableName) throws IOException {
Path tableSnapShotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
Path tableInfoPath = null;
// can't build the path directly as the timestamp values are different
FileStatus[] snapshots = fs.listStatus(tableSnapShotPath,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
for (FileStatus snapshot : snapshots) {
tableInfoPath = snapshot.getPath();
// SnapshotManifest.DATA_MANIFEST_NAME = "data.manifest";
if (tableInfoPath.getName().endsWith("data.manifest")) {
break;
}
}
return tableInfoPath;
}
/**
* Get table descriptor
* @param tableName is the table backed up
* @return {@link TableDescriptor} saved in backup image of the table
*/
TableDescriptor getTableDesc(TableName tableName) throws IOException {
Path tableInfoPath = this.getTableInfoPath(tableName);
Path tableInfoPath = BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName);
SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fs, tableInfoPath);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, tableInfoPath, desc);
TableDescriptor tableDescriptor = manifest.getTableDescriptor();
@ -307,7 +282,8 @@ public class RestoreTool {
tableDescriptor = manifest.getTableDescriptor();
} else {
tableDescriptor = getTableDesc(tableName);
snapshotMap.put(tableName, getTableInfoPath(tableName));
snapshotMap.put(tableName,
BackupUtils.getTableInfoPath(fs, backupRootPath, backupId, tableName));
}
if (tableDescriptor == null) {
LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupPhase;
import org.apache.hadoop.hbase.backup.BackupInfo.BackupState;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient;
import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager;
@ -52,14 +53,20 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveLogCleaner;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -71,6 +78,10 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
/**
* This class is only a base for other integration-level backup tests. Do not add tests here.
* TestBackupSmallTests is where tests that don't require bringing machines up/down should go. All other
@ -128,10 +139,50 @@ public class TestBackupBase {
LOG.debug("For incremental backup, current table set is "
+ backupManager.getIncrementalBackupTableSet());
newTimestamps = ((IncrementalBackupManager) backupManager).getIncrBackupLogFileMap();
// todo: need to add an abstraction to encapsulate and DRY this up
ArrayList<BackupImage> ancestors = backupManager.getAncestors(backupInfo);
Map<TableName, List<RegionInfo>> regionsByTable = new HashMap<>();
List<ImmutableBytesWritable> splits = new ArrayList<>();
for (TableName table : backupInfo.getTables()) {
ArrayList<BackupImage> ancestorsForTable =
BackupManager.filterAncestorsForTable(ancestors, table);
BackupImage backupImage = ancestorsForTable.get(ancestorsForTable.size() - 1);
if (backupImage.getType() != BackupType.FULL) {
throw new RuntimeException("No full backup found in ancestors for table " + table);
}
String lastFullBackupId = backupImage.getBackupId();
Path backupRootDir = new Path(backupInfo.getBackupRootDir());
FileSystem backupFs = backupRootDir.getFileSystem(conf);
Path tableInfoPath =
BackupUtils.getTableInfoPath(backupFs, backupRootDir, lastFullBackupId, table);
SnapshotProtos.SnapshotDescription snapshotDesc =
SnapshotDescriptionUtils.readSnapshotInfo(backupFs, tableInfoPath);
SnapshotManifest manifest =
SnapshotManifest.open(conf, backupFs, tableInfoPath, snapshotDesc);
List<RegionInfo> regionInfos = new ArrayList<>(manifest.getRegionManifests().size());
for (SnapshotProtos.SnapshotRegionManifest regionManifest : manifest
.getRegionManifests()) {
HBaseProtos.RegionInfo regionInfo = regionManifest.getRegionInfo();
RegionInfo regionInfoObj = ProtobufUtil.toRegionInfo(regionInfo);
// scanning meta doesn't return mob regions, so skip them here too to keep parity
if (Bytes.equals(regionInfoObj.getStartKey(), MobConstants.MOB_REGION_NAME_BYTES)) {
continue;
}
regionInfos.add(regionInfoObj);
splits.add(new ImmutableBytesWritable(HFileOutputFormat2
.combineTableNameSuffix(table.getName(), regionInfoObj.getStartKey())));
}
regionsByTable.put(table, regionInfos);
}
// copy out the table and region info files for each table
BackupUtils.copyTableRegionInfo(conn, backupInfo, conf);
BackupUtils.copyTableRegionInfo(conn, backupInfo, regionsByTable, conf);
// convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
convertWALsToHFiles();
convertWALsToHFiles(splits);
incrementalCopyHFiles(new String[] { getBulkOutputDir().toString() },
backupInfo.getBackupRootDir());
failStageIf(Stage.stage_2);

View File

@ -130,6 +130,8 @@ public class TestIncrementalBackup extends TestBackupBase {
byte[] name = regions.get(0).getRegionInfo().getRegionName();
long startSplitTime = EnvironmentEdgeManager.currentTime();
try {
// todo: this fails, and it'd be nice if we could really add a split so we can prove
// that our new splits passthrough works (expect the split to disappear once we restore)
admin.splitRegionAsync(name).get();
} catch (Exception e) {
// although split fail, this may not affect following check in current API,

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Similar to CellSerialization, but includes the sequenceId from an ExtendedCell. This is necessary
* so that CellSortReducer can sort by sequenceId, if applicable. Note that these two serializations
* are not compatible -- data serialized by CellSerialization cannot be deserialized with
* ExtendedCellSerialization and vice versa. This is ok for {@link HFileOutputFormat2} because the
* serialization is not used for the HFiles that are ultimately written, only for intermediate data
* (between the mapper and reducer of a single job).
*/
@InterfaceAudience.Private
public class ExtendedCellSerialization implements Serialization<ExtendedCell> {
@Override
public boolean accept(Class<?> c) {
return ExtendedCell.class.isAssignableFrom(c);
}
@Override
public ExtendedCellDeserializer getDeserializer(Class<ExtendedCell> t) {
return new ExtendedCellDeserializer();
}
@Override
public ExtendedCellSerializer getSerializer(Class<ExtendedCell> c) {
return new ExtendedCellSerializer();
}
public static class ExtendedCellDeserializer implements Deserializer<ExtendedCell> {
private DataInputStream dis;
@Override
public void close() throws IOException {
this.dis.close();
}
@Override
public KeyValue deserialize(ExtendedCell ignore) throws IOException {
KeyValue kv = KeyValueUtil.create(this.dis);
PrivateCellUtil.setSequenceId(kv, this.dis.readLong());
return kv;
}
@Override
public void open(InputStream is) throws IOException {
this.dis = new DataInputStream(is);
}
}
public static class ExtendedCellSerializer implements Serializer<ExtendedCell> {
private DataOutputStream dos;
@Override
public void close() throws IOException {
this.dos.close();
}
@Override
public void open(OutputStream os) throws IOException {
this.dos = new DataOutputStream(os);
}
@Override
public void serialize(ExtendedCell kv) throws IOException {
dos.writeInt(PrivateCellUtil.estimatedSerializedSizeOf(kv) - Bytes.SIZEOF_INT);
PrivateCellUtil.writeCell(kv, dos, true);
dos.writeLong(kv.getSequenceId());
}
}
}
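To make the contract concrete, here is a small self-contained round-trip sketch (not part of the commit; the class is IA.Private, so this is illustration only). The serializer writes the cell followed by its sequenceId and the deserializer restores both, which plain CellSerialization would not:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.mapreduce.ExtendedCellSerialization;
import org.apache.hadoop.hbase.util.Bytes;

public class ExtendedCellSerializationRoundTrip {
  public static void main(String[] args) throws Exception {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"), Bytes.toBytes("q"), 1234L,
      Bytes.toBytes("value"));
    PrivateCellUtil.setSequenceId(kv, 42L);

    ExtendedCellSerialization serialization = new ExtendedCellSerialization();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ExtendedCellSerialization.ExtendedCellSerializer serializer =
      serialization.getSerializer(ExtendedCell.class);
    serializer.open(out);
    serializer.serialize(kv);
    serializer.close();

    ExtendedCellSerialization.ExtendedCellDeserializer deserializer =
      serialization.getDeserializer(ExtendedCell.class);
    deserializer.open(new ByteArrayInputStream(out.toByteArray()));
    ExtendedCell roundTripped = deserializer.deserialize(null);
    deserializer.close();

    // Prints 42: the sequenceId travels with the cell bytes, so CellSortReducer can later
    // break ties between cells that share the same timestamp.
    System.out.println(roundTripped.getSequenceId());
  }
}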

View File

@ -30,6 +30,7 @@ import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -124,7 +125,7 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
protected static final byte[] tableSeparator = Bytes.toBytes(";");
protected static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
public static byte[] combineTableNameSuffix(byte[] tableName, byte[] suffix) {
return Bytes.add(tableName, tableSeparator, suffix);
}
@ -159,6 +160,15 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
static final String MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY =
"hbase.mapreduce.use.multi.table.hfileoutputformat";
/**
* ExtendedCell and ExtendedCellSerialization are InterfaceAudience.Private. We expose this config
* package-private for internal use by jobs like WALPlayer, which need to use features of
* ExtendedCell.
*/
static final String EXTENDED_CELL_SERIALIZATION_ENABLED_KEY =
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@ -619,9 +629,7 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
}
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
CellSerialization.class.getName());
mergeSerializations(conf);
if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
LOG.info("bulkload locality sensitive enabled");
@ -670,6 +678,33 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
LOG.info("Incremental output configured for tables: " + StringUtils.join(allTableNames, ","));
}
private static void mergeSerializations(Configuration conf) {
List<String> serializations = new ArrayList<>();
// add any existing values that have been set
String[] existing = conf.getStrings("io.serializations");
if (existing != null) {
Collections.addAll(serializations, existing);
}
serializations.add(MutationSerialization.class.getName());
serializations.add(ResultSerialization.class.getName());
// Add ExtendedCellSerialization, if configured. Order matters here. Hadoop's
// SerializationFactory runs through serializations in the order they are registered.
// We want to register ExtendedCellSerialization before CellSerialization because both
// work for ExtendedCells but only ExtendedCellSerialization handles them properly.
if (
conf.getBoolean(EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT)
) {
serializations.add(ExtendedCellSerialization.class.getName());
}
serializations.add(CellSerialization.class.getName());
conf.setStrings("io.serializations", serializations.toArray(new String[0]));
}
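Since the new key is package-private, only code in org.apache.hadoop.hbase.mapreduce (such as WALPlayer, later in this change) can opt in. A hedged sketch of the intended effect:
// Inside the org.apache.hadoop.hbase.mapreduce package, before configureIncrementalLoad(...):
Configuration conf = job.getConfiguration();
conf.setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY, true);
// After mergeSerializations(conf), io.serializations is effectively:
//   <any pre-existing serializations>,
//   MutationSerialization, ResultSerialization,
//   ExtendedCellSerialization,  // deliberately registered before CellSerialization, because
//   CellSerialization           // SerializationFactory uses the first entry that accepts a class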
public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDescriptor)
throws IOException {
Configuration conf = job.getConfiguration();
@ -846,9 +881,16 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
* Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
* <code>splitPoints</code>. Cleans up the partitions file after the job exits.
*/
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
public static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
boolean writeMultipleTables) throws IOException {
Configuration conf = job.getConfiguration();
// todo: need to think if there's a better way
if (conf.get(job.getJobName() + ".wrotePartitions") != null) {
LOG.info("Already configured partitions, skipping... {}", splitPoints);
return;
}
LOG.info("Configuring partitions {}", splitPoints);
conf.set(job.getJobName() + ".wrotePartitions", "true");
// create the partitions file
FileSystem fs = FileSystem.get(conf);
String hbaseTmpFsDir =

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -79,6 +80,7 @@ public class WALPlayer extends Configured implements Tool {
protected static final String tableSeparator = ";";
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
private List<ImmutableBytesWritable> splits;
public WALPlayer() {
}
@ -87,6 +89,10 @@ public class WALPlayer extends Configured implements Tool {
super(c);
}
public void setSplits(List<ImmutableBytesWritable> splits) {
this.splits = splits;
}
/**
* A mapper that just writes out KeyValues. This one can be used together with
* {@link CellSortReducer}
@ -105,6 +111,13 @@ public class WALPlayer extends Configured implements Tool {
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}
// Set sequenceId from WALKey, since it is not included by WALCellCodec. The sequenceId
// on WALKey is the same value that was on the cells in the WALEdit. This enables
// CellSortReducer to use sequenceId to disambiguate duplicate cell timestamps.
// See HBASE-27649
PrivateCellUtil.setSequenceId(cell, key.getSequenceId());
byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
@ -308,6 +321,15 @@ public class WALPlayer extends Configured implements Tool {
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
// WALPlayer needs ExtendedCellSerialization so that sequenceId can be propagated when
// sorting cells in CellSortReducer
job.getConfiguration().setBoolean(HFileOutputFormat2.EXTENDED_CELL_SERIALIZATION_ENABLED_KEY,
true);
if (splits != null) {
HFileOutputFormat2.configurePartitioner(job, splits, true);
}
// the bulk HFile case
List<TableName> tableNames = getTableNameList(tables);

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.mapreduce;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -29,6 +33,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -50,8 +55,10 @@ import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper;
import org.apache.hadoop.hbase.regionserver.TestRecoveredEdits;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LauncherSecurityManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WAL;
@ -131,6 +138,80 @@ public class TestWALPlayer {
assertTrue(TEST_UTIL.countRows(tn) > 0);
}
/**
* Tests that when multiple cells are written with the same timestamp, they are properly sorted by
* sequenceId in WALPlayer/CellSortReducer so that the correct one wins when querying the
* resulting bulk-loaded HFiles. See HBASE-27649
*/
@Test
public void testWALPlayerBulkLoadWithOverriddenTimestamps() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName() + "1");
final byte[] family = Bytes.toBytes("family");
final byte[] column1 = Bytes.toBytes("c1");
final byte[] column2 = Bytes.toBytes("c2");
final byte[] row = Bytes.toBytes("row");
Table table = TEST_UTIL.createTable(tableName, family);
long now = EnvironmentEdgeManager.currentTime();
// put a row into the first table
Put p = new Put(row);
p.addColumn(family, column1, now, column1);
p.addColumn(family, column2, now, column2);
table.put(p);
byte[] lastVal = null;
for (int i = 0; i < 50; i++) {
lastVal = Bytes.toBytes(ThreadLocalRandom.current().nextLong());
p = new Put(row);
p.addColumn(family, column1, now, lastVal);
table.put(p);
// wal rolling is necessary to trigger the bug. otherwise no sorting
// needs to occur in the reducer because it's all sorted and coming from a single file.
if (i % 10 == 0) {
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
}
}
WAL log = cluster.getRegionServer(0).getWAL(null);
log.rollWriter();
String walInputDir = new Path(cluster.getMaster().getMasterFileSystem().getWALRootDir(),
HConstants.HREGION_LOGDIR_NAME).toString();
Configuration configuration = new Configuration(TEST_UTIL.getConfiguration());
String outPath = "/tmp/" + name.getMethodName();
configuration.set(WALPlayer.BULK_OUTPUT_CONF_KEY, outPath);
configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
WALPlayer player = new WALPlayer(configuration);
assertEquals(0, ToolRunner.run(configuration, player,
new String[] { walInputDir, tableName.getNameAsString() }));
Get g = new Get(row);
Result result = table.get(g);
byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
table = TEST_UTIL.truncateTable(tableName);
g = new Get(row);
result = table.get(g);
assertThat(result.listCells(), nullValue());
BulkLoadHFiles.create(configuration).bulkLoad(tableName,
new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
g = new Get(row);
result = table.get(g);
value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
assertThat(result.listCells(), notNullValue());
assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
}
/**
* Simple end-to-end test
*/