HBASE-2437 Refactor HLog splitLog

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@950174 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-06-01 18:01:53 +00:00
parent edf52b76df
commit dacf350835
7 changed files with 1185 additions and 372 deletions

View File

@@ -654,6 +654,7 @@ Release 0.21.0 - Unreleased
HBASE-2630 HFile should use toStringBinary in various places
HBASE-2632 Shell should autodetect terminal width
HBASE-2636 Upgrade Jetty to 6.1.24
HBASE-2437 Refactor HLog splitLog (Cosmin Lehene via Stack)
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@@ -448,6 +448,7 @@
<slf4j.version>1.5.8</slf4j.version>
<stax-api>1.0.1</stax-api>
<thrift.version>0.2.0</thrift.version>
<guava.version>r03</guava.version>
</properties>
<dependencyManagement>
@@ -697,10 +698,15 @@
<version>${commons-math.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-test</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<!--

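The Guava r03 dependency added above is what provides com.google.common.util.concurrent.NamingThreadFactory, which the refactored splitter uses to label its writer threads. A minimal sketch of that pattern (illustrative class name; mirrors the pool setup in writeEditsBatchToRegions further down, assuming the factory names threads from the given format string):

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.util.concurrent.NamingThreadFactory;

public class NamedSplitWriterPool {
  // Pool threads get names derived from the "SplitWriter-%1$d" format
  // (e.g. "SplitWriter-0"), which makes the split-writer pool easy to pick
  // out in a thread dump while a split is running.
  public static ThreadPoolExecutor newPool(int threads) {
    NamingThreadFactory f =
        new NamingThreadFactory("SplitWriter-%1$d", Executors.defaultThreadFactory());
    return (ThreadPoolExecutor) Executors.newFixedThreadPool(threads, f);
  }
}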
View File

@@ -30,6 +30,8 @@ import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
@@ -37,9 +39,12 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,6 +78,9 @@ import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import com.google.common.util.concurrent.NamingThreadFactory;
import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;
/**
* HLog stores all the edits to the HStore. It's the hbase write-ahead-log
@@ -131,8 +139,9 @@ public class HLog implements HConstants, Syncable {
private final List<LogActionsListener> actionListeners =
Collections.synchronizedList(new ArrayList<LogActionsListener>());
private static Class logWriterClass;
private static Class logReaderClass;
private static Class<? extends Writer> logWriterClass;
private static Class<? extends Reader> logReaderClass;
private OutputStream hdfs_out; // OutputStream associated with the current SequenceFile.writer
private int initialReplication; // initial replication factor of SequenceFile.writer
@@ -484,23 +493,23 @@ public class HLog implements HConstants, Syncable {
* @return A WAL reader. Close when done with it.
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static Reader getReader(final FileSystem fs,
final Path path, Configuration conf)
throws IOException {
try {
if (logReaderClass == null) {
logReaderClass = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class.getCanonicalName()));
logReaderClass =conf.getClass("hbase.regionserver.hlog.reader.impl",
SequenceFileLogReader.class, Reader.class);
}
HLog.Reader reader = (HLog.Reader) logReaderClass.newInstance();
HLog.Reader reader = logReaderClass.newInstance();
reader.init(fs, path, conf);
return reader;
} catch (Exception e) {
IOException ie = new IOException("cannot get log reader");
ie.initCause(e);
throw ie;
} catch (IOException e) {
throw e;
}
catch (Exception e) {
throw new IOException("Cannot get log reader", e);
}
}
@@ -511,14 +520,13 @@ public class HLog implements HConstants, Syncable {
* @return A WAL writer. Close when done with it.
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static Writer createWriter(final FileSystem fs,
final Path path, Configuration conf)
throws IOException {
try {
if (logWriterClass == null) {
logWriterClass = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
SequenceFileLogWriter.class.getCanonicalName()));
logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl",
SequenceFileLogWriter.class, Writer.class);
}
HLog.Writer writer = (HLog.Writer) logWriterClass.newInstance();
writer.init(fs, path, conf);
@@ -1145,10 +1153,11 @@ public class HLog implements HConstants, Syncable {
* @param rootDir qualified root directory of the HBase instance
* @param srcDir Directory of log files to split: e.g.
* <code>${ROOTDIR}/log_HOST_PORT</code>
* @param oldLogDir
* @param oldLogDir directory where processed (split) logs will be archived to
* @param fs FileSystem
* @param conf Configuration
* @throws IOException
* @throws IOException if corrupted hlogs are encountered and aren't tolerated
* @return the list of splits
*/
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileSystem fs, final Configuration conf)
@@ -1167,18 +1176,10 @@ public class HLog implements HConstants, Syncable {
}
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
srcDir.toString());
splits = splitLog(rootDir, oldLogDir, logfiles, fs, conf);
splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
try {
FileStatus[] files = fs.listStatus(srcDir);
for(FileStatus file : files) {
Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
LOG.debug("Moving " + FSUtils.getPath(file.getPath()) + " to " +
FSUtils.getPath(newPath));
fs.rename(file.getPath(), newPath);
}
LOG.debug("Moved " + files.length + " log files to " +
FSUtils.getPath(oldLogDir));
fs.delete(srcDir, true);
LOG.info("Spliting is done. Removing old log dir "+srcDir);
fs.delete(srcDir, false);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
IOException io = new IOException("Cannot delete: " + srcDir);
@@ -1218,313 +1219,99 @@ public class HLog implements HConstants, Syncable {
}
}
/*
* @param rootDir
* @param logfiles
/**
* Sorts the HLog edits in the given list of logfiles (that are a mix of edits on multiple regions)
* by region and then splits them per region directory, in batches of (hbase.hlog.split.batch.size)
*
* A batch consists of a set of log files that will be sorted in a single map of edits indexed by region;
* the resulting map will be concurrently written by multiple threads to their corresponding regions.
*
* Each batch consists of one or more log files that are
* - recovered (each file is opened for append then closed to ensure no other process is writing into it)
* - parsed (each edit in the log is appended to a list of edits indexed by region;
*   see {@link #parseHLog} for more details)
* - marked as either processed or corrupt depending on the parsing outcome
* - the resulting edits indexed by region are concurrently written to their corresponding
*   region directories
* - original files are then archived to a different directory
*
* @param rootDir hbase directory
* @param srcDir logs directory
* @param oldLogDir directory where processed logs are archived to
* @param logfiles the list of log files to split
* @param fs
* @param conf
* @return
* @throws IOException
* @return List of splits made.
*/
private static List<Path> splitLog(final Path rootDir,
private static List<Path> splitLog(final Path rootDir, final Path srcDir,
Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
final Configuration conf)
throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
final Map<byte [], WriterAndPath> logWriters =
Collections.synchronizedMap(
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR));
List<Path> splits = null;
// Number of threads to use when log splitting to rewrite the logs.
// More means faster but bigger mem consumption.
int logWriterThreads =
conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
// Number of logs to read into memory before writing to their appropriate
// regions when log splitting. More means faster but bigger mem consumption
// Number of logs in a read batch
// More means faster but bigger mem consumption
//TODO make a note on the conf rename and update hbase-site.xml if needed
int logFilesPerStep =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
conf.getInt("hbase.hlog.split.batch.size", 3);
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);
// append support = we can avoid data loss (yay)
// we open for append, then close to recover the correct file length
final boolean appendSupport = isAppend(conf);
// store corrupt logs for post-mortem analysis (empty string = discard)
final String corruptDir =
conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt");
List<Path> finishedFiles = new LinkedList<Path>();
List<Path> corruptFiles = new LinkedList<Path>();
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
logFilesPerStep)).intValue();
for (int step = 0; step < maxSteps; step++) {
// Step 1: read N log files into memory
final Map<byte[], LinkedList<HLog.Entry>> logEntries =
new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
int endIndex = step == maxSteps - 1? logfiles.length:
step * logFilesPerStep + logFilesPerStep;
for (int i = (step * logFilesPerStep); i < endIndex; i++) {
Path curLogFile = logfiles[i].getPath();
// make sure we get the right file length before opening for read
recoverLog(fs, curLogFile, appendSupport);
long length = fs.getFileStatus(curLogFile).getLen();
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + curLogFile + ", length=" + length);
int i = -1;
while (i < logfiles.length) {
final Map<byte[], LinkedList<Entry>> editsByRegion =
new TreeMap<byte[], LinkedList<Entry>>(Bytes.BYTES_COMPARATOR);
for (int j = 0; j < logFilesPerStep; j++) {
i++;
if (i == logfiles.length) {
break;
}
Reader in = null;
boolean cleanRead = false;
int count = 0;
FileStatus log = logfiles[i];
Path logPath = log.getPath();
long logLength = log.getLen();
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logPath + ", length=" + logLength );
try {
in = HLog.getReader(fs, curLogFile, conf);
try {
Entry entry;
while ((entry = in.next()) != null) {
byte [] regionName = entry.getKey().getRegionName();
LinkedList<HLog.Entry> queue = logEntries.get(regionName);
if (queue == null) {
queue = new LinkedList<HLog.Entry>();
LOG.debug("Adding queue for " + Bytes.toStringBinary(regionName));
logEntries.put(regionName, queue);
}
queue.push(entry);
count++;
}
LOG.debug("Pushed=" + count + " entries from " + curLogFile);
cleanRead = true;
} catch (IOException e) {
LOG.debug("IOE Pushed=" + count + " entries from " + curLogFile);
e = RemoteExceptionHandler.checkIOException(e);
if (!(e instanceof EOFException)) {
String msg = "Exception processing " + curLogFile +
" -- continuing. Possible DATA LOSS!";
if (corruptDir.length() > 0) {
msg += " Storing in hlog corruption directory.";
}
LOG.warn(msg, e);
}
}
} catch (IOException e) {
if (length <= 0) {
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
cleanRead = true;
continue;
}
throw e;
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
LOG.warn("File.close() threw exception -- continuing, "
+ "but marking file as corrupt.", e);
cleanRead = false;
}
if (cleanRead) {
finishedFiles.add(curLogFile);
} else {
corruptFiles.add(curLogFile);
}
/* TODO FOR J-D REVIEW
// Archive the input file now so we do not replay edits. We could
// have gotten here because of an exception. If so, probably
// nothing we can do about it. Replaying it, it could work but we
// could be stuck replaying for ever. Just continue though we
// could have lost some edits.
fs.rename(logfiles[i].getPath(),
getHLogArchivePath(oldLogDir, logfiles[i].getPath()));
*/
recoverFileLease(fs, logPath, conf);
parseHLog(log, editsByRegion, fs, conf);
processedLogs.add(logPath);
} catch (IOException e) {
if (skipErrors) {
LOG.warn("Got while parsing hlog " + logPath +
". Marking as corrupted", e);
corruptedLogs.add(logPath);
} else {
throw e;
}
}
}
// Step 2: Some regionserver log files have been read into memory.
// Assign them to the appropriate region directory.
class ThreadWithException extends Thread {
ThreadWithException(String name) { super(name); }
public IOException exception = null;
}
List<ThreadWithException> threadList =
new ArrayList<ThreadWithException>(logEntries.size());
ExecutorService threadPool =
Executors.newFixedThreadPool(logWriterThreads);
for (final byte [] region: logEntries.keySet()) {
ThreadWithException thread =
new ThreadWithException(Bytes.toStringBinary(region)) {
@Override
public void run() {
LinkedList<HLog.Entry> entries = logEntries.get(region);
LOG.debug("Thread got " + entries.size() + " to process");
if(entries.size() <= 0) {
LOG.warn("Got a region with no entries to process.");
return;
}
long threadTime = System.currentTimeMillis();
try {
int count = 0;
// get the logfile associated with this region. 2 logs often
// write to the same region, so persist this info across logs
WriterAndPath wap = logWriters.get(region);
if (wap == null) {
// first write to this region, make new logfile
assert entries.size() > 0;
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootDir,
entries.getFirst().getKey().getTablename()),
HRegionInfo.encodeRegionName(region)),
HREGION_OLDLOGFILE_NAME);
// If splitLog() was running when the user restarted his
// cluster, then we could already have a 'logfile'.
// Since we don't delete logs until everything is written to
// their respective regions, we can safely remove this tmp.
if (fs.exists(logfile)) {
LOG.warn("Deleting old hlog file: " + logfile);
// TODO: Archive?
fs.delete(logfile, true);
}
// associate an OutputStream with this logfile
Writer w = createWriter(fs, logfile, conf);
wap = new WriterAndPath(logfile, w);
logWriters.put(region, wap);
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new hlog file writer for path "
+ logfile + " and region " + Bytes.toStringBinary(region));
}
}
// Items were added to the linkedlist oldest first. Pull them
// out in that order.
for (ListIterator<HLog.Entry> i = entries.listIterator(entries.size());
i.hasPrevious();) {
wap.w.append(i.previous());
count++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + count + " total edits to "
+ Bytes.toStringBinary(region) + " in "
+ (System.currentTimeMillis() - threadTime) + "ms");
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.warn("Got while writing region "
+ Bytes.toStringBinary(region) + " log " + e);
e.printStackTrace();
exception = e;
}
}
};
threadList.add(thread);
threadPool.execute(thread);
}
threadPool.shutdown();
// Wait for all threads to terminate
try {
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
}
} catch (InterruptedException ex) {
LOG.warn("Hlog writers were interrupted during splitLog(). "
+"Retaining log files to avoid data loss.");
throw new IOException(ex.getMessage(), ex.getCause());
}
// throw an exception if one of the threads reported one
for (ThreadWithException t : threadList) {
if (t.exception != null) {
throw t.exception;
}
}
// End of for loop. Rinse and repeat
writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
}
if (fs.listStatus(srcDir).length > processedLogs.size() + corruptedLogs.size()) {
throw new IOException("Discovered orphan hlog after split. Maybe " +
"HRegionServer was not dead when we started");
}
archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
} finally {
splits = new ArrayList<Path>(logWriters.size());
for (WriterAndPath wap : logWriters.values()) {
wap.w.close();
LOG.debug("Closed " + wap.p);
splits.add(wap.p);
LOG.debug("Closed " + wap.p);
}
}
// Step 3: All writes succeeded! Get rid of the now-unnecessary logs
for(Path p : finishedFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully split Hlog file. Deleting " + p);
}
try {
if (!fs.delete(p, true) && LOG.isDebugEnabled()) {
LOG.debug("Delete of split Hlog (" + p + ") failed.");
}
} catch (IOException e) {
// don't throw error here. worst case = double-read
LOG.warn("Error deleting successfully split Hlog (" + p + ") -- " + e);
}
}
for (Path p : corruptFiles) {
if (corruptDir.length() > 0) {
// store any corrupt logs for later analysis
Path cp = new Path(conf.get(HBASE_DIR), corruptDir);
if(!fs.exists(cp)) {
fs.mkdirs(cp);
}
Path newp = new Path(cp, p.getName());
if (!fs.exists(newp)) {
if (!fs.rename(p, newp)) {
LOG.warn("Rename of " + p + " to " + newp + " failed.");
} else {
LOG.warn("Corrupt Hlog (" + p + ") moved to " + newp);
}
} else {
LOG.warn("Corrupt Hlog (" + p + ") already moved to " + newp +
". Ignoring");
}
} else {
// data loss is less important than disk space, delete
try {
if (!fs.delete(p, true) ) {
if (LOG.isDebugEnabled()) {
LOG.debug("Delete of split Hlog " + p + " failed.");
}
} else {
LOG.warn("Corrupt Hlog (" + p + ") deleted!");
}
} catch (IOException e) {
LOG.warn("Error deleting corrupt Hlog (" + p + ") -- " + e);
}
}
}
return splits;
}
/*
* @param conf
* @return True if append enabled and we have the syncFs in our path.
*/
static boolean isAppend(final Configuration conf) {
boolean append = conf.getBoolean("dfs.support.append", false);
if (append) {
try {
// TODO: The implementation that comes back when we do a createWriter
// may not be using SequenceFile so the below is not a definitive test.
// Will do for now (hdfs-200).
SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
append = true;
} catch (SecurityException e) {
} catch (NoSuchMethodException e) {
append = false;
}
}
return append;
}
/**
* Utility class that lets us keep track of the edit with its key
@@ -1582,64 +1369,6 @@ public class HLog implements HConstants, Syncable {
}
}
/*
* Recover log.
* Try and open log in append mode.
* Doing this, we get a hold of the file that crashed writer
* was writing to. Once we have it, close it. This will
* allow subsequent reader to see up to last sync.
* @param fs
* @param p
* @param append
*/
public static void recoverLog(final FileSystem fs, final Path p,
final boolean append) throws IOException {
if (!append) {
return;
}
// lease recovery not needed for local file system case.
// currently, local file system doesn't implement append either.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
LOG.debug("Recovering DFS lease for path " + p);
long startWaiting = System.currentTimeMillis();
// Trying recovery
boolean recovered = false;
while (!recovered) {
try {
FSDataOutputStream out = fs.append(p);
out.close();
recovered = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof AlreadyBeingCreatedException) {
// We expect that we'll get this message while the lease is still
// within its soft limit, but if we get it past that, it means
// that the RS is holding onto the file even though it lost its
// znode. We could potentially abort after some time here.
long waitedFor = System.currentTimeMillis() - startWaiting;
if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p
+ ":" + e.getMessage());
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore it and try again
}
} else {
throw new IOException("Failed to open " + p + " for append", e);
}
}
}
LOG.info("Past out lease recovery");
}
/**
* Construct the HLog directory name
*
@@ -1687,28 +1416,247 @@ public class HLog implements HConstants, Syncable {
return new Path(oldLogDir, p.getName());
}
private static void usage() {
System.err.println("Usage: java org.apache.hbase.HLog" +
" {--dump <logfile>... | --split <logdir>...}");
/**
* Takes splitLogsMap and concurrently writes its edits to their region directories using a thread pool
*
* @param splitLogsMap map that contains the log splitting result indexed by region
* @param logWriters map that contains a writer per region
* @param rootDir hbase root dir
* @param fs
* @param conf
* @throws IOException
*/
private static void writeEditsBatchToRegions(
final Map<byte[], LinkedList<Entry>> splitLogsMap,
final Map<byte[], WriterAndPath> logWriters,
final Path rootDir, final FileSystem fs, final Configuration conf)
throws IOException {
// Number of threads to use when log splitting to rewrite the logs.
// More means faster but bigger mem consumption.
int logWriterThreads =
conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
NamingThreadFactory f = new NamingThreadFactory(
"SplitWriter-%1$d", Executors.defaultThreadFactory());
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, f);
for (final byte[] region : splitLogsMap.keySet()) {
Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap, region, fs, conf);
writeFutureResult.put(region, threadPool.submit(splitter));
}
threadPool.shutdown();
// Wait for all threads to terminate
try {
for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
String message = "Waiting for hlog writers to terminate, elapsed " + j * 5 + " seconds";
if (j < 30) {
LOG.debug(message);
} else {
LOG.info(message);
}
}
} catch(InterruptedException ex) {
LOG.warn("Hlog writers were interrupted, possible data loss!");
if (!skipErrors) {
throw new IOException("Could not finish writing log entries", ex);
//TODO maybe we should fail here regardless if skipErrors is active or not
}
}
for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
try {
entry.getValue().get();
} catch (ExecutionException e) {
throw (new IOException(e.getCause()));
} catch (InterruptedException e1) {
LOG.warn("Writer for region " + Bytes.toString(entry.getKey()) +
" was interrupted, however the write process should have " +
"finished. Throwing up ", e1);
throw (new IOException(e1.getCause()));
}
}
}
/*
* Parse a single hlog and put the edits in @splitLogsMap
*
* @param logfile to split
* @param splitLogsMap output parameter: a map with region names as keys and a
* list of edits as values
* @param fs the filesystem
* @param conf the configuration
* @throws IOException if hlog is corrupted, or can't be opened
*/
private static void parseHLog(final FileStatus logfile,
final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
final Configuration conf)
throws IOException {
// Check for possibly empty file. With appends, currently Hadoop reports a
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
// HDFS-878 is committed.
long length = logfile.getLen();
if (length <= 0) {
LOG.warn("File " + logfile.getPath() + " might be still open, length is 0");
}
Path path = logfile.getPath();
Reader in;
int editsCount = 0;
try {
in = HLog.getReader(fs, path, conf);
} catch (EOFException e) {
if (length <= 0) {
//TODO should we ignore an empty, not-last log file if skip.errors is false?
//Either way, the caller should decide what to do. E.g. ignore if this is the last
//log in sequence.
//TODO is this scenario still possible if the log has been recovered (i.e. closed)
LOG.warn("Could not open " + path + " for reading. File is empty" + e);
return;
} else {
throw e;
}
}
try {
Entry entry;
while ((entry = in.next()) != null) {
byte[] region = entry.getKey().getRegionName();
LinkedList<Entry> queue = splitLogsMap.get(region);
if (queue == null) {
queue = new LinkedList<Entry>();
splitLogsMap.put(region, queue);
}
queue.addFirst(entry);
editsCount++;
}
LOG.debug("Pushed=" + editsCount + " entries from " + path);
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
LOG.warn("Close log reader in finally threw exception -- continuing", e);
}
}
}
private static Callable<Void> createNewSplitter(final Path rootDir,
final Map<byte[], WriterAndPath> logWriters,
final Map<byte[], LinkedList<Entry>> logEntries,
final byte[] region, final FileSystem fs, final Configuration conf) {
return new Callable<Void>() {
public String getName() {
return "Split writer thread for region " + Bytes.toStringBinary(region);
}
@Override
public Void call() throws IOException {
LinkedList<Entry> entries = logEntries.get(region);
LOG.debug(this.getName()+" got " + entries.size() + " to process");
long threadTime = System.currentTimeMillis();
try {
int editsCount = 0;
WriterAndPath wap = logWriters.get(region);
for (ListIterator<Entry> iterator = entries.listIterator();
iterator.hasNext();) {
Entry logEntry = iterator.next();
if (wap == null) {
Path logFile = getRegionLogPath(logEntry, rootDir);
if (fs.exists(logFile)) {
LOG.warn("Found existing old hlog file. It could be the result of a previous" +
"failed split attempt. Deleting " + logFile +
", length=" + fs.getFileStatus(logFile).getLen());
fs.delete(logFile, false);
}
Writer w = createWriter(fs, logFile, conf);
wap = new WriterAndPath(logFile, w);
logWriters.put(region, wap);
LOG.debug("Creating writer path=" + logFile +
" region=" + Bytes.toStringBinary(region));
}
wap.w.append(logEntry);
editsCount++;
}
LOG.debug(this.getName() + " Applied " + editsCount +
" total edits to " + Bytes.toStringBinary(region) +
" in " + (System.currentTimeMillis() - threadTime) + "ms");
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal(this.getName() + " Got while writing log entry to log", e);
throw e;
}
return null;
}
};
}
/**
* Moves processed logs to oldLogDir after successful processing.
* Moves corrupted logs (any log that couldn't be successfully parsed)
* to corruptDir (.corrupt) for later investigation.
*
* @param corruptedLogs
* @param processedLogs
* @param oldLogDir
* @param fs
* @param conf
* @throws IOException
*/
private static void archiveLogs(final List<Path> corruptedLogs,
final List<Path> processedLogs, final Path oldLogDir,
final FileSystem fs, final Configuration conf)
throws IOException{
final Path corruptDir = new Path(conf.get(HBASE_DIR),
conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
fs.mkdirs(corruptDir);
fs.mkdirs(oldLogDir);
for (Path corrupted: corruptedLogs) {
Path p = new Path(corruptDir, corrupted.getName());
LOG.info("Moving corrupted log " + corrupted + " to " + p);
fs.rename(corrupted, p);
}
for (Path p: processedLogs) {
Path newPath = getHLogArchivePath(oldLogDir, p);
fs.rename(p, newPath);
LOG.info("Archived processed log " + p + " to " + newPath);
}
}
private static Path getRegionLogPath(Entry logEntry, Path rootDir) {
Path tableDir =
HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
Path regionDir =
HRegion.getRegionDir(tableDir, HRegionInfo.encodeRegionName(logEntry.getKey().getRegionName()));
return new Path(regionDir, HREGION_OLDLOGFILE_NAME);
}
public void addLogActionsListerner(LogActionsListener list) {
LOG.info("Adding a listener");
this.actionListeners.add(list);
}
/**
* Removes a previously added log actions listener.
* @param list the listener to remove
* @return true if the listener was registered and has been removed
*/
public boolean removeLogActionsListener(LogActionsListener list) {
return this.actionListeners.remove(list);
}
private static void usage() {
System.err.println("Usage: java org.apache.hbase.HLog" +
" {--dump <logfile>... | --split <logdir>...}");
}
/**
* Pass one or more log file names and it will either dump out a text version
* on <code>stdout</code> or split the specified log files.

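For orientation, a hypothetical caller sketch (not part of this commit) showing how the refactored entry point and the two new configuration keys fit together; splitLog() recovers each file's lease, parses the logs in batches, writes edits per region, and archives processed logs (corrupt ones are sidelined to .corrupt):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

public class SplitLogCallerSketch {
  // rootDir is the hbase root, srcDir the dead regionserver's log directory and
  // oldLogDir where processed logs are archived; all supplied by the caller.
  public static List<Path> split(Configuration conf, FileSystem fs,
      Path rootDir, Path srcDir, Path oldLogDir) throws IOException {
    conf.setInt("hbase.hlog.split.batch.size", 3);           // logs parsed per batch
    conf.setBoolean("hbase.hlog.split.skip.errors", false);  // fail instead of sidelining corrupt logs
    return HLog.splitLog(rootDir, srcDir, oldLogDir, fs, conf);
  }
}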
View File

@@ -34,7 +34,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.SequenceFile;
import java.io.DataInputStream;
import java.io.IOException;
@@ -557,4 +559,88 @@ public class FSUtils {
return isdir;
}
}
/**
* Heuristic to determine whether it is safe to open a file for append.
* Looks both for dfs.support.append and uses reflection to search
* for SequenceFile.Writer.syncFs() or FSDataOutputStream.hflush()
* @param conf
* @return True if append is supported
*/
public static boolean isAppendSupported(final Configuration conf) {
boolean append = conf.getBoolean("dfs.support.append", false);
if (append) {
try {
// TODO: The implementation that comes back when we do a createWriter
// may not be using SequenceFile so the below is not a definitive test.
// Will do for now (hdfs-200).
SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
append = true;
} catch (SecurityException e) {
} catch (NoSuchMethodException e) {
append = false;
}
} else {
try {
FSDataOutputStream.class.getMethod("hflush", new Class<?> []{});
} catch (NoSuchMethodException e) {
append = false;
}
}
return append;
}
/*
* Recover file lease. Used when a file <code>p</code> is suspected to have been
* left open by another process.
* @param fs
* @param p
* @param conf
* @throws IOException
*/
public static void recoverFileLease(final FileSystem fs, final Path p, Configuration conf)
throws IOException{
if (!isAppendSupported(conf)) {
return;
}
// lease recovery not needed for local file system case.
// currently, local file system doesn't implement append either.
if (!(fs instanceof DistributedFileSystem)) {
return;
}
LOG.info("Recovering file" + p);
long startWaiting = System.currentTimeMillis();
// Trying recovery
boolean recovered = false;
while (!recovered) {
try {
FSDataOutputStream out = fs.append(p);
out.close();
recovered = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
if (e instanceof AlreadyBeingCreatedException) {
// We expect that we'll get this message while the lease is still
// within its soft limit, but if we get it past that, it means
// that the RS is holding onto the file even though it lost its
// znode. We could potentially abort after some time here.
long waitedFor = System.currentTimeMillis() - startWaiting;
if (waitedFor > FSConstants.LEASE_SOFTLIMIT_PERIOD) {
LOG.warn("Waited " + waitedFor + "ms for lease recovery on " + p +
":" + e.getMessage());
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
// ignore it and try again
}
} else {
throw new IOException("Failed to open " + p + " for append", e);
}
}
}
LOG.info("Finished lease recover attempt for " + p);
}
}
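A short usage sketch (not from the patch) for the relocated helpers: recoverFileLease() is a no-op when append is unsupported or the filesystem is not HDFS, so callers such as HLog.splitLog() can invoke it before opening a suspect log for reading:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;

public class LeaseRecoverySketch {
  // Opening the suspect file for append and closing it settles its length on the
  // namenode, so a subsequent reader sees every edit up to the last sync.
  public static void prepareForRead(FileSystem fs, Path wal, Configuration conf)
      throws IOException {
    if (FSUtils.isAppendSupported(conf)) {
      FSUtils.recoverFileLease(fs, wal, conf);
    }
  }
}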

View File

@@ -0,0 +1,37 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.hbase.util.Bytes;
public class InstrumentedSequenceFileLogWriter extends SequenceFileLogWriter {
public static boolean activateFailure = false;
@Override
public void append(HLog.Entry entry) throws IOException {
super.append(entry);
if (activateFailure && Bytes.equals(entry.getKey().getRegionName(), "break".getBytes())) {
System.out.println(getClass().getName() + ": I will throw an exception now...");
throw(new IOException("This exception is instrumented and should only be thrown for testing"));
}
}
}
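This writer is wired in through the same "hbase.regionserver.hlog.writer.impl" key that HLog.createWriter() now resolves via conf.getClass(). A sketch (illustrative class name) of how a test arms it, as done in setUpBeforeClass() and testSplitWillFailIfWritingToRegionFails() below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedSequenceFileLogWriter;

public class FailureInjectionSketch {
  // Swap in the instrumented writer and flip the failure switch; any append for a
  // region literally named "break" will then throw, exercising the error path.
  public static void armFailure(Configuration conf) {
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
    InstrumentedSequenceFileLogWriter.activateFailure = true;
  }
}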

View File

@@ -0,0 +1,735 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.naming.InsufficientResourcesException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Testing {@link HLog} splitting code.
*/
public class TestHLogSplit {
private Configuration conf;
private FileSystem fs;
private final static HBaseTestingUtility
TEST_UTIL = new HBaseTestingUtility();
private static final Path hbaseDir = new Path("/hbase");
private static final Path hlogDir = new Path(hbaseDir, "hlog");
private static final Path oldLogDir = new Path(hbaseDir, "hlog.old");
private static final Path corruptDir = new Path(hbaseDir, ".corrupt");
private static final int NUM_WRITERS = 10;
private static final int ENTRIES = 10; // entries per writer per region
private HLog.Writer[] writer = new HLog.Writer[NUM_WRITERS];
private long seq = 0;
private static final byte[] TABLE_NAME = "t1".getBytes();
private static final byte[] FAMILY = "f1".getBytes();
private static final byte[] QUALIFIER = "q1".getBytes();
private static final byte[] VALUE = "v1".getBytes();
private static final String HLOG_FILE_PREFIX = "hlog.dat.";
private static List<String> regions;
private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
static enum Corruptions {
INSERT_GARBAGE_ON_FIRST_LINE,
INSERT_GARBAGE_IN_THE_MIDDLE,
APPEND_GARBAGE,
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().
setInt("hbase.regionserver.flushlogentries", 1);
TEST_UTIL.getConfiguration().
setBoolean("dfs.support.append", true);
TEST_UTIL.getConfiguration().
setStrings("hbase.rootdir", hbaseDir.toString());
TEST_UTIL.getConfiguration().
setClass("hbase.regionserver.hlog.writer.impl",
InstrumentedSequenceFileLogWriter.class, HLog.Writer.class);
TEST_UTIL.startMiniDFSCluster(2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniDFSCluster();
}
@Before
public void setUp() throws Exception {
conf = TEST_UTIL.getConfiguration();
fs = TEST_UTIL.getDFSCluster().getFileSystem();
FileStatus[] entries = fs.listStatus(new Path("/"));
for (FileStatus dir : entries){
fs.delete(dir.getPath(), true);
}
seq = 0;
regions = new ArrayList<String>();
Collections.addAll(regions, "bbb", "ccc");
InstrumentedSequenceFileLogWriter.activateFailure = false;
// Set the soft lease for hdfs to be down from default of 5 minutes or so.
// TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another.
// Not available in 0.20 hdfs
// TEST_UTIL.getDFSCluster().getNamesystem().leaseManager.
// setLeasePeriod(100, 50000);
// Use reflection to get at the 0.20 version of above.
MiniDFSCluster dfsCluster = TEST_UTIL.getDFSCluster();
// private NameNode nameNode;
Field field = dfsCluster.getClass().getDeclaredField("nameNode");
field.setAccessible(true);
NameNode nn = (NameNode)field.get(dfsCluster);
nn.namesystem.leaseManager.setLeasePeriod(100, 50000);
}
@After
public void tearDown() throws Exception {
}
@Test(expected = IOException.class)
public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
throws IOException {
AtomicBoolean stop = new AtomicBoolean(false);
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
try {
(new ZombieNewLogWriterRegionServer(stop)).start();
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} finally {
stop.set(true);
}
}
//TODO: check the edits order is respected (scenarios)
@Test
public void testEmptyLogFiles() throws IOException {
injectEmptyFile(".empty", true);
generateHLogs(Integer.MAX_VALUE);
injectEmptyFile("empty", true);
// make fs act as a different client now
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testEmptyOpenLogFiles() throws IOException {
injectEmptyFile(".empty", false);
generateHLogs(Integer.MAX_VALUE);
injectEmptyFile("empty", false);
// make fs act as a different client now
// initialize will create a new DFSClient with a new client ID
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
// generate logs but leave hlog.dat.5 open.
generateHLogs(5);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
generateHLogs(Integer.MAX_VALUE);
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
generateHLogs(Integer.MAX_VALUE);
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
generateHLogs(Integer.MAX_VALUE);
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
// the entries in the original logs are alternating regions
// considering the sequence file header, the middle corruption should
// affect at least half of the entries
int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
assertTrue("The file up to the corrupted area hasn't been parsed",
goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
}
}
@Test
public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, true);
Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
Path c2 = new Path(hlogDir, HLOG_FILE_PREFIX + "5");
Path c3 = new Path(hlogDir, HLOG_FILE_PREFIX + (NUM_WRITERS - 1));
generateHLogs(-1);
corruptHLog(c1, Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
corruptHLog(c2, Corruptions.APPEND_GARBAGE, true, fs);
corruptHLog(c3, Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(corruptDir);
assertEquals("expected a different file", c1.getName(), archivedLogs[0].getPath().getName());
assertEquals("expected a different file", c2.getName(), archivedLogs[1].getPath().getName());
assertEquals("expected a different file", c3.getName(), archivedLogs[2].getPath().getName());
assertEquals(archivedLogs.length, 3);
}
@Test
public void testLogsGetArchivedAfterSplit() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
}
@Test(expected = IOException.class)
public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateHLogs(Integer.MAX_VALUE);
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
}
@Test
public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs() throws IOException {
conf.setBoolean(HBASE_SKIP_ERRORS, false);
generateHLogs(-1);
corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
Corruptions.APPEND_GARBAGE, true, fs);
fs.initialize(fs.getUri(), conf);
try {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {/* expected */}
assertEquals("if skip.errors is false all files should remain in place",
NUM_WRITERS, fs.listStatus(hlogDir).length);
}
@Test
public void testSplit() throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
for (String region : regions) {
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
}
}
@Test
public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
throws IOException {
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus [] statuses = null;
try {
statuses = fs.listStatus(hlogDir);
assertNull(statuses);
} catch (FileNotFoundException e) {
// hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
}
}
/* DISABLED for now. TODO: HBASE-2645
@Test
public void testLogCannotBeWrittenOnceParsed() throws IOException {
AtomicLong counter = new AtomicLong(0);
AtomicBoolean stop = new AtomicBoolean(false);
generateHLogs(9);
fs.initialize(fs.getUri(), conf);
Thread zombie = new ZombieLastLogWriterRegionServer(writer[9], counter, stop);
try {
zombie.start();
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, "juliet");
// It's possible that the writer got an error while appending and didn't count it
// however the entry will in fact be written to file and split with the rest
long numberOfEditsInRegion = countHLog(logfile, fs, conf);
assertTrue("The log file could have at most 1 extra log entry, but " +
"can't have less. Zombie could write "+counter.get() +" and logfile had only"+ numberOfEditsInRegion+" " + logfile, counter.get() == numberOfEditsInRegion ||
counter.get() + 1 == numberOfEditsInRegion);
} finally {
stop.set(true);
}
}
*/
@Test
public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
throws IOException {
AtomicBoolean stop = new AtomicBoolean(false);
generateHLogs(-1);
fs.initialize(fs.getUri(), conf);
Thread zombie = new ZombieNewLogWriterRegionServer(stop);
try {
zombie.start();
try {
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException ex) {/* expected */}
int logFilesNumber = fs.listStatus(hlogDir).length;
assertEquals("Log files should not be archived if there's an extra file after split",
NUM_WRITERS + 1, logFilesNumber);
} finally {
stop.set(true);
}
}
@Test(expected = IOException.class)
public void testSplitWillFailIfWritingToRegionFails() throws Exception {
//leave 5th log open so we could append the "trap"
generateHLogs(4);
fs.initialize(fs.getUri(), conf);
InstrumentedSequenceFileLogWriter.activateFailure = false;
appendEntry(writer[4], TABLE_NAME, Bytes.toBytes("break"), ("r" + 999).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
writer[4].close();
try {
InstrumentedSequenceFileLogWriter.activateFailure = true;
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
} catch (IOException e) {
assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
throw e;
} finally {
InstrumentedSequenceFileLogWriter.activateFailure = false;
}
}
// @Test
public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
regions.removeAll(regions);
for (int i=0; i<500; i++) {
regions.add("region__"+i);
}
generateHLogs(1, 100, -1);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
fs.rename(oldLogDir, hlogDir);
Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
fs.rename(splitPath,
firstSplitPath);
fs.initialize(fs.getUri(), conf);
HLog.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
FileStatus[] f1 = fs.listStatus(firstSplitPath);
FileStatus[] f2 = fs.listStatus(splitPath);
for (int i=0; i<f1.length; i++) {
HLog.Reader in1, in2;
in1 = HLog.getReader(fs, new Path(f1[i].getPath(), "oldlogfile.log"), conf);
in2 = HLog.getReader(fs, new Path(f2[i].getPath(), "oldlogfile.log"), conf);
HLog.Entry entry1;
HLog.Entry entry2;
while ((entry1 = in1.next()) != null) {
entry2 = in2.next();
assertEquals(0, entry1.getKey().compareTo(entry2.getKey()));
assertEquals(entry1.getEdit().toString(), entry2.getEdit().toString());
}
}
}
/**
* This thread will keep writing to the file after the split process has started
* It simulates a region server that was considered dead but woke up and wrote
* some more to the last log entry
*/
class ZombieLastLogWriterRegionServer extends Thread {
AtomicLong editsCount;
AtomicBoolean stop;
Path log;
HLog.Writer lastLogWriter;
public ZombieLastLogWriterRegionServer(HLog.Writer writer, AtomicLong counter, AtomicBoolean stop) {
this.stop = stop;
this.editsCount = counter;
this.lastLogWriter = writer;
}
@Override
public void run() {
if (stop.get()){
return;
}
flushToConsole("starting");
while (true) {
try {
appendEntry(lastLogWriter, TABLE_NAME, "juliet".getBytes(),
("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
lastLogWriter.sync();
editsCount.incrementAndGet();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
//
}
} catch (IOException ex) {
if (ex instanceof RemoteException) {
flushToConsole("Juliet: got RemoteException " +
ex.getMessage() + " while writing " + (editsCount.get() + 1));
break;
} else {
assertTrue("Failed to write " + editsCount.get(), false);
}
}
}
}
}
/**
* This thread will keep adding new log files
* It simulates a region server that was considered dead but woke up and wrote
* some more to a new hlog
*/
class ZombieNewLogWriterRegionServer extends Thread {
AtomicBoolean stop;
public ZombieNewLogWriterRegionServer(AtomicBoolean stop) {
super("ZombieNewLogWriterRegionServer");
this.stop = stop;
}
@Override
public void run() {
if (stop.get()) {
return;
}
boolean splitStarted = false;
Path p = new Path(hbaseDir, new String(TABLE_NAME));
while (!splitStarted) {
try {
FileStatus [] statuses = fs.listStatus(p);
// In 0.20, listStatus comes back with a null if file doesn't exist.
// In 0.21, it throws FNFE.
if (statuses != null && statuses.length > 0) {
// Done.
break;
}
} catch (FileNotFoundException e) {
// Expected in hadoop 0.21
} catch (IOException e1) {
assertTrue("Failed to list status ", false);
}
flushToConsole("Juliet: split not started, sleeping a bit...");
Threads.sleep(100);
}
Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
try {
HLog.Writer writer = HLog.createWriter(fs,
julietLog, conf);
appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
writer.close();
flushToConsole("Juliet file creator: created file " + julietLog);
} catch (IOException e1) {
assertTrue("Failed to create file " + julietLog, false);
}
}
}
private void flushToConsole(String s) {
System.out.println(s);
System.out.flush();
}
private void generateHLogs(int leaveOpen) throws IOException {
generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
}
private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
for (int i = 0; i < writers; i++) {
writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
for (int j = 0; j < entries; j++) {
int prefix = 0;
for (String region : regions) {
String row_key = region + prefix++ + i + j;
appendEntry(writer[i], TABLE_NAME, region.getBytes(),
row_key.getBytes(), FAMILY, QUALIFIER, VALUE, seq);
}
}
if (i != leaveOpen) {
writer[i].close();
flushToConsole("Closing writer " + i);
}
}
}
private Path getLogForRegion(Path rootdir, byte[] table, String region) {
return new Path(HRegion.getRegionDir(HTableDescriptor
.getTableDir(rootdir, table),
HRegionInfo.encodeRegionName(region.getBytes())),
"oldlogfile.log");
}
private void corruptHLog(Path path, Corruptions corruption, boolean close,
FileSystem fs) throws IOException {
FSDataOutputStream out;
int fileSize = (int) fs.listStatus(path)[0].getLen();
FSDataInputStream in = fs.open(path);
byte[] corrupted_bytes = new byte[fileSize];
in.readFully(0, corrupted_bytes, 0, fileSize);
in.close();
switch (corruption) {
case APPEND_GARBAGE:
out = fs.append(path);
out.write("-----".getBytes());
closeOrFlush(close, out);
break;
case INSERT_GARBAGE_ON_FIRST_LINE:
fs.delete(path, false);
out = fs.create(path);
out.write(0);
out.write(corrupted_bytes);
closeOrFlush(close, out);
break;
case INSERT_GARBAGE_IN_THE_MIDDLE:
fs.delete(path, false);
out = fs.create(path);
int middle = (int) Math.floor(corrupted_bytes.length / 2);
out.write(corrupted_bytes, 0, middle);
out.write(0);
out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
closeOrFlush(close, out);
break;
}
}
private void closeOrFlush(boolean close, FSDataOutputStream out)
throws IOException {
if (close) {
out.close();
} else {
out.sync();
// Not in 0.20: out.hflush();
}
}
@SuppressWarnings("unused")
private void dumpHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
HLog.Entry entry;
HLog.Reader in = HLog.getReader(fs, log, conf);
while ((entry = in.next()) != null) {
System.out.println(entry);
}
}
private int countHLog(Path log, FileSystem fs, Configuration conf) throws IOException {
int count = 0;
HLog.Reader in = HLog.getReader(fs, log, conf);
while (in.next() != null) {
count++;
}
return count;
}
public long appendEntry(HLog.Writer writer, byte[] table, byte[] region,
byte[] row, byte[] family, byte[] qualifier,
byte[] value, long seq)
throws IOException {
long time = System.nanoTime();
WALEdit edit = new WALEdit();
seq++;
edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
writer.sync();
return seq;
}
private void injectEmptyFile(String suffix, boolean closeFile)
throws IOException {
HLog.Writer writer = HLog.createWriter(
fs, new Path(hlogDir, HLOG_FILE_PREFIX + suffix), conf);
if (closeFile) writer.close();
}
@SuppressWarnings("unused")
private void listLogs(FileSystem fs, Path dir) throws IOException {
for (FileStatus file : fs.listStatus(dir)) {
System.out.println(file.getPath());
}
}
}

View File

@@ -31,13 +31,13 @@ import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -220,7 +220,7 @@ public class TestLogRolling extends HBaseClusterTestCase {
assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
// don't run this test without append support (HDFS-200 & HDFS-142)
assertTrue("Need append support for this test", HLog.isAppend(conf));
assertTrue("Need append support for this test", FSUtils.isAppendSupported(conf));
// add up the datanode count, to ensure proper replication when we kill 1
dfsCluster.startDataNodes(conf, 1, true, null, null);