HBASE-23739 BoundedRecoveredHFilesOutputSink should read the table descriptor directly (#1223)

Signed-off-by: Pankaj <pankajkumar@apache.org>
This commit is contained in:
Guanghao Zhang 2020-03-06 19:06:59 +08:00
parent 8e26761fd0
commit 6ebe966354
7 changed files with 103 additions and 68 deletions

View File

@ -103,7 +103,7 @@ public class SplitLogWorker implements Runnable {
server.getCoordinatedStateManager() == null ? null server.getCoordinatedStateManager() == null ? null
: server.getCoordinatedStateManager().getSplitLogWorkerCoordination(); : server.getCoordinatedStateManager().getSplitLogWorkerCoordination();
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf,
p, sequenceIdChecker, splitLogWorkerCoordination, factory)) { p, sequenceIdChecker, splitLogWorkerCoordination, factory, server)) {
return Status.PREEMPTED; return Status.PREEMPTED;
} }
} catch (InterruptedIOException iioe) { } catch (InterruptedIOException iioe) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.HashMap; import java.util.HashMap;
@ -28,25 +29,20 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.CellSet; import org.apache.hadoop.hbase.regionserver.CellSet;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -61,10 +57,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false; public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
private final WALSplitter walSplitter; private final WALSplitter walSplitter;
private final Map<TableName, TableDescriptor> tableDescCache;
private Connection connection;
private Admin admin;
private FileSystem rootFS;
// Since the splitting process may create multiple output files, we need a map // Since the splitting process may create multiple output files, we need a map
// to track the output count of each region. // to track the output count of each region.
@ -72,19 +64,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
// Need a counter to track the opening writers. // Need a counter to track the opening writers.
private final AtomicInteger openingWritersNum = new AtomicInteger(0); private final AtomicInteger openingWritersNum = new AtomicInteger(0);
private final ConcurrentMap<TableName, TableDescriptor> tableDescCache;
public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter, public BoundedRecoveredHFilesOutputSink(WALSplitter walSplitter,
WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
super(controller, entryBuffers, numWriters); super(controller, entryBuffers, numWriters);
this.walSplitter = walSplitter; this.walSplitter = walSplitter;
tableDescCache = new HashMap<>(); this.tableDescCache = new ConcurrentHashMap<>();
}
@Override
public void startWriterThreads() throws IOException {
connection = ConnectionFactory.createConnection(walSplitter.conf);
admin = connection.getAdmin();
rootFS = FSUtils.getRootDirFileSystem(walSplitter.conf);
super.startWriterThreads();
} }
@Override @Override
@ -137,8 +123,6 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
} finally { } finally {
isSuccessful &= writeRemainingEntryBuffers(); isSuccessful &= writeRemainingEntryBuffers();
} }
IOUtils.closeQuietly(admin);
IOUtils.closeQuietly(connection);
return isSuccessful ? splits : null; return isSuccessful ? splits : null;
} }
@ -199,10 +183,10 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
long seqId, String familyName, boolean isMetaTable) throws IOException { long seqId, String familyName, boolean isMetaTable) throws IOException {
Path outputFile = WALSplitUtil Path outputFile = WALSplitUtil
.getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId, .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, rootFS); walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS);
checkPathValid(outputFile); checkPathValid(outputFile);
StoreFileWriter.Builder writerBuilder = StoreFileWriter.Builder writerBuilder =
new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, rootFS) new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
.withFilePath(outputFile); .withFilePath(outputFile);
HFileContextBuilder hFileContextBuilder = new HFileContextBuilder(); HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
if (isMetaTable) { if (isMetaTable) {
@ -216,10 +200,11 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
private void configContextForNonMetaWriter(TableName tableName, String familyName, private void configContextForNonMetaWriter(TableName tableName, String familyName,
HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder) HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
throws IOException { throws IOException {
if (!tableDescCache.containsKey(tableName)) { TableDescriptor tableDesc =
tableDescCache.put(tableName, admin.getDescriptor(tableName)); tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t));
if (tableDesc == null) {
throw new IOException("Failed to get table descriptor for table " + tableName);
} }
TableDescriptor tableDesc = tableDescCache.get(tableName);
ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName)); ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize()) hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
.withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding()) .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding())
@ -228,11 +213,27 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
} }
private void checkPathValid(Path outputFile) throws IOException { private void checkPathValid(Path outputFile) throws IOException {
if (rootFS.exists(outputFile)) { if (walSplitter.rootFS.exists(outputFile)) {
LOG.warn("this file {} may be left after last failed split ", outputFile); LOG.warn("this file {} may be left after last failed split ", outputFile);
if (!rootFS.delete(outputFile, false)) { if (!walSplitter.rootFS.delete(outputFile, false)) {
LOG.warn("delete old generated HFile {} failed", outputFile); LOG.warn("delete old generated HFile {} failed", outputFile);
} }
} }
} }
private TableDescriptor getTableDescriptor(TableName tableName) {
if (walSplitter.rsServices != null) {
try {
return walSplitter.rsServices.getConnection().getAdmin().getDescriptor(tableName);
} catch (IOException e) {
LOG.warn("Failed to get table descriptor for table {}", tableName, e);
}
}
try {
return walSplitter.tableDescriptors.get(tableName);
} catch (IOException e) {
LOG.warn("Failed to get table descriptor for table {}", tableName, e);
return null;
}
}
} }

