HBASE-1484 commit log split writes files with newest edits first (since hbase-1430); should be other way round
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@781905 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4346eec59d
commit
c72bd42fb4
|
@ -86,9 +86,7 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1310 Off by one error in Bytes.vintToBytes
|
||||
HBASE-1202 getRow does not always work when specifying number of versions
|
||||
HBASE-1324 hbase-1234 broke testget2 unit test (and broke the build)
|
||||
HBASE-1321 hbase-1234 broke TestCompaction; fix and reenable
|
||||
HBASE-1330 binary keys broken on trunk (Ryan Rawson via Stack)
|
||||
HBASE-1332 regionserver carrying .META. starts sucking all cpu, drives load
|
||||
HBASE-1321 hbase-1234 broke TestCompaction; fix and reenable HBASE-1330 binary keys broken on trunk (Ryan Rawson via Stack) HBASE-1332 regionserver carrying .META. starts sucking all cpu, drives load
|
||||
up - infinite loop? (Ryan Rawson via Stack)
|
||||
HBASE-1334 .META. region running into hfile errors (Ryan Rawson via Stack)
|
||||
HBASE-1338 lost use of compaction.dir; we were compacting into live store
|
||||
|
@ -163,6 +161,8 @@ Release 0.20.0 - Unreleased
|
|||
HBASE-1471 During cluster shutdown, deleting zookeeper regionserver nodes
|
||||
causes exceptions
|
||||
HBASE-1483 HLog split loses track of edits (Clint Morgan via Stack)
|
||||
HBASE-1484 commit log split writes files with newest edits first
|
||||
(since hbase-1430); should be other way round
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
|
||||
|
|
|
@ -23,9 +23,11 @@ import java.io.EOFException;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
@ -96,7 +98,7 @@ import org.apache.hadoop.io.compress.DefaultCodec;
|
|||
*
|
||||
*/
|
||||
public class HLog implements HConstants, Syncable {
|
||||
private static final Log LOG = LogFactory.getLog(HLog.class);
|
||||
static final Log LOG = LogFactory.getLog(HLog.class);
|
||||
private static final String HLOG_DATFILE = "hlog.dat.";
|
||||
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
|
||||
static final byte [] METAROW = Bytes.toBytes("METAROW");
|
||||
|
@ -224,7 +226,7 @@ public class HLog implements HConstants, Syncable {
|
|||
* @param c Configuration to use.
|
||||
* @return the kind of compression to use
|
||||
*/
|
||||
private static CompressionType getCompressionType(final Configuration c) {
|
||||
static CompressionType getCompressionType(final Configuration c) {
|
||||
// Compression makes no sense for commit log. Always return NONE.
|
||||
return CompressionType.NONE;
|
||||
}
|
||||
|
@ -551,6 +553,7 @@ public class HLog implements HConstants, Syncable {
|
|||
for (KeyValue kv: edits) {
|
||||
HLogKey logKey =
|
||||
new HLogKey(regionName, tableName, seqNum[counter++], now);
|
||||
System.out.println("REMOVE " + logKey);
|
||||
doWrite(logKey, kv, sync, now);
|
||||
this.numEntries.incrementAndGet();
|
||||
}
|
||||
|
@ -733,22 +736,23 @@ public class HLog implements HConstants, Syncable {
|
|||
* @param conf HBaseConfiguration
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void splitLog(final Path rootDir, final Path srcDir,
|
||||
public static List<Path> splitLog(final Path rootDir, final Path srcDir,
|
||||
final FileSystem fs, final Configuration conf)
|
||||
throws IOException {
|
||||
long millis = System.currentTimeMillis();
|
||||
List<Path> splits = null;
|
||||
if (!fs.exists(srcDir)) {
|
||||
// Nothing to do
|
||||
return;
|
||||
return splits;
|
||||
}
|
||||
FileStatus [] logfiles = fs.listStatus(srcDir);
|
||||
if (logfiles == null || logfiles.length == 0) {
|
||||
// Nothing to do
|
||||
return;
|
||||
return splits;
|
||||
}
|
||||
LOG.info("Splitting " + logfiles.length + " hlog(s) in " +
|
||||
srcDir.toString());
|
||||
splitLog(rootDir, logfiles, fs, conf);
|
||||
splits = splitLog(rootDir, logfiles, fs, conf);
|
||||
try {
|
||||
fs.delete(srcDir, true);
|
||||
} catch (IOException e) {
|
||||
|
@ -760,6 +764,17 @@ public class HLog implements HConstants, Syncable {
|
|||
long endMillis = System.currentTimeMillis();
|
||||
LOG.info("hlog file splitting completed in " + (endMillis - millis) +
|
||||
" millis for " + srcDir.toString());
|
||||
return splits;
|
||||
}
|
||||
|
||||
// Private immutable datastructure to hold Writer and its Path.
|
||||
private final static class WriterAndPath {
|
||||
final Path p;
|
||||
final SequenceFile.Writer w;
|
||||
WriterAndPath(final Path p, final SequenceFile.Writer w) {
|
||||
this.p = p;
|
||||
this.w = w;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -768,13 +783,14 @@ public class HLog implements HConstants, Syncable {
|
|||
* @param fs
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
* @return List of splits made.
|
||||
*/
|
||||
private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
|
||||
final FileSystem fs, final Configuration conf)
|
||||
private static List<Path> splitLog(final Path rootDir,
|
||||
final FileStatus [] logfiles, final FileSystem fs, final Configuration conf)
|
||||
throws IOException {
|
||||
final Map<byte [], SequenceFile.Writer> logWriters =
|
||||
new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
final Map<byte [], WriterAndPath> logWriters =
|
||||
new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR);
|
||||
List<Path> splits = null;
|
||||
try {
|
||||
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
|
||||
DEFAULT_NUMBER_CONCURRENT_LOG_READS)).intValue();
|
||||
|
@ -796,7 +812,7 @@ public class HLog implements HConstants, Syncable {
|
|||
// HADOOP-4751 is committed.
|
||||
long length = logfiles[i].getLen();
|
||||
SequenceFile.Reader in = null;
|
||||
int count = 0;
|
||||
int count = 0;
|
||||
try {
|
||||
in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
|
||||
try {
|
||||
|
@ -810,7 +826,9 @@ public class HLog implements HConstants, Syncable {
|
|||
LOG.debug("Adding queue for " + Bytes.toString(regionName));
|
||||
logEntries.put(regionName, queue);
|
||||
}
|
||||
queue.push(new HLogEntry(val, key));
|
||||
HLogEntry hle = new HLogEntry(val, key);
|
||||
System.out.println("REMOVE !! " + hle);
|
||||
queue.push(hle);
|
||||
count++;
|
||||
// Make the key and value new each time; otherwise same instance
|
||||
// is used over and over.
|
||||
|
@ -853,7 +871,6 @@ public class HLog implements HConstants, Syncable {
|
|||
ExecutorService threadPool =
|
||||
Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
|
||||
for (final byte[] key : logEntries.keySet()) {
|
||||
|
||||
Thread thread = new Thread(Bytes.toString(key)) {
|
||||
public void run() {
|
||||
LinkedList<HLogEntry> entries = logEntries.get(key);
|
||||
|
@ -861,9 +878,14 @@ public class HLog implements HConstants, Syncable {
|
|||
long threadTime = System.currentTimeMillis();
|
||||
try {
|
||||
int count = 0;
|
||||
for (HLogEntry logEntry : entries) {
|
||||
SequenceFile.Writer w = logWriters.get(key);
|
||||
if (w == null) {
|
||||
// Items were added to the linkedlist oldest first. Pull them
|
||||
// out in that order.
|
||||
for (ListIterator<HLogEntry> i =
|
||||
entries.listIterator(entries.size());
|
||||
i.hasPrevious();) {
|
||||
HLogEntry logEntry = i.previous();
|
||||
WriterAndPath wap = logWriters.get(key);
|
||||
if (wap == null) {
|
||||
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
|
||||
.getTableDir(rootDir, logEntry.getKey().getTablename()),
|
||||
HRegionInfo.encodeRegionName(key)),
|
||||
|
@ -877,9 +899,11 @@ public class HLog implements HConstants, Syncable {
|
|||
fs.rename(logfile, oldlogfile);
|
||||
old = new SequenceFile.Reader(fs, oldlogfile, conf);
|
||||
}
|
||||
w = SequenceFile.createWriter(fs, conf, logfile,
|
||||
SequenceFile.Writer w =
|
||||
SequenceFile.createWriter(fs, conf, logfile,
|
||||
HLogKey.class, KeyValue.class, getCompressionType(conf));
|
||||
logWriters.put(key, w);
|
||||
wap = new WriterAndPath(logfile, w);
|
||||
logWriters.put(key, wap);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating new hlog file writer for path "
|
||||
+ logfile + " and region " + Bytes.toString(key));
|
||||
|
@ -900,7 +924,10 @@ public class HLog implements HConstants, Syncable {
|
|||
fs.delete(oldlogfile, true);
|
||||
}
|
||||
}
|
||||
w.append(logEntry.getKey(), logEntry.getEdit());
|
||||
if (wap == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
wap.w.append(logEntry.getKey(), logEntry.getEdit());
|
||||
count++;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -929,10 +956,13 @@ public class HLog implements HConstants, Syncable {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
for (SequenceFile.Writer w : logWriters.values()) {
|
||||
w.close();
|
||||
splits = new ArrayList<Path>(logWriters.size());
|
||||
for (WriterAndPath wap : logWriters.values()) {
|
||||
wap.w.close();
|
||||
splits.add(wap.p);
|
||||
}
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -967,6 +997,9 @@ public class HLog implements HConstants, Syncable {
|
|||
return key;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return this.key + "=" + this.edit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.SequenceFile.Reader;
|
||||
|
||||
|
||||
/** JUnit test case for HLog */
|
||||
public class TestHLog extends HBaseTestCase implements HConstants {
|
||||
private Path dir;
|
||||
|
@ -68,22 +69,26 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
|||
final byte [] tableName = Bytes.toBytes(getName());
|
||||
final byte [] rowName = tableName;
|
||||
HLog log = new HLog(this.fs, this.dir, this.conf, null);
|
||||
final int howmany = 3;
|
||||
// Add edits for three regions.
|
||||
try {
|
||||
for (int ii = 0; ii < 3; ii++) {
|
||||
for (int i = 0; i < 3; i++) {
|
||||
for (int j = 0; j < 3; j++) {
|
||||
for (int ii = 0; ii < howmany; ii++) {
|
||||
for (int i = 0; i < howmany; i++) {
|
||||
for (int j = 0; j < howmany; j++) {
|
||||
List<KeyValue> edit = new ArrayList<KeyValue>();
|
||||
byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
|
||||
edit.add(new KeyValue(rowName, column, System.currentTimeMillis(),
|
||||
column));
|
||||
log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit,
|
||||
System.out.println("Region " + i + ": " + edit);
|
||||
log.append(Bytes.toBytes("" + i), tableName, edit,
|
||||
false, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
log.rollWriter();
|
||||
}
|
||||
HLog.splitLog(this.testDir, this.dir, this.fs, this.conf);
|
||||
List<Path> splits =
|
||||
HLog.splitLog(this.testDir, this.dir, this.fs, this.conf);
|
||||
verifySplits(splits, howmany);
|
||||
log = null;
|
||||
} finally {
|
||||
if (log != null) {
|
||||
|
@ -92,6 +97,37 @@ public class TestHLog extends HBaseTestCase implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
private void verifySplits(List<Path> splits, final int howmany)
|
||||
throws IOException {
|
||||
assertEquals(howmany, splits.size());
|
||||
for (int i = 0; i < splits.size(); i++) {
|
||||
SequenceFile.Reader r =
|
||||
new SequenceFile.Reader(this.fs, splits.get(i), this.conf);
|
||||
try {
|
||||
HLogKey key = new HLogKey();
|
||||
KeyValue kv = new KeyValue();
|
||||
int count = 0;
|
||||
String previousRegion = null;
|
||||
long seqno = -1;
|
||||
while(r.next(key, kv)) {
|
||||
String region = Bytes.toString(key.getRegionName());
|
||||
// Assert that all edits are for same region.
|
||||
if (previousRegion != null) {
|
||||
assertEquals(previousRegion, region);
|
||||
}
|
||||
assertTrue(seqno < key.getLogSeqNum());
|
||||
seqno = key.getLogSeqNum();
|
||||
previousRegion = region;
|
||||
System.out.println(key + " " + kv);
|
||||
count++;
|
||||
}
|
||||
assertEquals(howmany * howmany, count);
|
||||
} finally {
|
||||
r.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue