HBASE-3323 OOME in master splitting logs

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1051278 13f79535-47bb-0310-9956-ffa450edef68

parent 9685e81958
commit d000821c1e
@@ -797,6 +797,7 @@ Release 0.90.0 - Unreleased
 HBASE-3370 ReplicationSource.openReader fails to locate HLogs when they
            aren't split yet
 HBASE-3371 Race in TestReplication can make it fail
+HBASE-3323 OOME in master splitting logs


 IMPROVEMENTS
@@ -190,12 +190,13 @@ public class MasterFileSystem {
     long splitTime = 0, splitLogSize = 0;
     Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
     try {
-      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter splitter = HLogSplitter.createLogSplitter(
+        conf, rootdir, logDir, oldLogDir, this.fs);
       try {
-        splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+        splitter.splitLog();
       } catch (OrphanHLogAfterSplitException e) {
         LOG.warn("Retrying splitting because of:", e);
-        splitter.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+        splitter.splitLog();
       }
       splitTime = splitter.getTime();
       splitLogSize = splitter.getSize();
@@ -1439,8 +1439,9 @@ public class HLog implements Syncable {
       throw new IOException(p + " is not a directory");
     }

-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(baseDir, p, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
+      conf, baseDir, p, oldLogDir, fs);
+    logSplitter.splitLog();
   }

   /**
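Editor's note (not part of the patch): the two hunks above show the reworked splitter API — the factory now takes the directories and filesystem, and splitLog() takes no arguments. A minimal caller sketch under assumed placeholder paths:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;

public class SplitterUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical directories; in HBase these come from the master's layout.
    Path rootDir = new Path("/hbase");
    Path logDir = new Path("/hbase/.logs/server1,60020,1234567890");
    Path oldLogDir = new Path("/hbase/.oldlogs");

    // The factory now receives the paths and filesystem up front...
    HLogSplitter splitter = HLogSplitter.createLogSplitter(
        conf, rootDir, logDir, oldLogDir, fs);
    // ...and splitLog() takes no arguments and, per the patch, may only be
    // called once per HLogSplitter instance.
    splitter.splitLog();
  }
}
```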
@@ -160,6 +160,32 @@ public class HLogKey implements WritableComparable<HLogKey> {
     return result;
   }

+  /**
+   * Drop this instance's tablename byte array and instead
+   * hold a reference to the provided tablename. This is not
+   * meant to be a general purpose setter - it's only used
+   * to collapse references to conserve memory.
+   */
+  void internTableName(byte []tablename) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same table name.
+    assert Bytes.equals(tablename, this.tablename);
+    this.tablename = tablename;
+  }
+
+  /**
+   * Drop this instance's region name byte array and instead
+   * hold a reference to the provided region name. This is not
+   * meant to be a general purpose setter - it's only used
+   * to collapse references to conserve memory.
+   */
+  void internEncodedRegionName(byte []encodedRegionName) {
+    // We should not use this as a setter - only to swap
+    // in a new reference to the same table name.
+    assert Bytes.equals(this.encodedRegionName, encodedRegionName);
+    this.encodedRegionName = encodedRegionName;
+  }
+
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, this.encodedRegionName);
     Bytes.writeByteArray(out, this.tablename);
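Editor's note (not part of the patch): the intern methods added above let many buffered HLogKey instances share one table-name and one region-name byte array instead of each holding its own copy. A standalone sketch of the same reference-collapsing idea, with illustrative names:

```java
import java.util.Arrays;

public class InternSketch {
  static class Key {
    byte[] tablename;

    Key(byte[] tablename) {
      this.tablename = tablename;
    }

    // Swap in a shared reference to an equal array to conserve memory.
    void internTableName(byte[] shared) {
      assert Arrays.equals(shared, this.tablename);
      this.tablename = shared;
    }
  }

  public static void main(String[] args) {
    byte[] shared = "test_table".getBytes();
    Key k1 = new Key("test_table".getBytes());
    Key k2 = new Key("test_table".getBytes());
    // Before interning, each key holds its own copy of the bytes;
    // afterwards both keys point at the single shared array.
    k1.internTableName(shared);
    k2.internTableName(shared);
    System.out.println(k1.tablename == k2.tablename); // true
  }
}
```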
@@ -23,21 +23,18 @@ import static org.apache.hadoop.hbase.util.FSUtils.recoverFileLease;

 import java.io.EOFException;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -53,9 +51,11 @@ import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Writer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.MultipleIOException;

-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;

 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -66,49 +66,94 @@ public class HLogSplitter {

   private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";

-  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
-
-  private long splitTime = 0;
-  private long splitSize = 0;
-
   /**
    * Name of file that holds recovered edits written by the wal log splitting
    * code, one per region
    */
   public static final String RECOVERED_EDITS = "recovered.edits";

+  static final Log LOG = LogFactory.getLog(HLogSplitter.class);
+
+  private boolean hasSplit = false;
+  private long splitTime = 0;
+  private long splitSize = 0;
+
+  // Parameters for split process
+  protected final Path rootDir;
+  protected final Path srcDir;
+  protected final Path oldLogDir;
+  protected final FileSystem fs;
+  protected final Configuration conf;
+
+  // Major subcomponents of the split process.
+  // These are separated into inner classes to make testing easier.
+  OutputSink outputSink;
+  EntryBuffers entryBuffers;
+
+  // If an exception is thrown by one of the other threads, it will be
+  // stored here.
+  protected AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+
+  // Wait/notify for when data has been produced by the reader thread,
+  // consumed by the reader thread, or an exception occurred
+  Object dataAvailable = new Object();
+
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
    * <code>hbase.hlog.splitter.impl</code> property to derived the instance
    * class to use.
    *
-   * @param conf
-   * @return New HLogSplitter instance
+   * @param rootDir hbase directory
+   * @param srcDir logs directory
+   * @param oldLogDir directory where processed logs are archived to
+   * @param logfiles the list of log files to split
    */
-  public static HLogSplitter createLogSplitter(Configuration conf) {
+  public static HLogSplitter createLogSplitter(Configuration conf,
+      final Path rootDir, final Path srcDir,
+      Path oldLogDir, final FileSystem fs) {
+
     @SuppressWarnings("unchecked")
     Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
         .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
     try {
-      return splitterClass.newInstance();
+      Constructor<? extends HLogSplitter> constructor =
+        splitterClass.getConstructor(
+          Configuration.class, // conf
+          Path.class, // rootDir
+          Path.class, // srcDir
+          Path.class, // oldLogDir
+          FileSystem.class); // fs
+      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs);
+    } catch (IllegalArgumentException e) {
+      throw new RuntimeException(e);
     } catch (InstantiationException e) {
       throw new RuntimeException(e);
     } catch (IllegalAccessException e) {
       throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(e);
+    } catch (SecurityException e) {
+      throw new RuntimeException(e);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(e);
     }
   }

-  // Private immutable datastructure to hold Writer and its Path.
-  private final static class WriterAndPath {
-    final Path p;
-    final Writer w;
-
-    WriterAndPath(final Path p, final Writer w) {
-      this.p = p;
-      this.w = w;
-    }
+  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
+      Path oldLogDir, FileSystem fs) {
+    this.conf = conf;
+    this.rootDir = rootDir;
+    this.srcDir = srcDir;
+    this.oldLogDir = oldLogDir;
+    this.fs = fs;
+
+    entryBuffers = new EntryBuffers(
+        conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
+            128*1024*1024));
+    outputSink = new OutputSink();
   }

   /**
@@ -116,24 +161,14 @@ public class HLogSplitter {
    * written to, into new files, one per region for region to replay on startup.
    * Delete the old log files when finished.
    *
-   * @param rootDir
-   *          qualified root directory of the HBase instance
-   * @param srcDir
-   *          Directory of log files to split: e.g.
-   *          <code>${ROOTDIR}/log_HOST_PORT</code>
-   * @param oldLogDir
-   *          directory where processed (split) logs will be archived to
-   * @param fs
-   *          FileSystem
-   * @param conf
-   *          Configuration
-   * @throws IOException
-   *          will throw if corrupted hlogs aren't tolerated
+   * @throws IOException will throw if corrupted hlogs aren't tolerated
    * @return the list of splits
    */
-  public List<Path> splitLog(final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileSystem fs, final Configuration conf)
+  public List<Path> splitLog()
       throws IOException {
+    Preconditions.checkState(!hasSplit,
+        "An HLogSplitter instance may only be used once");
+    hasSplit = true;
+
     long startTime = System.currentTimeMillis();
     List<Path> splits = null;
@@ -148,29 +183,8 @@ public class HLogSplitter {
     }
     LOG.info("Splitting " + logfiles.length + " hlog(s) in "
         + srcDir.toString());
-    splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf);
-    try {
-      FileStatus[] files = fs.listStatus(srcDir);
-      for (FileStatus file : files) {
-        Path newPath = HLog.getHLogArchivePath(oldLogDir, file.getPath());
-        LOG.info("Moving " + FSUtils.getPath(file.getPath()) + " to "
-            + FSUtils.getPath(newPath));
-        if (!fs.rename(file.getPath(), newPath)) {
-          throw new IOException("Unable to rename " + file.getPath() +
-              " to " + newPath);
-        }
-      }
-      LOG.debug("Moved " + files.length + " log files to "
-          + FSUtils.getPath(oldLogDir));
-      if (!fs.delete(srcDir, true)) {
-        throw new IOException("Unable to delete " + srcDir);
-      }
-    } catch (IOException e) {
-      e = RemoteExceptionHandler.checkIOException(e);
-      IOException io = new IOException("Cannot delete: " + srcDir);
-      io.initCause(e);
-      throw io;
-    }
+    splits = splitLog(logfiles);
+
     splitTime = System.currentTimeMillis() - startTime;
     LOG.info("hlog file splitting completed in " + splitTime +
         " ms for " + srcDir.toString());
@@ -192,79 +206,58 @@ public class HLogSplitter {
   }

   /**
-   * Sorts the HLog edits in the given list of logfiles (that are a mix of edits
+   * @return a map from encoded region ID to the number of edits written out
+   * for that region.
+   */
+  Map<byte[], Long> getOutputCounts() {
+    Preconditions.checkState(hasSplit);
+    return outputSink.getOutputCounts();
+  }
+
+  /**
+   * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
    * in batches of (hbase.hlog.split.batch.size)
    *
-   * A batch consists of a set of log files that will be sorted in a single map
-   * of edits indexed by region the resulting map will be concurrently written
-   * by multiple threads to their corresponding regions
+   * This process is split into multiple threads. In the main thread, we loop
+   * through the logs to be split. For each log, we:
+   * <ul>
+   *   <li> Recover it (take and drop HDFS lease) to ensure no other process can write</li>
+   *   <li> Read each edit (see {@link #parseHLog}</li>
+   *   <li> Mark as "processed" or "corrupt" depending on outcome</li>
+   * </ul>
    *
-   * Each batch consists of more more log files that are - recovered (files is
-   * opened for append then closed to ensure no process is writing into it) -
-   * parsed (each edit in the log is appended to a list of edits indexed by
-   * region see {@link #parseHLog} for more details) - marked as either
-   * processed or corrupt depending on parsing outcome - the resulting edits
-   * indexed by region are concurrently written to their corresponding region
-   * region directories - original files are then archived to a different
-   * directory
+   * Each edit is passed into the EntryBuffers instance, which takes care of
+   * memory accounting and splitting the edits by region.
    *
+   * The OutputSink object then manages N other WriterThreads which pull chunks
+   * of edits from EntryBuffers and write them to the output region directories.
    *
-   *
-   * @param rootDir
-   *          hbase directory
-   * @param srcDir
-   *          logs directory
-   * @param oldLogDir
-   *          directory where processed logs are archived to
-   * @param logfiles
-   *          the list of log files to split
-   * @param fs
-   * @param conf
-   * @return
-   * @throws IOException
+   * After the process is complete, the log files are archived to a separate
+   * directory.
    */
-  private List<Path> splitLog(final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileStatus[] logfiles, final FileSystem fs,
-      final Configuration conf) throws IOException {
+  private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
-    final Map<byte[], WriterAndPath> logWriters = Collections
-        .synchronizedMap(new TreeMap<byte[], WriterAndPath>(
-            Bytes.BYTES_COMPARATOR));
     List<Path> splits = null;

-    // Number of logs in a read batch
-    // More means faster but bigger mem consumption
-    // TODO make a note on the conf rename and update hbase-site.xml if needed
-    int logFilesPerStep = conf.getInt("hbase.hlog.split.batch.size", 3);
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", false);

     splitSize = 0;

+    outputSink.startWriterThreads(entryBuffers);
+
     try {
-      int i = -1;
-      while (i < logfiles.length) {
-        final Map<byte[], LinkedList<Entry>> editsByRegion = new TreeMap<byte[], LinkedList<Entry>>(
-            Bytes.BYTES_COMPARATOR);
-        for (int j = 0; j < logFilesPerStep; j++) {
-          i++;
-          if (i == logfiles.length) {
-            break;
-          }
-          FileStatus log = logfiles[i];
+      int i = 0;
+      for (FileStatus log : logfiles) {
         Path logPath = log.getPath();
         long logLength = log.getLen();
         splitSize += logLength;
-        LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length
+        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
             + ": " + logPath + ", length=" + logLength);
         try {
           recoverFileLease(fs, logPath, conf);
-          parseHLog(log, editsByRegion, fs, conf);
-          processedLogs.add(logPath);
-        } catch (EOFException eof) {
-          // truncated files are expected if a RS crashes (see HBASE-2643)
-          LOG.info("EOF from hlog " + logPath + ". continuing");
+          parseHLog(log, entryBuffers, fs, conf);
           processedLogs.add(logPath);
         } catch (IOException e) {
           // If the IOE resulted from bad file format,
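Editor's note (not part of the patch): the rewritten javadoc above describes the new pipeline — a single reader appends edits into EntryBuffers, which blocks once a configured amount of heap is buffered, while writer threads drain per-region chunks. A compact, self-contained sketch of that back-pressure idea; the class, field names, and byte-array "edit" type are illustrative only (the real EntryBuffers appears later in this patch):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Minimal bounded buffer: the producer blocks once maxBytes is buffered. */
public class BackPressureSketch {
  private final Queue<byte[]> buffer = new ArrayDeque<byte[]>();
  private long buffered = 0;
  private final long maxBytes;

  public BackPressureSketch(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Called by the single reader thread; blocks while the buffer is full. */
  public synchronized void append(byte[] edit) throws InterruptedException {
    while (buffered > maxBytes) {
      wait(3000); // periodic wake-up, similar in spirit to the patch
    }
    buffer.add(edit);
    buffered += edit.length;
    notifyAll();
  }

  /** Called by writer threads; returns null when nothing is buffered. */
  public synchronized byte[] poll() {
    byte[] edit = buffer.poll();
    if (edit != null) {
      buffered -= edit.length;
      notifyAll(); // may unblock the reader
    }
    return edit;
  }

  public static void main(String[] args) throws Exception {
    final BackPressureSketch sink = new BackPressureSketch(1024);
    Thread writer = new Thread(new Runnable() {
      public void run() {
        for (int i = 0; i < 100; i++) {
          sink.poll(); // drain whatever the reader has buffered
        }
      }
    });
    writer.start();
    for (int i = 0; i < 100; i++) {
      sink.append(new byte[64]); // reader side; blocks when 1 KB is queued
    }
    writer.join();
  }
}
```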
@@ -283,94 +276,19 @@ public class HLogSplitter {
           }
         }
       }
-        writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
-      }
       if (fs.listStatus(srcDir).length > processedLogs.size()
           + corruptedLogs.size()) {
         throw new OrphanHLogAfterSplitException(
             "Discovered orphan hlog after split. Maybe the "
             + "HRegionServer was not dead when we started");
       }
-      archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
+      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
     } finally {
-      splits = new ArrayList<Path>(logWriters.size());
-      for (WriterAndPath wap : logWriters.values()) {
-        wap.w.close();
-        splits.add(wap.p);
-        LOG.debug("Closed " + wap.p);
-      }
+      splits = outputSink.finishWritingAndClose();
     }
     return splits;
   }

-  /**
-   * Takes splitLogsMap and concurrently writes them to region directories using a thread pool
-   *
-   * @param splitLogsMap map that contains the log splitting result indexed by region
-   * @param logWriters map that contains a writer per region
-   * @param rootDir hbase root dir
-   * @param fs
-   * @param conf
-   * @throws IOException
-   */
-  private void writeEditsBatchToRegions(
-      final Map<byte[], LinkedList<Entry>> splitLogsMap,
-      final Map<byte[], WriterAndPath> logWriters, final Path rootDir,
-      final FileSystem fs, final Configuration conf)
-  throws IOException {
-    // Number of threads to use when log splitting to rewrite the logs.
-    // More means faster but bigger mem consumption.
-    int logWriterThreads = conf.getInt(
-        "hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
-    HashMap<byte[], Future> writeFutureResult = new HashMap<byte[], Future>();
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("SplitWriter-%1$d");
-    ThreadFactory factory = builder.build();
-    ThreadPoolExecutor threadPool =
-      (ThreadPoolExecutor)Executors.newFixedThreadPool(logWriterThreads, factory);
-    for (final byte[] region : splitLogsMap.keySet()) {
-      Callable splitter = createNewSplitter(rootDir, logWriters, splitLogsMap,
-          region, fs, conf);
-      writeFutureResult.put(region, threadPool.submit(splitter));
-    }
-
-    threadPool.shutdown();
-    // Wait for all threads to terminate
-    try {
-      for (int j = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); j++) {
-        String message = "Waiting for hlog writers to terminate, elapsed " + j
-            * 5 + " seconds";
-        if (j < 30) {
-          LOG.debug(message);
-        } else {
-          LOG.info(message);
-        }
-
-      }
-    } catch (InterruptedException ex) {
-      LOG.warn("Hlog writers were interrupted, possible data loss!");
-      if (!skipErrors) {
-        throw new IOException("Could not finish writing log entries", ex);
-        // TODO maybe we should fail here regardless if skipErrors is active or not
-      }
-    }
-
-    for (Map.Entry<byte[], Future> entry : writeFutureResult.entrySet()) {
-      try {
-        entry.getValue().get();
-      } catch (ExecutionException e) {
-        throw (new IOException(e.getCause()));
-      } catch (InterruptedException e1) {
-        LOG.warn("Writer for region " + Bytes.toString(entry.getKey())
-            + " was interrupted, however the write process should have "
-            + "finished. Throwing up ", e1);
-        throw (new IOException(e1.getCause()));
-      }
-    }
-  }
-
   /**
    * Moves processed logs to a oldLogDir after successful processing Moves
    * corrupted logs (any log that couldn't be successfully parsed to corruptDir
@@ -383,7 +301,9 @@ public class HLogSplitter {
    * @param conf
    * @throws IOException
    */
-  private static void archiveLogs(final List<Path> corruptedLogs,
+  private static void archiveLogs(
+      final Path srcDir,
+      final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
@@ -411,6 +331,10 @@ public class HLogSplitter {
         LOG.info("Archived processed log " + p + " to " + newPath);
       }
     }
+
+    if (!fs.delete(srcDir, true)) {
+      throw new IOException("Unable to delete src dir: " + srcDir);
+    }
   }

   /**
@@ -460,7 +384,7 @@
    * @throws IOException if hlog is corrupted, or can't be open
    */
   private void parseHLog(final FileStatus logfile,
-      final Map<byte[], LinkedList<Entry>> splitLogsMap, final FileSystem fs,
+      EntryBuffers entryBuffers, final FileSystem fs,
       final Configuration conf)
   throws IOException {
     // Check for possibly empty file. With appends, currently Hadoop reports a
@@ -490,15 +414,11 @@ public class HLogSplitter {
     try {
       Entry entry;
       while ((entry = in.next()) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        LinkedList<Entry> queue = splitLogsMap.get(region);
-        if (queue == null) {
-          queue = new LinkedList<Entry>();
-          splitLogsMap.put(region, queue);
-        }
-        queue.addLast(entry);
+        entryBuffers.appendEntry(entry);
         editsCount++;
       }
+    } catch (InterruptedException ie) {
+      throw new RuntimeException(ie);
     } finally {
       LOG.debug("Pushed=" + editsCount + " entries from " + path);
       try {
@@ -506,37 +426,390 @@ public class HLogSplitter {
         in.close();
       }
     } catch (IOException e) {
-      LOG
-          .warn("Close log reader in finally threw exception -- continuing",
+      LOG.warn("Close log reader in finally threw exception -- continuing",
               e);
     }
   }
 }

-  private Callable<Void> createNewSplitter(final Path rootDir,
-      final Map<byte[], WriterAndPath> logWriters,
-      final Map<byte[], LinkedList<Entry>> logEntries, final byte[] region,
-      final FileSystem fs, final Configuration conf) {
-    return new Callable<Void>() {
-      public String getName() {
-        return "Split writer thread for region " + Bytes.toStringBinary(region);
-      }
-
-      @Override
-      public Void call() throws IOException {
-        LinkedList<Entry> entries = logEntries.get(region);
-        LOG.debug(this.getName() + " got " + entries.size() + " to process");
-        long threadTime = System.currentTimeMillis();
+  private void writerThreadError(Throwable t) {
+    thrown.compareAndSet(null, t);
+  }
+
+  /**
+   * Check for errors in the writer threads. If any is found, rethrow it.
+   */
+  private void checkForErrors() throws IOException {
+    Throwable thrown = this.thrown.get();
+    if (thrown == null) return;
+    if (thrown instanceof IOException) {
+      throw (IOException)thrown;
+    } else {
+      throw new RuntimeException(thrown);
+    }
+  }
+
+  /**
+   * Create a new {@link Writer} for writing log splits.
+   */
+  protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+    return HLog.createWriter(fs, logfile, conf);
+  }
+
+  /**
+   * Create a new {@link Reader} for reading logs to split.
+   */
+  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+      throws IOException {
+    return HLog.getReader(fs, curLogFile, conf);
+  }
+
+  /**
+   * Class which accumulates edits and separates them into a buffer per region
+   * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses
+   * a predefined threshold.
+   *
+   * Writer threads then pull region-specific buffers from this class.
+   */
+  class EntryBuffers {
+    Map<byte[], RegionEntryBuffer> buffers =
+      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);
+
+    /* Track which regions are currently in the middle of writing. We don't allow
+       an IO thread to pick up bytes from a region if we're already writing
+       data for that region in a different IO thread. */
+    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+    long totalBuffered = 0;
+    long maxHeapUsage;
+
+    EntryBuffers(long maxHeapUsage) {
+      this.maxHeapUsage = maxHeapUsage;
+    }
+
+    /**
+     * Append a log entry into the corresponding region buffer.
+     * Blocks if the total heap usage has crossed the specified threshold.
+     *
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    void appendEntry(Entry entry) throws InterruptedException, IOException {
+      HLogKey key = entry.getKey();
+
+      RegionEntryBuffer buffer;
+      synchronized (this) {
+        buffer = buffers.get(key.getEncodedRegionName());
+        if (buffer == null) {
+          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
+          buffers.put(key.getEncodedRegionName(), buffer);
+        }
+        long incrHeap = buffer.appendEntry(entry);
+        totalBuffered += incrHeap;
+      }
+
+      // If we crossed the chunk threshold, wait for more space to be available
+      synchronized (dataAvailable) {
+        while (totalBuffered > maxHeapUsage && thrown == null) {
+          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
+          dataAvailable.wait(3000);
+        }
+        dataAvailable.notifyAll();
+      }
+      checkForErrors();
+    }
+
+    synchronized RegionEntryBuffer getChunkToWrite() {
+      long biggestSize=0;
+      byte[] biggestBufferKey=null;
+
+      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
+        long size = entry.getValue().heapSize();
+        if (size > biggestSize && !currentlyWriting.contains(entry.getKey())) {
+          biggestSize = size;
+          biggestBufferKey = entry.getKey();
+        }
+      }
+      if (biggestBufferKey == null) {
+        return null;
+      }
+
+      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
+      currentlyWriting.add(biggestBufferKey);
+      return buffer;
+    }
+
+    void doneWriting(RegionEntryBuffer buffer) {
+      synchronized (this) {
+        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
+        assert removed;
+      }
+      long size = buffer.heapSize();
+
+      synchronized (dataAvailable) {
+        totalBuffered -= size;
+        // We may unblock writers
+        dataAvailable.notifyAll();
+      }
+    }
+
+    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
+      return currentlyWriting.contains(region);
+    }
+  }
+
+  /**
+   * A buffer of some number of edits for a given region.
+   * This accumulates edits and also provides a memory optimization in order to
+   * share a single byte array instance for the table and region name.
+   * Also tracks memory usage of the accumulated edits.
+   */
+  static class RegionEntryBuffer implements HeapSize {
+    long heapInBuffer = 0;
+    List<Entry> entryBuffer;
+    byte[] tableName;
+    byte[] encodedRegionName;
+
+    RegionEntryBuffer(byte[] table, byte[] region) {
+      this.tableName = table;
+      this.encodedRegionName = region;
+      this.entryBuffer = new LinkedList<Entry>();
+    }
+
+    long appendEntry(Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+      long incrHeap = entry.getEdit().heapSize() +
+        ClassSize.align(2 * ClassSize.REFERENCE) + // HLogKey pointers
+        0; // TODO linkedlist entry
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(Entry entry) {
+      HLogKey k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    public long heapSize() {
+      return heapInBuffer;
+    }
+  }
+
+  class WriterThread extends Thread {
+    private volatile boolean shouldStop = false;
+
+    WriterThread(int i) {
+      super("WriterThread-" + i);
+    }
+
+    public void run() {
+      try {
+        doRun();
+      } catch (Throwable t) {
+        LOG.error("Error in log splitting write thread", t);
+        writerThreadError(t);
+      }
+    }
+
+    private void doRun() throws IOException {
+      LOG.debug("Writer thread " + this + ": starting");
+      while (true) {
+        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        if (buffer == null) {
+          // No data currently available, wait on some more to show up
+          synchronized (dataAvailable) {
+            if (shouldStop) return;
+            try {
+              dataAvailable.wait(1000);
+            } catch (InterruptedException ie) {
+              if (!shouldStop) {
+                throw new RuntimeException(ie);
+              }
+            }
+          }
+          continue;
+        }
+
+        assert buffer != null;
+        try {
+          writeBuffer(buffer);
+        } finally {
+          entryBuffers.doneWriting(buffer);
+        }
+      }
+    }
+
+    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
+      List<Entry> entries = buffer.entryBuffer;
+      if (entries.isEmpty()) {
+        LOG.warn(this.getName() + " got an empty buffer, skipping");
+        return;
+      }
+
+      WriterAndPath wap = null;
+
+      long startTime = System.nanoTime();
       try {
         int editsCount = 0;
-        WriterAndPath wap = logWriters.get(region);
         for (Entry logEntry : entries) {
           if (wap == null) {
-            Path regionedits = getRegionSplitEditsPath(fs, logEntry, rootDir);
-            if (regionedits == null) {
-              // we already print a message if it's null in getRegionSplitEditsPath
-              break;
+            wap = outputSink.getWriterAndPath(logEntry);
+            if (wap == null) {
+              // getWriterAndPath decided we don't need to write these edits
+              // Message was already logged
+              return;
             }
+          }
+          wap.w.append(logEntry);
+          editsCount++;
+        }
+        // Pass along summary statistics
+        wap.incrementEdits(editsCount);
+        wap.incrementNanoTime(System.nanoTime() - startTime);
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.fatal(this.getName() + " Got while writing log entry to log", e);
+        throw e;
+      }
+    }
+
+    void finish() {
+      shouldStop = true;
+    }
+  }
+
+  /**
+   * Class that manages the output streams from the log splitting process.
+   */
+  class OutputSink {
+    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
+        new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final List<WriterThread> writerThreads = Lists.newArrayList();
+
+    /* Set of regions which we've decided should not output edits */
+    private final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(
+        new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
+
+    private boolean hasClosed = false;
+
+    /**
+     * Start the threads that will pump data from the entryBuffers
+     * to the output files.
+     * @return the list of started threads
+     */
+    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
+      // More threads could potentially write faster at the expense
+      // of causing more disk seeks as the logs are split.
+      // 3. After a certain setting (probably around 3) the
+      // process will be bound on the reader in the current
+      // implementation anyway.
+      int numThreads = conf.getInt(
+          "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+
+      for (int i = 0; i < numThreads; i++) {
+        WriterThread t = new WriterThread(i);
+        t.start();
+        writerThreads.add(t);
+      }
+    }
+
+    List<Path> finishWritingAndClose() throws IOException {
+      LOG.info("Waiting for split writer threads to finish");
+      for (WriterThread t : writerThreads) {
+        t.finish();
+      }
+      for (WriterThread t: writerThreads) {
+        try {
+          t.join();
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+        checkForErrors();
+      }
+      LOG.info("Split writers finished");
+
+      return closeStreams();
+    }
+
+    /**
+     * Close all of the output streams.
+     * @return the list of paths written.
+     */
+    private List<Path> closeStreams() throws IOException {
+      Preconditions.checkState(!hasClosed);
+
+      List<Path> paths = new ArrayList<Path>();
+      List<IOException> thrown = Lists.newArrayList();
+
+      for (WriterAndPath wap : logWriters.values()) {
+        try {
+          wap.w.close();
+        } catch (IOException ioe) {
+          LOG.error("Couldn't close log at " + wap.p, ioe);
+          thrown.add(ioe);
+          continue;
+        }
+        paths.add(wap.p);
+        LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+            + (wap.nanosSpent / 1000/ 1000) + "ms)");
+      }
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+
+      hasClosed = true;
+      return paths;
+    }
+
+    /**
+     * Get a writer and path for a log starting at the given entry.
+     *
+     * This function is threadsafe so long as multiple threads are always
+     * acting on different regions.
+     *
+     * @return null if this region shouldn't output any logs
+     */
+    WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+
+      byte region[] = entry.getKey().getEncodedRegionName();
+      WriterAndPath ret = logWriters.get(region);
+      if (ret != null) {
+        return ret;
+      }
+
+      // If we already decided that this region doesn't get any output
+      // we don't need to check again.
+      if (blacklistedRegions.contains(region)) {
+        return null;
+      }
+
+      // Need to create writer
+      Path regionedits = getRegionSplitEditsPath(fs,
+          entry, rootDir);
+      if (regionedits == null) {
+        // Edits dir doesn't exist
+        blacklistedRegions.add(region);
+        return null;
+      }
+      deletePreexistingOldEdits(regionedits);
+      Writer w = createWriter(fs, regionedits, conf);
+      ret = new WriterAndPath(regionedits, w);
+      logWriters.put(region, ret);
+      LOG.debug("Creating writer path=" + regionedits + " region="
+          + Bytes.toStringBinary(region));
+
+      return ret;
+    }
+
+    /**
+     * If the specified path exists, issue a warning and delete it.
+     */
+    private void deletePreexistingOldEdits(Path regionedits) throws IOException {
       if (fs.exists(regionedits)) {
         LOG.warn("Found existing old edits file. It could be the "
             + "result of a previous failed split attempt. Deleting "
@@ -546,54 +819,49 @@ public class HLogSplitter {
           LOG.warn("Failed delete of old " + regionedits);
         }
       }
-      Writer w = createWriter(fs, regionedits, conf);
-      wap = new WriterAndPath(regionedits, w);
-      logWriters.put(region, wap);
-      LOG.debug("Creating writer path=" + regionedits + " region="
-          + Bytes.toStringBinary(region));
-    }
-    wap.w.append(logEntry);
-    editsCount++;
-  }
-  LOG.debug(this.getName() + " Applied " + editsCount
-      + " total edits to " + Bytes.toStringBinary(region) + " in "
-      + (System.currentTimeMillis() - threadTime) + "ms");
-  } catch (IOException e) {
-    e = RemoteExceptionHandler.checkIOException(e);
-    LOG.fatal(this.getName() + " Got while writing log entry to log", e);
-    throw e;
-  }
-  return null;
-  }
-  };
     }

     /**
-     * Create a new {@link Writer} for writing log splits.
-     *
-     * @param fs
-     * @param logfile
-     * @param conf
-     * @return A new Writer instance
-     * @throws IOException
+     * @return a map from encoded region ID to the number of edits written out
+     * for that region.
      */
-    protected Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
-        throws IOException {
-      return HLog.createWriter(fs, logfile, conf);
+    private Map<byte[], Long> getOutputCounts() {
+      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      synchronized (logWriters) {
+        for (Map.Entry<byte[], WriterAndPath> entry : logWriters.entrySet()) {
+          ret.put(entry.getKey(), entry.getValue().editsWritten);
+        }
+      }
+      return ret;
     }
   }

   /**
-   * Create a new {@link Reader} for reading logs to split.
-   *
-   * @param fs
-   * @param curLogFile
-   * @param conf
-   * @return A new Reader instance
-   * @throws IOException
+   * Private data structure that wraps a Writer and its Path,
+   * also collecting statistics about the data written to this
+   * output.
    */
-  protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
-      throws IOException {
-    return HLog.getReader(fs, curLogFile, conf);
+  private final static class WriterAndPath {
+    final Path p;
+    final Writer w;
+
+    /* Count of edits written to this path */
+    long editsWritten = 0;
+    /* Number of nanos spent writing to this log */
+    long nanosSpent = 0;
+
+    WriterAndPath(final Path p, final Writer w) {
+      this.p = p;
+      this.w = w;
+    }
+
+    void incrementEdits(int edits) {
+      editsWritten += edits;
+    }
+
+    void incrementNanoTime(long nanos) {
+      nanosSpent += nanos;
+    }
   }
 }
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;

+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -66,7 +67,7 @@ import org.apache.hadoop.io.Writable;
  * is an old style KeyValue or the new style WALEdit.
  *
  */
-public class WALEdit implements Writable {
+public class WALEdit implements Writable, HeapSize {

   private final int VERSION_2 = -1;

@@ -154,7 +155,19 @@ public class WALEdit implements Writable {
         out.writeInt(scopes.get(key));
       }
     }
+  }
+
+  public long heapSize() {
+    long ret = 0;
+    for (KeyValue kv : kvs) {
+      ret += kv.heapSize();
+    }
+    if (scopes != null) {
+      ret += ClassSize.TREEMAP;
+      ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
+      // TODO this isn't quite right, need help here
+    }
+    return ret;
   }

   public String toString() {
@@ -164,10 +164,10 @@ public class TestHLog {
         log.rollWriter();
       }
       log.close();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, logdir, this.oldLogDir, this.fs);
       List<Path> splits =
-        logSplitter.splitLog(hbaseDir, logdir,
-            this.oldLogDir, this.fs, conf);
+        logSplitter.splitLog();
       verifySplits(splits, howmany);
       log = null;
     } finally {
@@ -24,16 +24,28 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.util.NavigableSet;

+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.EntryBuffers;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.RegionEntryBuffer;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
+import static org.mockito.Mockito.mock;

 /**
  * Simple testing of a few HLog methods.
  */
 public class TestHLogMethods {
+  private static final byte[] TEST_REGION = Bytes.toBytes("test_region");;
+  private static final byte[] TEST_TABLE = Bytes.toBytes("test_table");
+
   private final HBaseTestingUtility util = new HBaseTestingUtility();

   /**
@@ -84,4 +96,71 @@ public class TestHLogMethods {
     FSDataOutputStream fdos = fs.create(new Path(testdir, name), true);
     fdos.close();
   }
+
+  @Test
+  public void testRegionEntryBuffer() throws Exception {
+    HLogSplitter.RegionEntryBuffer reb = new HLogSplitter.RegionEntryBuffer(
+        TEST_TABLE, TEST_REGION);
+    assertEquals(0, reb.heapSize());
+
+    reb.appendEntry(createTestLogEntry(1));
+    assertTrue(reb.heapSize() > 0);
+  }
+
+  @Test
+  public void testEntrySink() throws Exception {
+    Configuration conf = new Configuration();
+    HLogSplitter splitter = HLogSplitter.createLogSplitter(
+        conf, mock(Path.class), mock(Path.class), mock(Path.class),
+        mock(FileSystem.class));
+
+    EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
+    for (int i = 0; i < 1000; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+
+    assertTrue(sink.totalBuffered > 0);
+    long amountInChunk = sink.totalBuffered;
+    // Get a chunk
+    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    assertEquals(chunk.heapSize(), amountInChunk);
+
+    // Make sure it got marked that a thread is "working on this"
+    assertTrue(sink.isRegionCurrentlyWriting(TEST_REGION));
+
+    // Insert some more entries
+    for (int i = 0; i < 500; i++) {
+      HLog.Entry entry = createTestLogEntry(i);
+      sink.appendEntry(entry);
+    }
+    // Asking for another chunk shouldn't work since the first one
+    // is still writing
+    assertNull(sink.getChunkToWrite());
+
+    // If we say we're done writing the first chunk, then we should be able
+    // to get the second
+    sink.doneWriting(chunk);
+
+    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    assertNotNull(chunk2);
+    assertNotSame(chunk, chunk2);
+    long amountInChunk2 = sink.totalBuffered;
+    // The second chunk had fewer rows than the first
+    assertTrue(amountInChunk2 < amountInChunk);
+
+    sink.doneWriting(chunk2);
+    assertEquals(0, sink.totalBuffered);
+  }
+
+  private HLog.Entry createTestLogEntry(int i) {
+    long seq = i;
+    long now = i * 1000;
+
+    WALEdit edit = new WALEdit();
+    edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
+    HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now);
+    HLog.Entry entry = new HLog.Entry(key, edit);
+    return entry;
+  }
 }
|
|
@ -19,16 +19,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
@ -39,7 +37,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@@ -52,9 +52,16 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+
 /**
  * Testing {@link HLog} splitting code.
@@ -119,11 +126,15 @@ public class TestHLogSplit {
 
   @Before
   public void setUp() throws Exception {
+    flushToConsole("Cleaning up cluster for new test\n"
+        + "--------------------------");
     conf = TEST_UTIL.getConfiguration();
     fs = TEST_UTIL.getDFSCluster().getFileSystem();
     FileStatus[] entries = fs.listStatus(new Path("/"));
+    flushToConsole("Num entries in /:" + entries.length);
     for (FileStatus dir : entries){
-      fs.delete(dir.getPath(), true);
+      assertTrue("Deleting " + dir.getPath(),
+          fs.delete(dir.getPath(), true));
     }
     seq = 0;
     regions = new ArrayList<String>();
@@ -161,18 +172,23 @@
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
     AtomicBoolean stop = new AtomicBoolean(false);
 
+    FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
+    assertTrue("Previous test should clean up table dir",
+        stats == null || stats.length == 0);
+
     generateHLogs(-1);
-    fs.initialize(fs.getUri(), conf);
     try {
       (new ZombieNewLogWriterRegionServer(stop)).start();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
+      logSplitter.splitLog();
     } finally {
       stop.set(true);
     }
   }
 
+
   @Test
   public void testSplitPreservesEdits() throws IOException{
     final String REGION = "region__1";
@@ -181,8 +197,9 @@
 
     generateHLogs(1, 10, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -202,8 +219,9 @@
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
 
     for (String region : regions) {
@@ -224,8 +242,9 @@
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -240,8 +259,9 @@
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -260,8 +280,9 @@
         Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
@@ -278,8 +299,9 @@
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
       assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
@@ -296,8 +318,9 @@
     corruptHLog(new Path(hlogDir, HLOG_FILE_PREFIX + "5"),
         Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -323,13 +346,13 @@
     Path c1 = new Path(hlogDir, HLOG_FILE_PREFIX + "0");
     conf.setClass("hbase.regionserver.hlog.reader.impl",
         FaultySequenceFileLogReader.class, HLog.Reader.class);
-    String[] failureTypes = { "begin", "middle", "end" };
     for (FaultySequenceFileLogReader.FailureType failureType : FaultySequenceFileLogReader.FailureType.values()) {
       conf.set("faultysequencefilelogreader.failuretype", failureType.name());
       generateHLogs(1, ENTRIES, -1);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
+      logSplitter.splitLog();
       FileStatus[] archivedLogs = fs.listStatus(corruptDir);
       assertEquals("expected a different file", c1.getName(), archivedLogs[0]
           .getPath().getName());
@@ -358,8 +381,9 @@
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(Integer.MAX_VALUE);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
+      logSplitter.splitLog();
     } finally {
       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
           Reader.class);
@@ -383,9 +407,10 @@
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(-1);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
       try {
-        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        logSplitter.splitLog();
       } catch (IOException e) {
         assertEquals(
             "if skip.errors is false all files should remain in place",
@@ -413,8 +438,9 @@
     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
     Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
@@ -437,8 +463,9 @@
     generateHLogs(-1);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     FileStatus[] archivedLogs = fs.listStatus(oldLogDir);
 
@@ -449,8 +476,9 @@
   public void testSplit() throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     for (String region : regions) {
       Path logfile = getLogForRegion(hbaseDir, TABLE_NAME, region);
@@ -464,12 +492,16 @@
       throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(hlogDir);
-      assertNull(statuses);
+      if (statuses != null) {
+        Assert.fail("Files left in log dir: " +
+            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
+      }
     } catch (FileNotFoundException e) {
       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
     }
@@ -516,8 +548,9 @@
     try {
       zombie.start();
       try {
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-        logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+            hbaseDir, hlogDir, oldLogDir, fs);
+        logSplitter.splitLog();
       } catch (IOException ex) {/* expected */}
       int logFilesNumber = fs.listStatus(hlogDir).length;
 
@@ -549,11 +582,12 @@
 
     try {
       InstrumentedSequenceFileLogWriter.activateFailure = true;
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-      logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+          hbaseDir, hlogDir, oldLogDir, fs);
+      logSplitter.splitLog();
 
     } catch (IOException e) {
-      assertEquals("java.io.IOException: This exception is instrumented and should only be thrown for testing", e.getMessage());
+      assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
       throw e;
     } finally {
       InstrumentedSequenceFileLogWriter.activateFailure = false;
@@ -561,7 +595,10 @@
   }
 
 
-  // @Test
+  // @Test TODO this test has been disabled since it was created!
+  // It currently fails because the second split doesn't output anything
+  // -- because there are no region dirs after we move aside the first
+  // split result
   public void testSplittingLargeNumberOfRegionsConsistency() throws IOException {
 
     regions.removeAll(regions);
@@ -572,8 +609,9 @@
     generateHLogs(1, 100, -1);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
     fs.rename(oldLogDir, hlogDir);
     Path firstSplitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME) + ".first");
     Path splitPath = new Path(hbaseDir, Bytes.toString(TABLE_NAME));
@@ -582,7 +620,9 @@
 
 
     fs.initialize(fs.getUri(), conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
@@ -600,12 +640,162 @@
     Path regiondir = new Path(tabledir, region);
     fs.delete(regiondir, true);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf);
-    logSplitter.splitLog(hbaseDir, hlogDir, oldLogDir, fs, conf);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
+        hbaseDir, hlogDir, oldLogDir, fs);
+    logSplitter.splitLog();
 
     assertFalse(fs.exists(regiondir));
   }
 
+  @Test
+  public void testIOEOnOutputThread() throws Exception {
+    conf.setBoolean(HBASE_SKIP_ERRORS, false);
+
+    generateHLogs(-1);
+
+    fs.initialize(fs.getUri(), conf);
+    // Set up a splitter that will throw an IOE on the output side
+    HLogSplitter logSplitter = new HLogSplitter(
+        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+        Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        return mockWriter;
+
+      }
+    };
+    try {
+      logSplitter.splitLog();
+      fail("Didn't throw!");
+    } catch (IOException ioe) {
+      assertTrue(ioe.toString().contains("Injected"));
+    }
+  }
+
+  /**
+   * Test log split process with fake data and lots of edits to trigger threading
+   * issues.
+   */
+  @Test
+  public void testThreading() throws Exception {
+    doTestThreading(20000, 128*1024*1024, 0);
+  }
+
+  /**
+   * Test blocking behavior of the log split process if writers are writing slower
+   * than the reader is reading.
+   */
+  @Test
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    doTestThreading(200, 1024, 50);
+  }
+
+  /**
+   * Sets up a log splitter with a mock reader and writer. The mock reader generates
+   * a specified number of edits spread across 5 regions. The mock writer optionally
+   * sleeps for each edit it is fed.
+   * *
+   * After the split is complete, verifies that the statistics show the correct number
+   * of edits output into each region.
+   *
+   * @param numFakeEdits number of fake edits to push through pipeline
+   * @param bufferSize size of in-memory buffer
+   * @param writerSlowness writer threads will sleep this many ms per edit
+   */
+  private void doTestThreading(final int numFakeEdits,
+      final int bufferSize,
+      final int writerSlowness) throws Exception {
+
+    Configuration localConf = new Configuration(conf);
+    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
+
+    // Create a fake log file (we'll override the reader to produce a stream of edits)
+    FSDataOutputStream out = fs.create(new Path(hlogDir, HLOG_FILE_PREFIX + ".fake"));
+    out.close();
+
+    // Make region dirs for our destination regions so the output doesn't get skipped
+    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
+    makeRegionDirs(fs, regions);
+
+    // Create a splitter that reads and writes the data without touching disk
+    HLogSplitter logSplitter = new HLogSplitter(
+        localConf, hbaseDir, hlogDir, oldLogDir, fs) {
+
+      /* Produce a mock writer that doesn't write anywhere */
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+      throws IOException {
+        HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
+        Mockito.doAnswer(new Answer<Void>() {
+          int expectedIndex = 0;
+
+          @Override
+          public Void answer(InvocationOnMock invocation) {
+            if (writerSlowness > 0) {
+              try {
+                Thread.sleep(writerSlowness);
+              } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+              }
+            }
+            HLog.Entry entry = (Entry) invocation.getArguments()[0];
+            WALEdit edit = entry.getEdit();
+            List<KeyValue> keyValues = edit.getKeyValues();
+            assertEquals(1, keyValues.size());
+            KeyValue kv = keyValues.get(0);
+
+            // Check that the edits come in the right order.
+            assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
+            expectedIndex++;
+            return null;
+          }
+        }).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        return mockWriter;
+      }
+
+
+      /* Produce a mock reader that generates fake entries */
+      protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf)
+      throws IOException {
+        Reader mockReader = Mockito.mock(Reader.class);
+        Mockito.doAnswer(new Answer<HLog.Entry>() {
+          int index = 0;
+
+          @Override
+          public HLog.Entry answer(InvocationOnMock invocation) throws Throwable {
+            if (index >= numFakeEdits) return null;
+
+            // Generate r0 through r4 in round robin fashion
+            int regionIdx = index % regions.size();
+            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
+
+            HLog.Entry ret = createTestEntry(TABLE_NAME, region,
+                Bytes.toBytes((int)(index / regions.size())),
+                FAMILY, QUALIFIER, VALUE, index);
+            index++;
+            return ret;
+          }
+        }).when(mockReader).next();
+        return mockReader;
+      }
+    };
+
+    logSplitter.splitLog();
+
+    // Verify number of written edits per region
+
+    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
+    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
+      LOG.info("Got " + entry.getValue() + " output edits for region " +
+          Bytes.toString(entry.getKey()));
+
+      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
+    }
+    assertEquals(regions.size(), outputCounts.size());
+  }
+
+
 
   /**
    * This thread will keep writing to the file after the split process has started
   * It simulates a region server that was considered dead but woke up and wrote
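The threading tests above never touch real log files; the splitter's reader and writer are replaced with Mockito mocks whose doAnswer callbacks synthesize and check edits on the fly. As a point of reference, this is the general Answer-based stubbing pattern being relied on, sketched here against a toy interface rather than the HBase classes:

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class AnswerStubSketch {
  /** Toy stand-in for a log reader: each call hands back the next record, or null at EOF. */
  interface RecordSource {
    String next();
  }

  public static void main(String[] args) {
    final int numFakeRecords = 5;
    RecordSource source = mock(RecordSource.class);

    // Each next() call computes its return value on the fly, so the mock can
    // emit a bounded stream of synthetic records without any backing file.
    doAnswer(new Answer<String>() {
      int index = 0;

      @Override
      public String answer(InvocationOnMock invocation) {
        if (index >= numFakeRecords) {
          return null; // simulate end-of-log
        }
        return "record-" + (index++);
      }
    }).when(source).next();

    // Consume the synthetic stream exactly as production code would.
    for (String rec = source.next(); rec != null; rec = source.next()) {
      System.out.println(rec);
    }
  }
}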
@@ -677,29 +867,19 @@
       if (stop.get()) {
         return;
       }
-      boolean splitStarted = false;
-      Path p = new Path(hbaseDir, new String(TABLE_NAME));
-      while (!splitStarted) {
-        try {
-          FileStatus [] statuses = fs.listStatus(p);
-          // In 0.20, listStatus comes back with a null if file doesn't exit.
-          // In 0.21, it throws FNFE.
-          if (statuses != null && statuses.length > 0) {
-            // Done.
-            break;
-          }
-        } catch (FileNotFoundException e) {
-          // Expected in hadoop 0.21
-        } catch (IOException e1) {
-          assertTrue("Failed to list status ", false);
-        }
-        flushToConsole("Juliet: split not started, sleeping a bit...");
-        Threads.sleep(100);
-      }
+      Path tableDir = new Path(hbaseDir, new String(TABLE_NAME));
+      Path regionDir = new Path(tableDir, regions.get(0));
+      Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
       String region = "juliet";
       Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
       try {
-        fs.mkdirs(new Path(new Path(hbaseDir, region), region));
+        while (!fs.exists(recoveredEdits) && !stop.get()) {
+          flushToConsole("Juliet: split not started, sleeping a bit...");
+          Threads.sleep(10);
+        }
+
+        fs.mkdirs(new Path(tableDir, region));
         HLog.Writer writer = HLog.createWriter(fs,
             julietLog, conf);
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
@@ -722,10 +902,15 @@
     generateHLogs(NUM_WRITERS, ENTRIES, leaveOpen);
   }
 
-  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+  private void makeRegionDirs(FileSystem fs, List<String> regions) throws IOException {
     for (String region : regions) {
+      flushToConsole("Creating dir for region " + region);
      fs.mkdirs(new Path(tabledir, region));
     }
+  }
+
+  private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException {
+    makeRegionDirs(fs, regions);
     for (int i = 0; i < writers; i++) {
       writer[i] = HLog.createWriter(fs, new Path(hlogDir, HLOG_FILE_PREFIX + i), conf);
       for (int j = 0; j < entries; j++) {
@@ -835,14 +1020,20 @@
       byte[] value, long seq)
   throws IOException {
 
+    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
+    writer.sync();
+    return seq;
+  }
+
+  private HLog.Entry createTestEntry(
+      byte[] table, byte[] region,
+      byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, long seq) {
     long time = System.nanoTime();
     WALEdit edit = new WALEdit();
     seq++;
     edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
-    writer.append(new HLog.Entry(new HLogKey(region, table, seq, time), edit));
-    writer.sync();
-    return seq;
+    return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
 
   }
@@ -864,6 +1055,14 @@
   private int compareHLogSplitDirs(Path p1, Path p2) throws IOException {
     FileStatus[] f1 = fs.listStatus(p1);
     FileStatus[] f2 = fs.listStatus(p2);
+    assertNotNull("Path " + p1 + " doesn't exist", f1);
+    assertNotNull("Path " + p2 + " doesn't exist", f2);
+
+    System.out.println("Files in " + p1 + ": " +
+        Joiner.on(",").join(FileUtil.stat2Paths(f1)));
+    System.out.println("Files in " + p2 + ": " +
+        Joiner.on(",").join(FileUtil.stat2Paths(f2)));
+    assertEquals(f1.length, f2.length);
 
     for (int i = 0; i < f1.length; i++) {
       // Regions now have a directory named RECOVERED_EDITS_DIR and in here
@@ -487,9 +487,9 @@ public class TestWALReplay {
    */
   private Path runWALSplit(final Configuration c) throws IOException {
     FileSystem fs = FileSystem.get(c);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c);
-    List<Path> splits = logSplitter.splitLog(this.hbaseRootDir, this.logDir,
-      this.oldLogDir, fs, c);
+    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
+      this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
+    List<Path> splits = logSplitter.splitLog();
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
     // Make sure the file exists
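Across the call sites changed above, the same refactoring shape repeats: the root, source, and archive directories plus the FileSystem, formerly passed to every splitLog(...) call, now travel with the splitter instance, leaving a no-argument splitLog(). A rough, purely illustrative sketch of that shape (invented names and stand-in types, not the real HLogSplitter):

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/** Illustrative shape of the refactoring: per-call parameters become constructor state. */
class SplitterSketch {
  // Before: every caller re-supplied the same context on each call, e.g.
  // List<Path> splitLog(Path rootDir, Path srcDir, Path oldLogDir, FileSystem fs, Configuration conf)

  private final String rootDir;    // stand-ins for the Path/FileSystem/Configuration fields
  private final String srcDir;
  private final String oldLogDir;

  SplitterSketch(String rootDir, String srcDir, String oldLogDir) {
    this.rootDir = rootDir;
    this.srcDir = srcDir;
    this.oldLogDir = oldLogDir;
  }

  /** After: the instance carries its context, so a retry is simply another splitLog() call. */
  List<String> splitLog() throws IOException {
    // ... read logs under srcDir, write per-region output under rootDir, archive into oldLogDir ...
    return Collections.emptyList();
  }
}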