View File

@ -40,16 +40,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.LastSequenceId; import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WAL.Reader;
@ -81,6 +84,10 @@ public class WALSplitter {
protected final Path walDir; protected final Path walDir;
protected final FileSystem walFS; protected final FileSystem walFS;
protected final Configuration conf; protected final Configuration conf;
final Path rootDir;
final FileSystem rootFS;
final RegionServerServices rsServices;
final TableDescriptors tableDescriptors;
// Major subcomponents of the split process. // Major subcomponents of the split process.
// These are separated into inner classes to make testing easier. // These are separated into inner classes to make testing easier.
@ -113,21 +120,31 @@ public class WALSplitter {
@VisibleForTesting @VisibleForTesting
WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { Path rootDir, FileSystem rootFS, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, RegionServerServices rsServices) {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
String codecClassName = String codecClassName =
conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
this.walDir = walDir; this.walDir = walDir;
this.walFS = walFS; this.walFS = walFS;
this.rootDir = rootDir;
this.rootFS = rootFS;
this.sequenceIdChecker = idChecker; this.sequenceIdChecker = idChecker;
this.splitLogWorkerCoordination = splitLogWorkerCoordination; this.splitLogWorkerCoordination = splitLogWorkerCoordination;
this.rsServices = rsServices;
if (rsServices != null) {
this.tableDescriptors = rsServices.getTableDescriptors();
} else {
this.tableDescriptors = new FSTableDescriptors(rootFS, rootDir, true, true);
}
this.walFactory = factory; this.walFactory = factory;
PipelineController controller = new PipelineController(); PipelineController controller = new PipelineController();
this.tmpDirName = this.tmpDirName =
conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
// if we limit the number of writers opened for sinking recovered edits // if we limit the number of writers opened for sinking recovered edits
boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE); boolean splitToHFile = conf.getBoolean(WAL_SPLIT_TO_HFILE, DEFAULT_WAL_SPLIT_TO_HFILE);
@ -174,10 +191,12 @@ public class WALSplitter {
*/ */
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) SplitLogWorkerCoordination splitLogWorkerCoordination, WALFactory factory,
throws IOException { RegionServerServices rsServices) throws IOException {
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, idChecker, Path rootDir = FSUtils.getRootDir(conf);
splitLogWorkerCoordination); FileSystem rootFS = rootDir.getFileSystem(conf);
WALSplitter s = new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, idChecker,
splitLogWorkerCoordination, rsServices);
return s.splitLogFile(logfile, reporter); return s.splitLogFile(logfile, reporter);
} }
@ -186,16 +205,19 @@ public class WALSplitter {
// It is public only because TestWALObserver is in a different package, // It is public only because TestWALObserver is in a different package,
// which uses this method to do log splitting. // which uses this method to do log splitting.
@VisibleForTesting @VisibleForTesting
public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir, public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
FileSystem walFS, Configuration conf, final WALFactory factory) throws IOException { Configuration conf, final WALFactory factory) throws IOException {
final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Path rootDir = FSUtils.getRootDir(conf);
Collections.singletonList(logDir), null); FileSystem rootFS = rootDir.getFileSystem(conf);
final FileStatus[] logfiles =
SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
List<Path> splits = new ArrayList<>(); List<Path> splits = new ArrayList<>();
if (ArrayUtils.isNotEmpty(logfiles)) { if (ArrayUtils.isNotEmpty(logfiles)) {
for (FileStatus logfile: logfiles) { for (FileStatus logfile : logfiles) {
WALSplitter s = new WALSplitter(factory, conf, rootDir, walFS, null, null); WALSplitter s =
new WALSplitter(factory, conf, walDir, walFS, rootDir, rootFS, null, null, null);
if (s.splitLogFile(logfile, null)) { if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf); finishSplitLogFile(walDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) { if (s.outputSink.splits != null) {
splits.addAll(s.outputSink.splits); splits.addAll(s.outputSink.splits);
} }

View File

@ -901,8 +901,8 @@ public abstract class AbstractTestWALReplay {
FileStatus[] listStatus = wal.getFiles(); FileStatus[] listStatus = wal.getFiles();
assertNotNull(listStatus); assertNotNull(listStatus);
assertTrue(listStatus.length > 0); assertTrue(listStatus.length > 0);
WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null,
this.fs, this.conf, null, null, null, wals); wals, null);
FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName), FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName),
new Path(hri.getEncodedName(), "recovered.edits")), new Path(hri.getEncodedName(), "recovered.edits")),
new PathFilter() { new PathFilter() {
@ -1052,8 +1052,8 @@ public abstract class AbstractTestWALReplay {
first = fs.getFileStatus(smallFile); first = fs.getFileStatus(smallFile);
second = fs.getFileStatus(largeFile); second = fs.getFileStatus(largeFile);
} }
WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals); WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, wals, null);
WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals); WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, wals, null);
WAL wal = createWAL(this.conf, hbaseRootDir, logName); WAL wal = createWAL(this.conf, hbaseRootDir, logName);
region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());

