HBASE-2641 Refactor HLog splitLog, hbase-2437 continued; break out split code as new classes

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1001924 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-09-27 22:02:23 +00:00
parent 10a84f3b88
commit 823cfeac9c
17 changed files with 806 additions and 497 deletions

View File

@ -30,6 +30,9 @@ Release 0.21.0 - Unreleased
HBASE-2692 Master rewrite and cleanup for 0.90
(Karthik Ranganathan, Jon Gray & Stack)
HBASE-2961 Close zookeeper when done with it (HCM, Master, and RS)
HBASE-2641 Refactor HLog splitLog, hbase-2437 continued;
break out split code as new classes
(James Kennedy via Stack)
BUG FIXES

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -180,7 +181,8 @@ public class MasterFileSystem {
this.splitLogLock.lock();
Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
try {
HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
} catch (IOException e) {
LOG.error("Failed splitting " + logDir.toString(), e);
} finally {
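This old-to-new migration recurs throughout the commit: the static HLog.splitLog(...) entry point becomes an instance method on the new HLogSplitter class, with the concrete splitter class derived from configuration. In sketch form (variable names illustrative only):

// Before: static utility on HLog.
// List<Path> splits = HLog.splitLog(rootDir, logDir, oldLogDir, fs, conf);
// After: a pluggable splitter instance obtained from the Configuration.
HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
List<Path> splits = splitter.splitLog(rootDir, logDir, oldLogDir, fs, conf);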

View File

@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@ -2436,7 +2436,7 @@ public class HRegion implements HeapSize { // , Writable{
* @return Returns <code>this</code>
* @throws IOException
*/
HRegion openHRegion(final Progressable reporter)
protected HRegion openHRegion(final Progressable reporter)
throws IOException {
long seqid = initialize(reporter);
if (this.log != null) {
@ -3034,6 +3034,13 @@ public class HRegion implements HeapSize { // , Writable{
return old;
}
/**
* Give the region a chance to prepare before it is split.
*/
protected void prepareToSplit() {
// nothing
}
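The hook is a no-op here; a subclass could override it to quiesce work before SplitTransaction.prepare() runs. A minimal sketch (the subclass below is hypothetical, not part of this commit; it relies on HRegion's no-arg test constructor):

// Hypothetical HRegion subclass using the new prepareToSplit() hook.
class QuiescingHRegion extends HRegion {
  @Override
  protected void prepareToSplit() {
    // e.g. wait for custom background tasks to finish before the split proceeds
  }
}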
/**
* Checks every store to see if one has too many
* store files

View File

@ -164,4 +164,9 @@ class LogRoller extends Thread implements WALObserver {
WALEdit logEdit) {
// Not interested.
}
@Override
public void logCloseRequested() {
// not interested
}
}

View File

@ -134,6 +134,7 @@ class SplitTransaction {
public boolean prepare() {
if (this.parent.isClosed() || this.parent.isClosing()) return false;
HRegionInfo hri = this.parent.getRegionInfo();
parent.prepareToSplit();
// Check splitrow.
byte [] startKey = hri.getStartKey();
byte [] endKey = hri.getEndKey();

View File

@ -19,11 +19,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
@ -33,23 +30,14 @@ import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@ -70,10 +58,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -81,8 +66,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* HLog stores all the edits to the HStore. It's the hbase write-ahead-log
* implementation.
@ -289,7 +272,7 @@ public class HLog implements Syncable {
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
final Configuration conf)
throws IOException {
this(fs, dir, oldLogDir, conf, null, null);
this(fs, dir, oldLogDir, conf, null, true, null);
}
/**
@ -313,7 +296,33 @@ public class HLog implements Syncable {
*/
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
final Configuration conf, final List<WALObserver> listeners,
final String prefix)
final String prefix) throws IOException {
this(fs, dir, oldLogDir, conf, listeners, true, prefix);
}
/**
* Create an edit log at the given <code>dir</code> location.
*
* You should never have to load an existing log. If there is a log at
* startup, it should have already been processed and deleted by the time the
* HLog object is started up.
*
* @param fs filesystem handle
* @param dir path to where hlogs are stored
* @param oldLogDir path to where hlogs are archived
* @param conf configuration to use
* @param listeners Listeners on WAL events. Listeners passed here will
* be registered before we do anything else; e.g. the
* Constructor {@link #rollWriter()}.
* @param failIfLogDirExists If true, an IOException is thrown if dir already exists.
* @param prefix should always be hostname and port in distributed env and
* it will be URL encoded before being used.
* If prefix is null, "hlog" will be used
* @throws IOException
*/
public HLog(final FileSystem fs, final Path dir, final Path oldLogDir,
final Configuration conf, final List<WALObserver> listeners,
final boolean failIfLogDirExists, final String prefix)
throws IOException {
super();
this.fs = fs;
@ -333,7 +342,7 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
if (fs.exists(dir)) {
if (failIfLogDirExists && fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
fs.mkdirs(dir);
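For callers that need to attach an HLog to a directory that may already exist, the new flag is passed positionally through the widened constructor; a minimal sketch (variable names illustrative only):

// Open a WAL without failing when logDir already exists.
HLog wal = new HLog(fs, logDir, oldLogDir, conf,
    null,   // listeners: none
    false,  // failIfLogDirExists: tolerate an existing directory
    null);  // prefix: default "hlog"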
@ -464,7 +473,8 @@ public class HLog implements Syncable {
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = createWriter(fs, newPath, HBaseConfiguration.create(conf));
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath,
HBaseConfiguration.create(conf));
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
@ -516,6 +526,21 @@ public class HLog implements Syncable {
return regionsToFlush;
}
/**
* This method allows subclasses to inject different writers without having to
* override other methods like rollWriter().
*
* @param fs filesystem handle
* @param path path at which to create the new writer
* @param conf configuration to use
* @return a new instance of {@link Writer}
* @throws IOException
*/
protected Writer createWriterInstance(final FileSystem fs, final Path path,
final Configuration conf) throws IOException {
return createWriter(fs, path, conf);
}
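As an illustration of this hook, a test-only HLog could hand out the InstrumentedSequenceFileLogWriter added elsewhere in this commit (the subclass below is a hypothetical sketch, not part of the patch):

// Hypothetical HLog subclass injecting a custom writer via the new hook.
class InstrumentedHLog extends HLog {
  InstrumentedHLog(FileSystem fs, Path dir, Path oldLogDir, Configuration conf)
      throws IOException {
    super(fs, dir, oldLogDir, conf);
  }
  @Override
  protected Writer createWriterInstance(FileSystem fs, Path path, Configuration conf)
      throws IOException {
    Writer w = new InstrumentedSequenceFileLogWriter();
    w.init(fs, path, conf); // mirror createWriter(): writers must be init'd
    return w;
  }
}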
/**
* Get a reader for the WAL.
* @param fs
@ -529,7 +554,7 @@ public class HLog implements Syncable {
throws IOException {
try {
if (logReaderClass == null) {
logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
logReaderClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class);
}
@ -757,6 +782,12 @@ public class HLog implements Syncable {
cacheFlushLock.lock();
try {
// Tell our listeners that the log is closing
if (!this.listeners.isEmpty()) {
for (WALObserver i : this.listeners) {
i.logCloseRequested();
}
}
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
@ -1200,71 +1231,6 @@ public class HLog implements Syncable {
return Bytes.equals(METAFAMILY, family);
}
/**
* Split up a bunch of regionserver commit log files that are no longer
* being written to, into new files, one per region, for the region to replay on
* startup. Delete the old log files when finished.
*
* @param rootDir qualified root directory of the HBase instance
* @param srcDir Directory of log files to split: e.g.
* <code>${ROOTDIR}/log_HOST_PORT</code>
* @param oldLogDir directory where processed (split) logs will be archived to
* @param fs FileSystem
* @param conf Configuration
* @throws IOException will throw if corrupted hlogs aren't tolerated
* @return the list of splits
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs, final Configuration conf)
throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
if (!fs.exists(srcDir)) {
// Nothing to do
return splits;
}
FileStatus [] logfiles = fs.listStatus(srcDir);
if (logfiles == null || logfiles.length == 0) {
// Nothing to do
return splits;
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
FileStatus[] files = fs.listStatus(srcDir);
for(FileStatus file : files) {
Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to " +
FSUtils.getPath(newPath));
fs.rename(file.getPath(), newPath);
}
LOG.debug("Moved " + files.length + " log files to " +
FSUtils.getPath(oldLogDir));
fs.delete(srcDir, true);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
io.initCause(e);
throw io;
}
long endMillis = System.currentTimeMillis();
LOG.info("hlog file splitting completed in " + (endMillis - millis) +
" millis for " + srcDir.toString());
return splits;
}
// Private immutable datastructure to hold Writer and its Path.
private final static class WriterAndPath {
final Path p;
final Writer w;
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
}
@SuppressWarnings("unchecked")
public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>)
@ -1282,103 +1248,6 @@ public class HLog implements Syncable {
}
}
/**
* Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions)
* by region and then splits them per region directories, in batches of (hbase.hlog.split.batch.size)
*
* A batch consists of a set of log files that will be sorted in a single map of edits indexed by region;
* the resulting map will be concurrently written by multiple threads to their corresponding regions.
*
* Each batch consists of one or more log files that are
* - recovered (the file is opened for append then closed to ensure no process is writing into it)
* - parsed (each edit in the log is appended to a list of edits indexed by region;
*   see {@link #parseHLog} for more details)
* - marked as either processed or corrupt depending on parsing outcome
* - the resulting edits indexed by region are concurrently written to their corresponding
*   region directories
* - original files are then archived to a different directory
*
* @param rootDir hbase directory
* @param srcDir logs directory
* @param oldLogDir directory where processed logs are archived to
* @param logfiles the list of log files to split
* @param fs
* @param conf
* @return the list of splits
* @throws IOException
*/
private static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
final Configuration conf)
throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
final Map<byte [], WriterAndPath> logWriters =
Collections.synchronizedMap(
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
List<Path> splits = null;
// Number of logs in a read batch
// More means faster but bigger mem consumption
//TODO make a note on the conf rename and update hbase-site.xml if needed
int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
try {
int i = -1;
while (i < logfiles.length) {
final Map<byte[], LinkedList<Entry>> editsByRegion =
new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
for (int j = 0; j < logFilesPerStep; j++) {
i++;
if (i == logfiles.length) {
break;
}
FileStatus log = logfiles[i];
Path logPath = log.getPath();
long logLength = log.getLen();
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logPath + ", length=" + logLength );
try {
recoverFileLease(fs, logPath, conf);
parseHLog(log, editsByRegion, fs, conf);
processedLogs.add(logPath);
} catch (EOFException eof) {
// truncated files are expected if a RS crashes (see HBASE-2643)
LOG.info("EOF from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
} catch (IOException e) {
if (skipErrors) {
LOG.warn("Got while parsing hlog " + logPath +
". Marking as corrupted", e);
corruptedLogs.add(logPath);
} else {
throw e;
}
}
}
writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
}
if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
throw new IOException("Discovered orphan hlog after split. Maybe " +
"HRegionServer was not dead when we started");
}
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
splits = new ArrayList<Path>(logWriters.size());
for (WriterAndPath wap : logWriters.values()) {
wap.w.close();
splits.add(wap.p);
LOG.debug("Closed " + wap.p);
}
}
return splits;
}
/**
* Utility class that lets us keep track of the edit with its key.
* Only used when splitting logs.
@ -1474,257 +1343,27 @@ public class HLog implements Syncable {
return dirName.toString();
}
/**
* Get the directory we are making logs in.
*
* @return dir
*/
protected Path getDir() {
return dir;
}
public static boolean validateHLogFilename(String filename) {
return pattern.matcher(filename).matches();
}
private static Path getHLogArchivePath(Path oldLogDir, Path p) {
static Path getHLogArchivePath(Path oldLogDir, Path p) {
return new Path(oldLogDir, p.getName());
}
/**
* Takes splitLogsMap and concurrently writes its edits to region directories using a thread pool
*
* @param splitLogsMap map that contains the log splitting result indexed by region
* @param logWriters map that contains a writer per region
* @param rootDir hbase root dir
* @param fs
* @param conf
* @throws IOException
*/
private static void writeEditsBatchToRegions(
final Map<byte[], LinkedList<Entry>> splitLogsMap,
final Map<byte[], WriterAndPath> logWriters,
final Path rootDir, final FileSystem fs, final Configuration conf)
throws IOException {
// Number of threads to use when log splitting to rewrite the logs.
// More means faster but bigger mem consumption.
int logWriterThreads =
conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("SplitWriter-%1$d");
ThreadFactory factory = builder.build();
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
for (final byte [] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
threadPool.shutdown();
// Wait for all threads to terminate
try {
for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds";
if (j < 30) {
LOG.debug(message);
} else {
LOG.info(message);
}
}
} catch(InterruptedException ex) {
LOG.warn("Hlog writers were interrupted, possible data loss!");
if (!skipErrors) {
throw new IOException("Could not finish writing log entries", ex);
//TODO maybe we should fail here regardless if skipErrors is active or not
}
}
for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
try {
entry.getValue().get();
} catch (ExecutionException e) {
throw (new IOException(e.getCause()));
} catch (InterruptedException e1) {
LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) +
" was interrupted, however the write process should have " +
"finished. Throwing up ", e1);
throw (new IOException(e1.getCause()));
}
}
}
/*
* Parse a single hlog and put the edits in @splitLogsMap
*
* @param logfile to split
* @param splitLogsMap output parameter: a map with region names as keys and a
* list of edits as values
* @param fs the filesystem
* @param conf the configuration
* @throws IOException if hlog is corrupted, or can't be opened
*/
private static void parseHLog(final FileStatus logfile,
final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
final Configuration conf)
throws IOException {
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
// HDFS-878 is committed.
long length = logfile.getLen();
if (length <= 0) {
LOG.warn("File " + logfile.getPath() + " might still be open, length is 0");
}
Path path = logfile.getPath();
Reader in;
int editsCount = 0;
try {
in = HLog.getReader(fs, path, conf);
} catch (EOFException e) {
if (length <= 0) {
//TODO should we ignore an empty, not-last log file if skip.errors is false?
//Either way, the caller should decide what to do. E.g. ignore if this is the last
//log in sequence.
//TODO is this scenario still possible if the log has been recovered (i.e. closed)
LOG.warn("Could not open " + path + " for reading. File is empty: " + e);
return;
} else {
throw e;
}
}
try {
Entry entry;
while ((entry = in.next()) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
LinkedList<Entry> queue = splitLogsMap.get(region);
if (queue == null) {
queue = new LinkedList<Entry>();
splitLogsMap.put(region, queue);
}
queue.addLast(entry);
editsCount++;
}
} finally {
LOG.debug("Pushed=" + editsCount + " entries from " + path);
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
LOG.warn("Close log reader in finally threw exception -- continuing", e);
}
}
}
private static Callable<Void> createNewSplitter(final Path rootDir,
final Map<byte[], WriterAndPath> logWriters,
final Map<byte[], LinkedList<Entry>> logEntries,
final byte[] region, final FileSystem fs, final Configuration conf) {
return new Callable<Void>() {
public String getName() {
return "Split writer thread for region " + Bytes.toStringBinary(region);
}
@Override
public Void call() throws IOException {
LinkedList<Entry> entries = logEntries.get(region);
LOG.debug(this.getName()+" got " + entries.size() + " to process");
long threadTime = System.currentTimeMillis();
try {
int editsCount = 0;
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry: entries) {
if (wap == null) {
Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the " +
"result of a previous failed split attempt. Deleting " +
regionedits + ", length=" + fs.getFileStatus(regionedits).getLen());
if (!fs.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
}
}
Writer w = createWriter(fs, regionedits, conf);
wap = new WriterAndPath(regionedits, w);
logWriters.put(region, wap);
LOG.debug("Creating writer path=" + regionedits +
" region=" + Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
editsCount++;
}
LOG.debug(this.getName() + " Applied " + editsCount +
" total edits to " + Bytes.toStringBinary(region) +
" in " + (System.currentTimeMillis() - threadTime) + "ms");
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal(this.getName() + " Got while writing log entry to log", e);
throw e;
}
return null;
}
};
}
/**
* Moves processed logs to oldLogDir after successful processing.
* Moves corrupted logs (any log that couldn't be successfully parsed)
* to corruptDir (.corrupt) for later investigation.
*
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param conf
* @throws IOException
*/
private static void archiveLogs(final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf)
throws IOException{
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR),
conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
fs.mkdirs(corruptDir);
fs.mkdirs(oldLogDir);
for (Path corrupted: corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
LOG.info("Moving corrupted log " + corrupted + " to " + p);
fs.rename(corrupted, p);
}
for (Path p: processedLogs) {
Path newPath = getHLogArchivePath(oldLogDir, p);
fs.rename(p, newPath);
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
/*
* Path to a file under RECOVERED_EDITS_DIR directory of the region found in
* <code>logEntry</code> named for the sequenceid in the passed
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
* This method also ensures existence of RECOVERED_EDITS_DIR under the region
* creating it if necessary.
* @param fs
* @param logEntry
* @param rootDir HBase root dir.
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir)
throws IOException {
Path tableDir = HTableDescriptor.getTableDir(rootDir,
logEntry.getKey().getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
Path dir = getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
return new Path(dir,
formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()));
}
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
/**
* Returns sorted set of edit files made by wal-log splitter.
* @param fs
@ -1736,7 +1375,7 @@ public class HLog implements Syncable {
final Path regiondir)
throws IOException {
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
FileStatus [] files = fs.listStatus(editsdir, new PathFilter () {
FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
@Override
public boolean accept(Path p) {
boolean result = false;
@ -1835,7 +1474,9 @@ public class HLog implements Syncable {
if (!fs.getFileStatus(p).isDir()) {
throw new IOException(p + " is not a directory");
}
splitLog(baseDir, p, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
}
/**

View File

@ -0,0 +1,548 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import com.google.common.util.concurrent.NamingThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
* files that are no longer being written to, into new files, one per region,
* for the region to replay on startup. It deletes the old log files when finished.
*/
public class HLogSplitter {
private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
static final Log LOG = LogFactory.getLog(HLogSplitter.class);
/**
* Name of file that holds recovered edits written by the wal log splitting
* code, one per region
*/
public static final String RECOVERED_EDITS = "recovered.edits";
/**
* Create a new HLogSplitter using the given {@link Configuration} and the
* <code>hbase.hlog.splitter.impl</code> property to derive the instance
* class to use.
*
* @param conf
* @return New HLogSplitter instance
*/
public static HLogSplitter createLogSplitter(Configuration conf) {
@SuppressWarnings("unchecked")
Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
.getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
try {
return splitterClass.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
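Deployments can swap in their own splitter through the same property; a sketch follows (CustomHLogSplitter is hypothetical, and must expose a public no-arg constructor since newInstance() is used):

Configuration conf = HBaseConfiguration.create();
conf.setClass("hbase.hlog.splitter.impl", CustomHLogSplitter.class, HLogSplitter.class);
HLogSplitter splitter = HLogSplitter.createLogSplitter(conf); // returns a CustomHLogSplitter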
// Private immutable datastructure to hold Writer and its Path.
private final static class WriterAndPath {
final Path p;
final Writer w;
WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
}
/**
* Split up a bunch of regionserver commit log files that are no longer being
* written to, into new files, one per region, for the region to replay on startup.
* Delete the old log files when finished.
*
* @param rootDir
* qualified root directory of the HBase instance
* @param srcDir
* Directory of log files to split: e.g.
* <code>${ROOTDIR}/log_HOST_PORT</code>
* @param oldLogDir
* directory where processed (split) logs will be archived to
* @param fs
* FileSystem
* @param conf
* Configuration
* @throws IOException
* will throw if corrupted hlogs aren't tolerated
* @return the list of splits
*/
public List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs, final Configuration conf)
throws IOException {
long millis = System.currentTimeMillis();
List<Path> splits = null;
if (!fs.exists(srcDir)) {
// Nothing to do
return splits;
}
FileStatus[] logfiles = fs.listStatus(srcDir);
if (logfiles == null || logfiles.length == 0) {
// Nothing to do
return splits;
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in "
+ srcDir.toString());
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
FileStatus[] files = fs.listStatus(srcDir);
for (FileStatus file : files) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
+ FSUtils.getPath(newPath));
fs.rename(file.getPath(), newPath);
}
LOG.debug("Moved " + files.length + " log files to "
+ FSUtils.getPath(oldLogDir));
fs.delete(srcDir, true);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
io.initCause(e);
throw io;
}
long endMillis = System.currentTimeMillis();
LOG.info("hlog file splitting completed in " + (endMillis - millis)
+ " millis for " + srcDir.toString());
return splits;
}
/**
* Sorts the HLog edits in the given list of logfiles (that are a mix of edits
* on multiple regions) by region and then splits them per region directories,
* in batches of (hbase.hlog.split.batch.size)
*
* A batch consists of a set of log files that will be sorted in a single map
* of edits indexed by region; the resulting map will be concurrently written
* by multiple threads to their corresponding regions.
*
* Each batch consists of one or more log files that are
* - recovered (the file is opened for append then closed to ensure no
*   process is writing into it)
* - parsed (each edit in the log is appended to a list of edits indexed by
*   region; see {@link #parseHLog} for more details)
* - marked as either processed or corrupt depending on parsing outcome
* - the resulting edits indexed by region are concurrently written to their
*   corresponding region directories
* - original files are then archived to a different directory
*
* @param rootDir
* hbase directory
* @param srcDir
* logs directory
* @param oldLogDir
* directory where processed logs are archived to
* @param logfiles
* the list of log files to split
* @param fs
* @param conf
* @return the list of splits
* @throws IOException
*/
private List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
final Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
final Map<byte[], WriterAndPath> logWriters = Collections
.synchronizedMap(new TreeMap<byte[], WriterAndPath>(
Bytes.BYTES_COMPARATOR));
List<Path> splits = null;
// Number of logs in a read batch
// More means faster but bigger mem consumption
// TODO make a note on the conf rename and update hbase-site.xml if needed
int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
try {
int i = -1;
while (i < logfiles.length) {
final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
Bytes.BYTES_COMPARATOR);
for (int j = 0; j < logFilesPerStep; j++) {
i++;
if (i == logfiles.length) {
break;
}
FileStatus log = logfiles[i];
Path logPath = log.getPath();
long logLength = log.getLen();
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
+ ": " + logPath + ", length=" + logLength);
try {
recoverFileLease(fs, logPath, conf);
parseHLog(log, editsByRegion, fs, conf);
processedLogs.add(logPath);
} catch (EOFException eof) {
// truncated files are expected if a RS crashes (see HBASE-2643)
LOG.info("EOF from hlog " + logPath + ". continuing");
processedLogs.add(logPath);
} catch (IOException e) {
if (skipErrors) {
LOG.warn("Got while parsing hlog " + logPath
+ ". Marking as corrupted", e);
corruptedLogs.add(logPath);
} else {
throw e;
}
}
}
writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
}
if (fs.listStatus(srcDir).length > processedLogs.size()
+ corruptedLogs.size()) {
throw new IOException("Discovered orphan hlog after split. Maybe "
+ "HRegionServer was not dead when we started");
}
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
splits = new ArrayList<Path>(logWriters.size());
for (WriterAndPath wap : logWriters.values()) {
wap.w.close();
splits.add(wap.p);
LOG.debug("Closed " + wap.p);
}
}
return splits;
}
/**
* Takes splitLogsMap and concurrently writes its edits to region directories using a thread pool
*
* @param splitLogsMap map that contains the log splitting result indexed by region
* @param logWriters map that contains a writer per region
* @param rootDir hbase root dir
* @param fs
* @param conf
* @throws IOException
*/
private void writeEditsBatchToRegions(
final Map<byte[], LinkedList<Entry>> splitLogsMap,
final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
final FileSystem fs, final Configuration conf)
throws IOException {
// Number of threads to use when log splitting to rewrite the logs.
// More means faster but bigger mem consumption.
int logWriterThreads = conf.getInt(
"hbase.regionserver.hlog.splitlog.writer.threads", 3);
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("SplitWriter-%1$d");
ThreadFactory factory = builder.build();
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
for (final byte[] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
threadPool.shutdown();
// Wait for all threads to terminate
try {
for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
String message = "Waiting for hlog writers to terminate, elapsed " + j
* 5 + " seconds";
if (j < 30) {
LOG.debug(message);
} else {
LOG.info(message);
}
}
} catch (InterruptedException ex) {
LOG.warn("Hlog writers were interrupted, possible data loss!");
if (!skipErrors) {
throw new IOException("Could not finish writing log entries", ex);
// TODO maybe we should fail here regardless if skipErrors is active or not
}
}
for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
try {
entry.getValue().get();
} catch (ExecutionException e) {
throw (new IOException(e.getCause()));
} catch (InterruptedException e1) {
LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
+ " was interrupted, however the write process should have "
+ "finished. Throwing up ", e1);
throw (new IOException(e1.getCause()));
}
}
}
/**
* Moves processed logs to oldLogDir after successful processing.
* Moves corrupted logs (any log that couldn't be successfully parsed)
* to corruptDir (.corrupt) for later investigation.
*
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param conf
* @throws IOException
*/
private static void archiveLogs(final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf) throws IOException {
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
fs.mkdirs(corruptDir);
fs.mkdirs(oldLogDir);
for (Path corrupted : corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
LOG.info("Moving corrupted log " + corrupted + " to " + p);
fs.rename(corrupted, p);
}
for (Path p : processedLogs) {
Path newPath = HLog.getHLogArchivePath(oldLogDir, p);
fs.rename(p, newPath);
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
/**
* Path to a file under RECOVERED_EDITS_DIR directory of the region found in
* <code>logEntry</code> named for the sequenceid in the passed
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
* This method also ensures existence of RECOVERED_EDITS_DIR under the region
* creating it if necessary.
* @param fs
* @param logEntry
* @param rootDir HBase root dir.
* @return Path to file into which to dump split log edits.
* @throws IOException
*/
static Path getRegionSplitEditsPath(final FileSystem fs,
final Entry logEntry, final Path rootDir) throws IOException {
Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey()
.getTablename());
Path regiondir = HRegion.getRegionDir(tableDir,
Bytes.toString(logEntry.getKey().getEncodedRegionName()));
Path dir = HLog.getRegionDirRecoveredEditsDir(regiondir);
if (!fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
.getLogSeqNum()));
}
static String formatRecoveredEditsFileName(final long seqid) {
return String.format("%019d", seqid);
}
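The %019d pattern zero-pads the sequence id so that lexicographic ordering of recovered-edits file names matches numeric ordering of sequence ids; for example:

String name = HLogSplitter.formatRecoveredEditsFileName(2332);
// name == "0000000000000002332"; it sorts correctly against higher ids
// such as "0000000000000050000" in the recovered.edits directory.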
/*
* Parse a single hlog and put the edits in @splitLogsMap
*
* @param logfile to split
* @param splitLogsMap output parameter: a map with region names as keys and a
* list of edits as values
* @param fs the filesystem
* @param conf the configuration
* @throws IOException if hlog is corrupted, or can't be opened
*/
private void parseHLog(final FileStatus logfile,
final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
final Configuration conf)
throws IOException {
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
// HDFS-878 is committed.
long length = logfile.getLen();
if (length <= 0) {
LOG.warn("File " + logfile.getPath() + " might still be open, length is 0");
}
Path path = logfile.getPath();
Reader in;
int editsCount = 0;
try {
in = getReader(fs, path, conf);
} catch (EOFException e) {
if (length <= 0) {
//TODO should we ignore an empty, not-last log file if skip.errors is false?
//Either way, the caller should decide what to do. E.g. ignore if this is the last
//log in sequence.
//TODO is this scenario still possible if the log has been recovered (i.e. closed)
LOG.warn("Could not open " + path + " for reading. File is empty: " + e);
return;
} else {
throw e;
}
}
try {
Entry entry;
while ((entry = in.next()) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
LinkedList<Entry> queue = splitLogsMap.get(region);
if (queue == null) {
queue = new LinkedList<Entry>();
splitLogsMap.put(region, queue);
}
queue.addLast(entry);
editsCount++;
}
} finally {
LOG.debug("Pushed=" + editsCount + " entries from " + path);
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
LOG.warn("Close log reader in finally threw exception -- continuing", e);
}
}
}
private Callable<Void> createNewSplitter(final Path rootDir,
final Map<byte[], WriterAndPath> logWriters,
final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
final FileSystem fs, final Configuration conf) {
return new Callable<Void>() {
public String getName() {
return "Split writer thread for region " + Bytes.toStringBinary(region);
}
@Override
public Void call() throws IOException {
LinkedList<Entry> entries = logEntries.get(region);
LOG.debug(this.getName() + " got " + entries.size() + " to process");
long threadTime = System.currentTimeMillis();
try {
int editsCount = 0;
WriterAndPath wap = logWriters.get(region);
for (Entry logEntry : entries) {
if (wap == null) {
Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting "
+ regionedits + ", length="
+ fs.getFileStatus(regionedits).getLen());
if (!fs.delete(regionedits, false)) {
LOG.warn("Failed delete of old " + regionedits);
}
}
Writer w = createWriter(fs, regionedits, conf);
wap = new WriterAndPath(regionedits, w);
logWriters.put(region, wap);
LOG.debug("Creating writer path=" + regionedits + " region="
+ Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
editsCount++;
}
LOG.debug(this.getName() + " Applied " + editsCount
+ " total edits to " + Bytes.toStringBinary(region) + " in "
+ (System.currentTimeMillis() - threadTime) + "ms");
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal(this.getName() + " Got while writing log entry to log", e);
throw e;
}
return null;
}
};
}
/**
* Create a new {@link Writer} for writing log splits.
*
* @param fs
* @param logfile
* @param conf
* @return A new Writer instance
* @throws IOException
*/
protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
return HLog.createWriter(fs, logfile, conf);
}
/**
* Create a new {@link Reader} for reading logs to split.
*
* @param fs
* @param curLogFile
* @param conf
* @return A new Reader instance
* @throws IOException
*/
protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
throws IOException {
return HLog.getReader(fs, curLogFile, conf);
}
}
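Because getReader and createWriter are protected, a splitter can handle WALs keyed by a custom HLogKey subclass through the new keyed constructors on SequenceFileLogReader and SequenceFileLogWriter. A hypothetical sketch (MyHLogKey is an assumed HLogKey subclass with a no-arg constructor; same package and imports as HLogSplitter assumed):

// Hypothetical splitter that reads and rewrites logs with a custom key class.
class CustomKeyHLogSplitter extends HLogSplitter {
  @Override
  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
      throws IOException {
    SequenceFileLogReader r = new SequenceFileLogReader(MyHLogKey.class);
    r.init(fs, curLogFile, conf);
    return r;
  }
  @Override
  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
      throws IOException {
    SequenceFileLogWriter w = new SequenceFileLogWriter(MyHLogKey.class);
    w.init(fs, logfile, conf);
    return w;
  }
}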

View File

@ -30,8 +30,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.SequenceFile;
import org.mortbay.log.Log;
@ -103,7 +101,24 @@ public class SequenceFileLogReader implements HLog.Reader {
int edit = 0;
long entryStart = 0;
public SequenceFileLogReader() { }
private Class<? extends HLogKey> keyClass;
/**
* Default constructor.
*/
public SequenceFileLogReader() {
}
/**
* This constructor allows a specific HLogKey implementation to override that
* which would otherwise be chosen via configuration property.
*
* @param keyClass
*/
public SequenceFileLogReader(Class<? extends HLogKey> keyClass) {
this.keyClass = keyClass;
}
@Override
public void init(FileSystem fs, Path path, Configuration conf)
@ -132,7 +147,19 @@ public class SequenceFileLogReader implements HLog.Reader {
this.entryStart = this.reader.getPosition();
HLog.Entry e = reuse;
if (e == null) {
HLogKey key = HLog.newKey(conf);
HLogKey key;
if (keyClass == null) {
key = HLog.newKey(conf);
} else {
try {
key = keyClass.newInstance();
} catch (InstantiationException ie) {
throw new IOException(ie);
} catch (IllegalAccessException iae) {
throw new IOException(iae);
}
}
WALEdit val = new WALEdit();
e = new HLog.Entry(key, val);
}

View File

@ -48,16 +48,36 @@ public class SequenceFileLogWriter implements HLog.Writer {
// The syncFs method from hdfs-200 or null if not available.
private Method syncFs;
private Class<? extends HLogKey> keyClass;
/**
* Default constructor.
*/
public SequenceFileLogWriter() {
super();
}
/**
* This constructor allows a specific HLogKey implementation to override that
* which would otherwise be chosen via configuration property.
*
* @param keyClass
*/
public SequenceFileLogWriter(Class<? extends HLogKey> keyClass) {
this.keyClass = keyClass;
}
@Override
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
if (null == keyClass) {
keyClass = HLog.getKeyClass(conf);
}
// Create a SF.Writer instance.
this.writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), WALEdit.class,
keyClass, WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),

View File

@ -38,6 +38,11 @@ public interface WALObserver {
*/
public void logRollRequested();
/**
* The WAL is about to close.
*/
public void logCloseRequested();
/**
* Called before each write.
* @param info

View File

@ -162,4 +162,9 @@ public class Replication implements WALObserver {
public void logRollRequested() {
// Not interested
}
@Override
public void logCloseRequested() {
// not interested
}
}

View File

@ -25,6 +25,10 @@ import org.apache.hadoop.hbase.util.Bytes;
public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {
public InstrumentedSequenceFileLogWriter() {
super(HLogKey.class);
}
public static boolean activateFailure = false;
@Override
public void append(HLog.Entry entry) throws IOException {

View File

@ -160,9 +160,11 @@ public class TestHLog {
log.rollWriter();
}
log.close();
Path splitsdir = new Path(dir, "splits");
Path splitsdir = new Path(this.dir, "splits");
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
List<Path> splits =
HLog.splitLog(splitsdir, logdir, oldLogDir, fs, conf);
logSplitter.splitLog(splitsdir, logdir,
this.oldLogDir, this.fs, conf);
verifySplits(splits, howmany);
log = null;
} finally {
@ -406,7 +408,7 @@ public class TestHLog {
// Make sure you can read all the content
SequenceFile.Reader reader
= new SequenceFile.Reader(fs, walPath, conf);
= new SequenceFile.Reader(this.fs, walPath, this.conf);
int count = 0;
HLogKey key = HLog.newKey(conf);
WALEdit val = new WALEdit();
@ -606,5 +608,10 @@ public class TestHLog {
// TODO Auto-generated method stub
}
@Override
public void logCloseRequested() {
// not interested
}
}
}

View File

@ -47,14 +47,16 @@ public class TestHLogMethods {
fs.delete(regiondir, true);
fs.mkdirs(regiondir);
Path recoverededits = HLog.getRegionDirRecoveredEditsDir(regiondir);
String first = HLog.formatRecoveredEditsFileName(-1);
String first = HLogSplitter.formatRecoveredEditsFileName(-1);
createFile(fs, recoverededits, first);
createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(0));
createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(1));
createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(11));
createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(2));
createFile(fs, recoverededits, HLog.formatRecoveredEditsFileName(50));
String last = HLog.formatRecoveredEditsFileName(Long.MAX_VALUE);
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(0));
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(1));
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(11));
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(2));
createFile(fs, recoverededits, HLogSplitter.formatRecoveredEditsFileName(50));
String last = HLogSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE);
createFile(fs, recoverededits, last);
createFile(fs, recoverededits,
Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
@ -63,13 +65,17 @@ public class TestHLogMethods {
assertEquals(files.pollFirst().getName(), first);
assertEquals(files.pollLast().getName(), last);
assertEquals(files.pollFirst().getName(),
HLog.formatRecoveredEditsFileName(0));
HLogSplitter.formatRecoveredEditsFileName(0));
assertEquals(files.pollFirst().getName(),
HLog.formatRecoveredEditsFileName(1));
HLogSplitter.formatRecoveredEditsFileName(1));
assertEquals(files.pollFirst().getName(),
HLog.formatRecoveredEditsFileName(2));
HLogSplitter.formatRecoveredEditsFileName(2));
assertEquals(files.pollFirst().getName(),
HLog.formatRecoveredEditsFileName(11));
HLogSplitter.formatRecoveredEditsFileName(11));
}
private void createFile(final FileSystem fs, final Path testdir,

View File

@ -141,7 +141,7 @@ public class TestHLogSplit {
HLog.Entry entry =
new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
new WALEdit());
Path p = HLog.getRegionSplitEditsPath(fs, entry, new Path("/"));
Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, new Path("/"));
String parentOfParent = p.getParent().getParent().getName();
assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
}
@ -154,7 +154,8 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
try {
(new ZombieNewLogWriterRegionServer(stop)).start();
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} finally {
stop.set(true);
}
@ -169,7 +170,8 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@ -189,7 +191,8 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
@ -210,7 +213,8 @@ public class TestHLogSplit {
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@ -225,7 +229,8 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@ -243,8 +248,9 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
@ -260,8 +266,9 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
@ -278,7 +285,8 @@ public class TestHLogSplit {
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@ -306,7 +314,8 @@ public class TestHLogSplit {
corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
@ -331,7 +340,8 @@ public class TestHLogSplit {
corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@ -354,7 +364,8 @@ public class TestHLogSplit {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
@ -372,7 +383,8 @@ public class TestHLogSplit {
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
}
// TODO: fix this test (HBASE-2935)
@ -384,7 +396,8 @@ public class TestHLogSplit {
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
try {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {/* expected */}
assertEquals("if skip.errors is false all files should remain in place",
@ -396,9 +409,8 @@ public class TestHLogSplit {
public void testSplit() throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@ -412,7 +424,8 @@ public class TestHLogSplit {
throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(hlogDir);
@ -463,7 +476,8 @@ public class TestHLogSplit {
try {
zombie.start();
try {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException ex) {/* expected */}
int logFilesNumber = fs.listStatus(hlogDir).length;
@ -491,7 +505,8 @@ public class TestHLogSplit {
try {
InstrumentedSequenceFileLogWriter.activateFailure = true;
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {
assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
@ -513,7 +528,8 @@ public class TestHLogSplit {
generateHLogs(1, 100, -1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
fs.rename(oldLogDir, hlogDir);
Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@ -522,7 +538,7 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
}
@ -780,14 +796,14 @@ public class TestHLogSplit {
FileStatus[] f1 = fs.listStatus(p1);
FileStatus[] f2 = fs.listStatus(p2);
for (int i=0; i<f1.length; i++) {
for (int i = 0; i < f1.length; i++) {
// Regions now have a directory named RECOVERED_EDITS_DIR and in here
// are split edit files. In below presume only 1.
Path rd1 = HLog.getRegionDirRecoveredEditsDir(f1[i].getPath());
FileStatus [] rd1fs = fs.listStatus(rd1);
FileStatus[] rd1fs = fs.listStatus(rd1);
assertEquals(1, rd1fs.length);
Path rd2 = HLog.getRegionDirRecoveredEditsDir(f2[i].getPath());
FileStatus [] rd2fs = fs.listStatus(rd2);
FileStatus[] rd2fs = fs.listStatus(rd2);
assertEquals(1, rd2fs.length);
if (!logsAreEqual(rd1fs[0].getPath(), rd2fs[0].getPath())) {
return -1;

View File

@ -105,8 +105,13 @@ public class TestWALObserver {
hlog.rollWriter();
}
}
hlog.close();
hlog.closeAndDelete();
assertEquals(11, observer.logRollCounter);
assertEquals(5, laterobserver.logRollCounter);
assertEquals(2, observer.closedCount);
}
/**
@ -114,6 +119,7 @@ public class TestWALObserver {
*/
static class DummyWALObserver implements WALObserver {
public int logRollCounter = 0;
public int closedCount = 0;
@Override
public void logRolled(Path newFile) {
@ -131,5 +137,10 @@ public class TestWALObserver {
// Not interested
}
@Override
public void logCloseRequested() {
closedCount++;
}
}
}

View File

@ -465,7 +465,8 @@ public class TestWALReplay {
*/
private Path runWALSplit(final Configuration c) throws IOException {
FileSystem fs = FileSystem.get(c);
List<Path> splits = HLog.splitLog(this.hbaseRootDir, this.logDir,
HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
this.oldLogDir, fs, c);
// Split should generate only 1 file since there's only 1 region
assertEquals(1, splits.size());