diff --git a/CHANGES.txt b/CHANGES.txt index 2191fe01c83..0b4c0b7e8e9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -86,9 +86,7 @@ Release 0.20.0 - Unreleased HBASE-1310 Off by one error in Bytes.vintToBytes HBASE-1202 getRow does not always work when specifying number of versions HBASE-1324 hbase-1234 broke testget2 unit test (and broke the build) - HBASE-1321 hbase-1234 broke TestCompaction; fix and reenable - HBASE-1330 binary keys broken on trunk (Ryan Rawson via Stack) - HBASE-1332 regionserver carrying .META. starts sucking all cpu, drives load + HBASE-1321 hbase-1234 broke TestCompaction; fix and reenable HBASE-1330 binary keys broken on trunk (Ryan Rawson via Stack) HBASE-1332 regionserver carrying .META. starts sucking all cpu, drives load up - infinite loop? (Ryan Rawson via Stack) HBASE-1334 .META. region running into hfile errors (Ryan Rawson via Stack) HBASE-1338 lost use of compaction.dir; we were compacting into live store @@ -163,6 +161,8 @@ Release 0.20.0 - Unreleased HBASE-1471 During cluster shutdown, deleting zookeeper regionserver nodes causes exceptions HBASE-1483 HLog split loses track of edits (Clint Morgan via Stack) + HBASE-1484 commit log split writes files with newest edits first + (since hbase-1430); should be other way round IMPROVEMENTS HBASE-1089 Add count of regions on filesystem to master UI; add percentage diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java index cd76af936a5..6c6745629d8 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java @@ -23,9 +23,11 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; @@ -96,7 +98,7 @@ import org.apache.hadoop.io.compress.DefaultCodec; * */ public class HLog implements HConstants, Syncable { - private static final Log LOG = LogFactory.getLog(HLog.class); + static final Log LOG = LogFactory.getLog(HLog.class); private static final String HLOG_DATFILE = "hlog.dat."; static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:"); static final byte [] METAROW = Bytes.toBytes("METAROW"); @@ -224,7 +226,7 @@ public class HLog implements HConstants, Syncable { * @param c Configuration to use. * @return the kind of compression to use */ - private static CompressionType getCompressionType(final Configuration c) { + static CompressionType getCompressionType(final Configuration c) { // Compression makes no sense for commit log. Always return NONE. return CompressionType.NONE; } @@ -551,6 +553,7 @@ public class HLog implements HConstants, Syncable { for (KeyValue kv: edits) { HLogKey logKey = new HLogKey(regionName, tableName, seqNum[counter++], now); + System.out.println("REMOVE " + logKey); doWrite(logKey, kv, sync, now); this.numEntries.incrementAndGet(); } @@ -733,22 +736,23 @@ public class HLog implements HConstants, Syncable { * @param conf HBaseConfiguration * @throws IOException */ - public static void splitLog(final Path rootDir, final Path srcDir, + public static List splitLog(final Path rootDir, final Path srcDir, final FileSystem fs, final Configuration conf) throws IOException { long millis = System.currentTimeMillis(); + List splits = null; if (!fs.exists(srcDir)) { // Nothing to do - return; + return splits; } FileStatus [] logfiles = fs.listStatus(srcDir); if (logfiles == null || logfiles.length == 0) { // Nothing to do - return; + return splits; } LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString()); - splitLog(rootDir, logfiles, fs, conf); + splits = splitLog(rootDir, logfiles, fs, conf); try { fs.delete(srcDir, true); } catch (IOException e) { @@ -760,21 +764,33 @@ public class HLog implements HConstants, Syncable { long endMillis = System.currentTimeMillis(); LOG.info("hlog file splitting completed in " + (endMillis - millis) + " millis for " + srcDir.toString()); + return splits; } - + + // Private immutable datastructure to hold Writer and its Path. + private final static class WriterAndPath { + final Path p; + final SequenceFile.Writer w; + WriterAndPath(final Path p, final SequenceFile.Writer w) { + this.p = p; + this.w = w; + } + } + /* * @param rootDir * @param logfiles * @param fs * @param conf * @throws IOException + * @return List of splits made. */ - private static void splitLog(final Path rootDir, final FileStatus [] logfiles, - final FileSystem fs, final Configuration conf) + private static List splitLog(final Path rootDir, + final FileStatus [] logfiles, final FileSystem fs, final Configuration conf) throws IOException { - final Map logWriters = - new TreeMap(Bytes.BYTES_COMPARATOR); - + final Map logWriters = + new TreeMap(Bytes.BYTES_COMPARATOR); + List splits = null; try { int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / DEFAULT_NUMBER_CONCURRENT_LOG_READS)).intValue(); @@ -796,7 +812,7 @@ public class HLog implements HConstants, Syncable { // HADOOP-4751 is committed. long length = logfiles[i].getLen(); SequenceFile.Reader in = null; - int count = 0; + int count = 0; try { in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf); try { @@ -810,7 +826,9 @@ public class HLog implements HConstants, Syncable { LOG.debug("Adding queue for " + Bytes.toString(regionName)); logEntries.put(regionName, queue); } - queue.push(new HLogEntry(val, key)); + HLogEntry hle = new HLogEntry(val, key); + System.out.println("REMOVE !! " + hle); + queue.push(hle); count++; // Make the key and value new each time; otherwise same instance // is used over and over. @@ -853,7 +871,6 @@ public class HLog implements HConstants, Syncable { ExecutorService threadPool = Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD); for (final byte[] key : logEntries.keySet()) { - Thread thread = new Thread(Bytes.toString(key)) { public void run() { LinkedList entries = logEntries.get(key); @@ -861,9 +878,14 @@ public class HLog implements HConstants, Syncable { long threadTime = System.currentTimeMillis(); try { int count = 0; - for (HLogEntry logEntry : entries) { - SequenceFile.Writer w = logWriters.get(key); - if (w == null) { + // Items were added to the linkedlist oldest first. Pull them + // out in that order. + for (ListIterator i = + entries.listIterator(entries.size()); + i.hasPrevious();) { + HLogEntry logEntry = i.previous(); + WriterAndPath wap = logWriters.get(key); + if (wap == null) { Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor .getTableDir(rootDir, logEntry.getKey().getTablename()), HRegionInfo.encodeRegionName(key)), @@ -877,9 +899,11 @@ public class HLog implements HConstants, Syncable { fs.rename(logfile, oldlogfile); old = new SequenceFile.Reader(fs, oldlogfile, conf); } - w = SequenceFile.createWriter(fs, conf, logfile, + SequenceFile.Writer w = + SequenceFile.createWriter(fs, conf, logfile, HLogKey.class, KeyValue.class, getCompressionType(conf)); - logWriters.put(key, w); + wap = new WriterAndPath(logfile, w); + logWriters.put(key, wap); if (LOG.isDebugEnabled()) { LOG.debug("Creating new hlog file writer for path " + logfile + " and region " + Bytes.toString(key)); @@ -900,7 +924,10 @@ public class HLog implements HConstants, Syncable { fs.delete(oldlogfile, true); } } - w.append(logEntry.getKey(), logEntry.getEdit()); + if (wap == null) { + throw new NullPointerException(); + } + wap.w.append(logEntry.getKey(), logEntry.getEdit()); count++; } if (LOG.isDebugEnabled()) { @@ -929,12 +956,15 @@ public class HLog implements HConstants, Syncable { } } } finally { - for (SequenceFile.Writer w : logWriters.values()) { - w.close(); + splits = new ArrayList(logWriters.size()); + for (WriterAndPath wap : logWriters.values()) { + wap.w.close(); + splits.add(wap.p); } } + return splits; } - + /** * Utility class that lets us keep track of the edit with it's key * Only used when splitting logs @@ -967,6 +997,9 @@ public class HLog implements HConstants, Syncable { return key; } + public String toString() { + return this.key + "=" + this.edit; + } } /** diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java b/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java index 54a7f4472d2..bd53e3c2996 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHLog.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Reader; + /** JUnit test case for HLog */ public class TestHLog extends HBaseTestCase implements HConstants { private Path dir; @@ -68,22 +69,26 @@ public class TestHLog extends HBaseTestCase implements HConstants { final byte [] tableName = Bytes.toBytes(getName()); final byte [] rowName = tableName; HLog log = new HLog(this.fs, this.dir, this.conf, null); + final int howmany = 3; // Add edits for three regions. try { - for (int ii = 0; ii < 3; ii++) { - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { + for (int ii = 0; ii < howmany; ii++) { + for (int i = 0; i < howmany; i++) { + for (int j = 0; j < howmany; j++) { List edit = new ArrayList(); byte [] column = Bytes.toBytes("column:" + Integer.toString(j)); edit.add(new KeyValue(rowName, column, System.currentTimeMillis(), column)); - log.append(Bytes.toBytes(Integer.toString(i)), tableName, edit, + System.out.println("Region " + i + ": " + edit); + log.append(Bytes.toBytes("" + i), tableName, edit, false, System.currentTimeMillis()); } } log.rollWriter(); } - HLog.splitLog(this.testDir, this.dir, this.fs, this.conf); + List splits = + HLog.splitLog(this.testDir, this.dir, this.fs, this.conf); + verifySplits(splits, howmany); log = null; } finally { if (log != null) { @@ -92,6 +97,37 @@ public class TestHLog extends HBaseTestCase implements HConstants { } } + private void verifySplits(List splits, final int howmany) + throws IOException { + assertEquals(howmany, splits.size()); + for (int i = 0; i < splits.size(); i++) { + SequenceFile.Reader r = + new SequenceFile.Reader(this.fs, splits.get(i), this.conf); + try { + HLogKey key = new HLogKey(); + KeyValue kv = new KeyValue(); + int count = 0; + String previousRegion = null; + long seqno = -1; + while(r.next(key, kv)) { + String region = Bytes.toString(key.getRegionName()); + // Assert that all edits are for same region. + if (previousRegion != null) { + assertEquals(previousRegion, region); + } + assertTrue(seqno < key.getLogSeqNum()); + seqno = key.getLogSeqNum(); + previousRegion = region; + System.out.println(key + " " + kv); + count++; + } + assertEquals(howmany * howmany, count); + } finally { + r.close(); + } + } + } + /** * @throws IOException */