View File

@ -173,7 +173,7 @@ public class TestWALReaderOnSecureWAL {
FileStatus[] listStatus = fs.listStatus(walPath.getParent()); FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf); Path rootdir = FSUtils.getRootDir(conf);
try { try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null); s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt"); "corrupt");
@ -217,7 +217,7 @@ public class TestWALReaderOnSecureWAL {
FileStatus[] listStatus = fs.listStatus(walPath.getParent()); FileStatus[] listStatus = fs.listStatus(walPath.getParent());
Path rootdir = FSUtils.getRootDir(conf); Path rootdir = FSUtils.getRootDir(conf);
try { try {
WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, null, null); WALSplitter s = new WALSplitter(wals, conf, rootdir, fs, rootdir, fs, null, null, null);
s.splitLogFile(listStatus[0], null); s.splitLogFile(listStatus[0], null);
Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()),
"corrupt"); "corrupt");

View File

@ -812,15 +812,16 @@ public class TestWALSplit {
} }
assertTrue("There should be some log greater than size 0.", 0 < largestSize); assertTrue("There should be some log greater than size 0.", 0 < largestSize);
// Set up a splitter that will throw an IOE on the output side // Set up a splitter that will throw an IOE on the output side
WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { WALSplitter logSplitter =
@Override new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
protected Writer createWriter(Path logfile) throws IOException { @Override
Writer mockWriter = Mockito.mock(Writer.class); protected Writer createWriter(Path logfile) throws IOException {
Mockito.doThrow(new IOException("Injected")).when( Writer mockWriter = Mockito.mock(Writer.class);
mockWriter).append(Mockito.<Entry>any()); Mockito.doThrow(new IOException("Injected")).when(mockWriter)
return mockWriter; .append(Mockito.<Entry> any());
} return mockWriter;
}; }
};
// Set up a background thread dumper. Needs a thread to depend on and then we need to run // Set up a background thread dumper. Needs a thread to depend on and then we need to run
// the thread dumping in a background thread so it does not hold up the test. // the thread dumping in a background thread so it does not hold up the test.
final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean stop = new AtomicBoolean(false);
@ -941,8 +942,8 @@ public class TestWALSplit {
try { try {
conf.setInt("hbase.splitlog.report.period", 1000); conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = WALSplitter.splitLogFile( boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals); null, wals, null);
assertFalse("Log splitting should failed", ret); assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0); assertTrue(count.get() > 0);
} catch (IOException e) { } catch (IOException e) {
@ -1000,7 +1001,8 @@ public class TestWALSplit {
makeRegionDirs(regions); makeRegionDirs(regions);
// Create a splitter that reads and writes the data without touching disk // Create a splitter that reads and writes the data without touching disk
WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) { WALSplitter logSplitter =
new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
/* Produce a mock writer that doesn't write anywhere */ /* Produce a mock writer that doesn't write anywhere */
@Override @Override
@ -1148,7 +1150,8 @@ public class TestWALSplit {
assertTrue("There should be some log file", assertTrue("There should be some log file",
logfiles != null && logfiles.length > 0); logfiles != null && logfiles.length > 0);
WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) { WALSplitter logSplitter =
new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
@Override @Override
protected Writer createWriter(Path logfile) protected Writer createWriter(Path logfile)
throws IOException { throws IOException {

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -172,8 +173,9 @@ public class TestWALSplitToHFile {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final TableDescriptor td = createBasic3FamilyTD(tableName); final TableDescriptor td = createBasic3FamilyTD(tableName);
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir); deleteDir(tableDir);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
final byte[] rowName = tableName.getName(); final byte[] rowName = tableName.getName();
final int countPerFamily = 10; final int countPerFamily = 10;
@ -203,7 +205,12 @@ public class TestWALSplitToHFile {
// all edits in logs are seen as 'stale'/old. // all edits in logs are seen as 'stale'/old.
region.close(true); region.close(true);
wal.shutdown(); wal.shutdown();
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals); try {
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
} catch (Exception e) {
LOG.debug("Got exception", e);
}
WAL wal2 = createWAL(this.conf, rootDir, logName); WAL wal2 = createWAL(this.conf, rootDir, logName);
HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2); HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
long seqid2 = region2.getOpenSeqNum(); long seqid2 = region2.getOpenSeqNum();
@ -230,7 +237,7 @@ public class TestWALSplitToHFile {
FileSystem newFS = FileSystem.get(newConf); FileSystem newFS = FileSystem.get(newConf);
// Make a new wal for new region open. // Make a new wal for new region open.
WAL wal3 = createWAL(newConf, rootDir, logName); WAL wal3 = createWAL(newConf, rootDir, logName);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, ri, td, null); HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
long seqid3 = region3.initialize(); long seqid3 = region3.initialize();
Result result3 = region3.get(g); Result result3 = region3.get(g);
// Assert that count of cells is same as before crash. // Assert that count of cells is same as before crash.
@ -259,11 +266,12 @@ public class TestWALSplitToHFile {
throws IOException, SecurityException, IllegalArgumentException { throws IOException, SecurityException, IllegalArgumentException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir); deleteDir(tableDir);
final byte[] rowName = tableName.getName(); final byte[] rowName = tableName.getName();
final int countPerFamily = 10; final int countPerFamily = 10;
final TableDescriptor td = createBasic3FamilyTD(tableName); final TableDescriptor td = createBasic3FamilyTD(tableName);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td); HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
HBaseTestingUtility.closeRegionAndWAL(region3); HBaseTestingUtility.closeRegionAndWAL(region3);
// Write countPerFamily edits into the three families. Do a flush on one // Write countPerFamily edits into the three families. Do a flush on one
@ -318,9 +326,10 @@ public class TestWALSplitToHFile {
public void testReplayEditsAfterAbortingFlush() throws IOException { public void testReplayEditsAfterAbortingFlush() throws IOException {
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName()); final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build(); final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
final Path basedir = FSUtils.getTableDir(this.rootDir, tableName); final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
deleteDir(basedir); deleteDir(tableDir);
final TableDescriptor td = createBasic3FamilyTD(tableName); final TableDescriptor td = createBasic3FamilyTD(tableName);
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td); HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
HBaseTestingUtility.closeRegionAndWAL(region3); HBaseTestingUtility.closeRegionAndWAL(region3);
// Write countPerFamily edits into the three families. Do a flush on one // Write countPerFamily edits into the three families. Do a flush on one