HBASE-23739 BoundedRecoveredHFilesOutputSink should read the table descriptor directly (#1223)

Signed-off-by: Pankaj <pankajkumar@apache.org>
Author: Guanghao Zhang, 2020-03-06 19:06:59 +08:00 (committed by GitHub)
parent 4cb60327be
commit df62dde394
7 changed files with 103 additions and 68 deletions
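In short: before this change, BoundedRecoveredHFilesOutputSink opened its own Connection/Admin to fetch table descriptors over RPC during WAL splitting; after it, the sink reuses the region server's cached TableDescriptors and, when no RegionServerServices is available, reads the descriptor files directly from the root filesystem. A condensed before/after sketch distilled from the hunks below (variable names are illustrative, not the exact code):

    // Before: per-split RPC through a freshly created connection.
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();
    TableDescriptor td = admin.getDescriptor(tableName);

    // After: resolved locally via the splitter; no cluster round-trip.
    TableDescriptor td = walSplitter.tableDescriptors.get(tableName);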

File: org/apache/hadoop/hbase/regionserver/SplitLogWorker.java

@@ -179,7 +179,7 @@ public class SplitLogWorker implements Runnable {
server.getCoordinatedStateManager() == null ? null
: server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p,
sequenceIdChecker, splitLogWorkerCoordination, factory)) {
sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
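
For orientation, the assembled new call site reads as follows (a sketch; only the trailing argument is new, and server here is the worker's RegionServerServices):

    // The worker now forwards its RegionServerServices so the splitter can
    // reuse the server's cached table descriptors instead of opening a
    // connection of its own.
    if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p,
        sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
      return Status.PREEMPTED;
    }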

File: org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
@@ -28,25 +29,20 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -61,10 +57,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
private final WALSplitter walSplitter;
private final Map<TableName, TableDescriptor> tableDescCache;
private Connection connection;
private Admin admin;
private FileSystem rootFS;
// Since the splitting process may create multiple output files, we need a map
// to track the output count of each region.
@@ -72,19 +64,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
// Need a counter to track the opening writers.
private final AtomicInteger openingWritersNum = new AtomicInteger(0);
private final ConcurrentMap<TableName, TableDescriptor> tableDescCache;
public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
super(controller, entryBuffers, numWriters);
this.walSplitter = walSplitter;
tableDescCache = new HashMap<>();
}
@Override
void startWriterThreads() throws IOException {
connection = ConnectionFactory.createConnection(walSplitter.conf);
admin = connection.getAdmin();
rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
super.startWriterThreads();
this.tableDescCache = new ConcurrentHashMap<>();
}
@Override
@@ -137,8 +123,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
} finally {
isSuccessful &= writeRemainingEntryBuffers();
}
IOUtils.closeQuietly(admin);
IOUtils.closeQuietly(connection);
return isSuccessful ? splits : null;
}
@@ -199,10 +183,10 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
long seqId, String familyName, boolean isMetaTable) throws IOException {
Path outputFile = WALSplitUtil
.getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS);
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS);
checkPathValid(outputFile);
StoreFileWriter.Builder writerBuilder =
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS)
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
.withFilePath(outputFile);
HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
if (isMetaTable) {
@@ -216,10 +200,11 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
private void configContextForNonMetaWriter(TableName tableName, String familyName,
HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
throws IOException {
if (!tableDescCache.containsKey(tableName)) {
tableDescCache.put(tableName, admin.getDescriptor(tableName));
TableDescriptor tableDesc =
tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t));
if (tableDesc == null) {
throw new IOException("Failed to get table descriptor for table " + tableName);
}
TableDescriptor tableDesc = tableDescCache.get(tableName);
ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
.withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding())
@@ -228,11 +213,27 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
}
private void checkPathValid(Path outputFile) throws IOException {
if (rootFS.exists(outputFile)) {
if (walSplitter.rootFS.exists(outputFile)) {
LOG.warn("this file {} may be left after last failed split ", outputFile);
if (!rootFS.delete(outputFile, false)) {
if (!walSplitter.rootFS.delete(outputFile, false)) {
LOG.warn("delete old generated HFile {} failed", outputFile);
}
}
}
private TableDescriptor getTableDescriptor(TableName tableName) {
if (walSplitter.rsServices != null) {
try {
return walSplitter.rsServices.getConnection().getAdmin().getDescriptor(tableName);
} catch (IOException e) {
LOG.warn("Failed to get table descriptor for table {}", tableName, e);
}
}
try {
return walSplitter.tableDescriptors.get(tableName);
} catch (IOException e) {
LOG.warn("Failed to get table descriptor for table {}", tableName, e);
return null;
}
}
}
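
One subtlety in the lookup above: ConcurrentHashMap.computeIfAbsent does not store a mapping when the loader returns null, so a failed descriptor fetch is surfaced to the caller but never cached, and the next edit for that table retries the lookup. A self-contained sketch of the pattern (class and method names here are hypothetical stand-ins, not HBase API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class NullTolerantCache {
      private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();

      // Returns the cached value, loading it on first access. If load()
      // yields null, computeIfAbsent leaves the map unchanged and returns
      // null, mirroring the null check after computeIfAbsent above.
      String getOrLoad(String key) {
        return cache.computeIfAbsent(key, this::load);
      }

      // Stand-in for getTableDescriptor(): may return null on failure.
      private String load(String key) {
        return key.isEmpty() ? null : key.toUpperCase();
      }
    }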

File: org/apache/hadoop/hbase/wal/WALSplitter.java

@@ -40,16 +40,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
@@ -82,6 +85,10 @@ public class WALSplitter {
protected final Path walDir;
protected final FileSystem walFS;
protected final Configuration conf;
final Path rootDir;
final FileSystem rootFS;
final RegionServerServices rsServices;
final TableDescriptors tableDescriptors;
// Major subcomponents of the split process.
// These are separated into inner classes to make testing easier.
@@ -114,21 +121,31 @@
@VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) {
Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir;
this.walFS = walFS;
this.rootDir = rootDir;
this.rootFS = rootFS;
this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.rsServices = rsServices;
if (rsServices != null) {
this.tableDescriptors = rsServices.getTableDescriptors();
} else {
this.tableDescriptors = new FSTableDescriptors(rootFS, rootDir, true, true);
}
this.walFactory = factory;
PipelineController controller = new PipelineController();
this.tmpDirName =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
// if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
@@ -175,10 +192,12 @@
*/
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
throws IOException {
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker,
splitLogWorkerCoordination);
SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
RegionServerServices rsServices) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
return s.splitLogFile(logfile, reporter);
}
@@ -187,16 +206,19 @@
// It is public only because TestWALObserver is in a different package,
// which uses this method to do log splitting.
@VisibleForTesting
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
Collections.singletonList(logDir), null);
public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
Configuration conf, final WALFactory factory) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
FileSystem rootFS = rootDir.getFileSystem(conf);
final FileStatus[] logfiles =
SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<>();
if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile: logfiles) {
WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null);
for (FileStatus logfile : logfiles) {
WALSplitter s =
new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits);
}
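
When rsServices is null (as in the standalone split() path above), descriptor lookup goes through FSTableDescriptors, which reads the descriptor file stored under each table directory rather than asking a live cluster. A minimal sketch of that fallback under an assumed configuration (the method name and table name are placeholders):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableDescriptors;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.util.FSTableDescriptors;
    import org.apache.hadoop.hbase.util.FSUtils;

    // Minimal sketch: resolve a descriptor without a cluster connection,
    // mirroring the fallback branch in the constructor above.
    TableDescriptor readDescriptorDirectly(TableName tableName) throws IOException {
      Configuration conf = HBaseConfiguration.create();
      Path rootDir = FSUtils.getRootDir(conf);
      FileSystem rootFS = rootDir.getFileSystem(conf);
      // fsreadonly=true, usecache=true, as in the constructor call above.
      TableDescriptors tds = new FSTableDescriptors(rootFS, rootDir, true, true);
      // get() may throw IOException if the descriptor cannot be read.
      return tds.get(tableName);
    }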

File: org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java

@@ -907,8 +907,8 @@ public abstract class AbstractTestWALReplay {
FileStatus[] listStatus = wal.getFiles();
assertNotNull(listStatus);
assertTrue(listStatus.length > 0);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null, wals);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
wals, null);
FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName),
new Path(hri.getEncodedName(), "recovered.edits")),
new PathFilter() {
@@ -1058,8 +1058,8 @@
first = fs.getFileStatus(smallFile);
second = fs.getFileStatus(largeFile);
}
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals);
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals);
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
WAL wal = createWAL(this.conf, hbaseRootDir, logName);
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());

File: org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java

@@ -173,7 +173,7 @@ public class TestWALReaderOnSecureWAL {
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null);
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");
@@ -217,7 +217,7 @@
FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf);
try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null);
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt");

File: org/apache/hadoop/hbase/wal/TestWALSplit.java

@@ -812,15 +812,16 @@ public class TestWALSplit {
}
assertTrue("There should be some log greater than size 0.", 0 < largestSize);
// Set up a splitter that will throw an IOE on the output side
WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
@Override
protected Writer createWriter(Path logfile) throws IOException {
Writer mockWriter = Mockito.mock(Writer.class);
Mockito.doThrow(new IOException("Injected")).when(
mockWriter).append(Mockito.<Entry>any());
return mockWriter;
}
};
WALSplitter logSplitter =
new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
@Override
protected Writer createWriter(Path logfile) throws IOException {
Writer mockWriter = Mockito.mock(Writer.class);
Mockito.doThrow(new IOException("Injected")).when(mockWriter)
.append(Mockito.<Entry> any());
return mockWriter;
}
};
// Set up a background thread dumper. Needs a thread to depend on and then we need to run
// the thread dumping in a background thread so it does not hold up the test.
final AtomicBoolean stop = new AtomicBoolean(false);
@@ -941,8 +942,8 @@
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
null, wals, null);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@@ -1000,7 +1001,8 @@
makeRegionDirs(regions);
// Create a splitter that reads and writes the data without touching disk
WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {
WALSplitter logSplitter =
new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
/* Produce a mock writer that doesn't write anywhere */
@Override
@@ -1148,7 +1150,8 @@
assertTrue("There should be some log file",
logfiles != null && logfiles.length > 0);
WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
WALSplitter logSplitter =
new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
@Override
protected Writer createWriter(Path logfile)
throws IOException {

File: org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java

@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
@@ -172,8 +173,9 @@ public class TestWALSplitToHFile {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final TableDescriptor td = createBasic3FamilyTD(tableName);
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir);
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(tableDir);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
@@ -203,7 +205,12 @@
// all edits in logs are seen as 'stale'/old.
region.close(true);
wal.shutdown();
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
try {
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
} catch (Exception e) {
LOG.debug("Got exception", e);
}
WAL wal2 = createWAL(this.conf, rootDir, logName);
HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
long seqid2 = region2.getOpenSeqNum();
@@ -230,7 +237,7 @@
FileSystem newFS = FileSystem.get(newConf);
// Make a new wal for new region open.
WAL wal3 = createWAL(newConf, rootDir, logName);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null);
HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
long seqid3 = region3.initialize();
Result result3 = region3.get(g);
// Assert that count of cells is same as before crash.
@@ -259,11 +266,12 @@
throws IOException, SecurityException, IllegalArgumentException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir);
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(tableDir);
final byte[] rowName = tableName.getName();
final int countPerFamily = 10;
final TableDescriptor td = createBasic3FamilyTD(tableName);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
HBaseTestingUtility.closeRegionAndWAL(region3);
// Write countPerFamily edits into the three families. Do a flush on one
@@ -318,9 +326,10 @@
public void testReplayEditsAfterAbortingFlush() throws IOException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir);
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(tableDir);
final TableDescriptor td = createBasic3FamilyTD(tableName);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
HBaseTestingUtility.closeRegionAndWAL(region3);
// Write countPerFamily edits into the three families. Do a flush on one
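
The test changes above share one setup step: because a connection-less splitter now resolves descriptors from the filesystem, each test persists the descriptor under the table directory before splitting. A condensed sketch of that step, as used in the hunks above:

    // Write the table descriptor into tableDir so the splitter's
    // FSTableDescriptors fallback can read it back during the split.
    Path tableDir = FSUtils.getTableDir(rootDir, tableName);
    // forceCreation=false: leave any existing descriptor in place.
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);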