diff --git a/CHANGES.txt b/CHANGES.txt
index e058748ca54..ac72e698ff2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,7 @@ Trunk (unreleased changes)
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
HADOOP-1813 OOME makes zombie of region server
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
+ HADOOP-1820 Regionserver creates hlogs without bound
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables
HADOOP-1834 Scanners ignore timestamp passed on creation
diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java
index 7fe8761b639..c9b833d52ec 100644
--- a/src/java/org/apache/hadoop/hbase/HLog.java
+++ b/src/java/org/apache/hadoop/hbase/HLog.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.conf.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* HLog stores all the edits to the HStore.
@@ -53,6 +56,11 @@ import java.util.concurrent.atomic.AtomicInteger;
* older (smaller) than the most-recent CACHEFLUSH message for every HRegion
* that has a message in F.
*
+ *
+ * Synchronized methods can never execute in parallel. However, between the
+ * start of a cache flush and the completion point, appends are allowed but log
+ * rolling is not. To prevent log rolling taking place during this period, a
+ * separate reentrant lock is used.
+ *
*
* TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs
* in HDFS is currently flawed. HBase writes edits to logs and to a memcache.
* The 'atomic' write to the log is meant to serve as insurance against
@@ -74,20 +82,21 @@ public class HLog implements HConstants {
SequenceFile.Writer writer;
TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
- volatile boolean insideCacheFlush = false;
-
- TreeMap<Text, Long> regionToLastFlush = new TreeMap<Text, Long>();
+ HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
volatile boolean closed = false;
- volatile long logSeqNum = 0;
- long filenum = 0;
+ AtomicLong logSeqNum = new AtomicLong(0);
+ volatile long filenum = 0;
AtomicInteger numEntries = new AtomicInteger(0);
- Integer rollLock = new Integer(0);
+ // This lock prevents starting a log roll during a cache flush.
+ // synchronized is insufficient because a cache flush spans two method calls.
+ private final Lock cacheFlushLock = new ReentrantLock();
/**
* Split up a bunch of log files, that are no longer being written to,
- * into new files, one per region. Delete the old log files when ready.
+ * into new files, one per region. Delete the old log files when finished.
+ *
* @param rootDir Root directory of the HBase instance
* @param srcDir Directory of log files to split:
* e.g. ${ROOTDIR}/log_HOST_PORT
@@ -180,109 +189,105 @@ public class HLog implements HConstants {
fs.mkdirs(dir);
rollWriter();
}
-
+
+ /**
+ * Called by HRegionServer when it opens a new region to ensure that log
+ * sequence numbers are always greater than the latest sequence number of
+ * the region being brought on-line.
+ *
+ * @param newvalue candidate log sequence number; used only if it is greater
+ * than the current value
+ */
synchronized void setSequenceNumber(long newvalue) {
- if (newvalue > logSeqNum) {
+ if (newvalue > logSeqNum.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("changing sequence number from " + logSeqNum + " to " +
newvalue);
}
- logSeqNum = newvalue;
+ logSeqNum.set(newvalue);
}
}
/**
* Roll the log writer. That is, start writing log messages to a new file.
- *
- * The 'rollLock' prevents us from entering rollWriter() more than
- * once at a time.
- *
- * The 'this' lock limits access to the current writer so
- * we don't append multiple items simultaneously.
+ *
+ * Because a cache flush spans two method calls (startCacheFlush and
+ * completeCacheFlush), a separate lock (cacheFlushLock) is needed so that a
+ * cache flush cannot start while the log is being rolled and the log cannot
+ * be rolled while a cache flush is in progress.
+ *
+ * Note that this method cannot be synchronized. If it were, the following
+ * sequence would deadlock: startCacheFlush obtains the cacheFlushLock; a
+ * synchronized rollWriter then acquires the lock on this and blocks waiting
+ * for the cacheFlushLock; completeCacheFlush is called next, blocks waiting
+ * for the lock on this, and consequently never releases the cacheFlushLock.
*
* @throws IOException
*/
void rollWriter() throws IOException {
- synchronized(rollLock) {
+ if(closed) {
+ throw new IOException("Cannot roll log; log is closed");
+ }
- // Try to roll the writer to a new file. We may have to
- // wait for a cache-flush to complete. In the process,
- // compute a list of old log files that can be deleted.
+ cacheFlushLock.lock(); // prevent cache flushes
+ try {
+ // Now that we have locked out cache flushes, lock this to prevent other
+ // changes.
- Vector<Path> toDeleteList = new Vector<Path>();
- synchronized(this) {
- if(closed) {
- throw new IOException("Cannot roll log; log is closed");
- }
-
- // Make sure we do not roll the log while inside a
- // cache-flush. Otherwise, the log sequence number for
- // the CACHEFLUSH operation will appear in a "newer" log file
- // than it should.
- while(insideCacheFlush) {
- try {
- wait();
- } catch (InterruptedException ie) {
- // continue;
- }
- }
-
- // Close the current writer (if any), and grab a new one.
- if(writer != null) {
+ synchronized (this) {
+ if (writer != null) { // Close the current writer (if any), get a new one.
writer.close();
Path p = computeFilename(filenum - 1);
if(LOG.isDebugEnabled()) {
LOG.debug("Closing current log writer " + p.toString() +
- " to get a new one");
+ " to get a new one");
}
if (filenum > 0) {
- outputfiles.put(logSeqNum - 1, p);
+ outputfiles.put(logSeqNum.get() - 1, p);
}
}
Path newPath = computeFilename(filenum++);
- this.writer = SequenceFile.createWriter(fs, conf, newPath,
- HLogKey.class, HLogEdit.class);
- if(LOG.isDebugEnabled()) {
+ this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class,
+ HLogEdit.class);
+
+ if (LOG.isDebugEnabled()) {
LOG.debug("new log writer created at " + newPath);
}
-
+
// Can we delete any of the old log files?
// First, compute the oldest relevant log operation
// over all the regions.
long oldestOutstandingSeqNum = Long.MAX_VALUE;
- for(Long l: regionToLastFlush.values()) {
+ for (Long l: lastSeqWritten.values()) {
long curSeqNum = l.longValue();
-
- if(curSeqNum < oldestOutstandingSeqNum) {
+
+ if (curSeqNum < oldestOutstandingSeqNum) {
oldestOutstandingSeqNum = curSeqNum;
}
}
- // Next, remove all files with a final ID that's older
- // than the oldest pending region-operation.
- for(Iterator<Long> it = outputfiles.keySet().iterator(); it.hasNext();) {
- long maxSeqNum = it.next().longValue();
- if(maxSeqNum < oldestOutstandingSeqNum) {
- Path p = outputfiles.get(maxSeqNum);
- it.remove();
- toDeleteList.add(p);
-
- } else {
- break;
- }
- }
- }
+ // Get the set of all sequence numbers that are older than the oldest
+ // pending region operation
- // Actually delete them, if any!
- for(Iterator<Path> it = toDeleteList.iterator(); it.hasNext(); ) {
- Path p = it.next();
- if(LOG.isDebugEnabled()) {
- LOG.debug("removing old log file " + p.toString());
+ TreeSet<Long> sequenceNumbers = new TreeSet<Long>();
+ sequenceNumbers.addAll(
+ outputfiles.headMap(oldestOutstandingSeqNum).keySet());
+
+ // Remove all files with a final ID that's older than the oldest
+ // pending region-operation.
+
+ for (Long seq: sequenceNumbers) {
+ Path p = outputfiles.remove(seq);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("removing old log file " + p.toString());
+ }
+ fs.delete(p);
}
- fs.delete(p);
+ this.numEntries.set(0);
}
- this.numEntries.set(0);
+
+ } finally {
+ cacheFlushLock.unlock();
}
}
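
The locking scheme this hunk introduces is worth spelling out. Below is a minimal, self-contained sketch of that discipline, with simplified names rather than the real HLog API: a ReentrantLock is held across the two halves of a cache flush (something synchronized cannot express), while synchronized still serializes appends, flush completion, and the writer swap.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only -- names are simplified, not the real HLog API.
class LogLockingSketch {
  // Held from the start of a cache flush until it completes or aborts.
  // 'synchronized' cannot do this because the flush spans two method calls.
  private final ReentrantLock cacheFlushLock = new ReentrantLock();
  private final AtomicLong logSeqNum = new AtomicLong(0);

  synchronized void append(String edit) {
    long seq = logSeqNum.getAndIncrement();
    // write (seq, edit) to the current log file
  }

  synchronized long startCacheFlush() {
    cacheFlushLock.lock();            // blocks log rolling until complete/abort
    return logSeqNum.getAndIncrement();
  }

  synchronized void completeCacheFlush(long seqId) throws IOException {
    try {
      // append the FLUSHCACHE-COMPLETE marker for seqId
    } finally {
      cacheFlushLock.unlock();        // always released, even on error
    }
  }

  void abortCacheFlush() {
    cacheFlushLock.unlock();          // flush failed; let rolling resume
  }

  void rollWriter() throws IOException {
    cacheFlushLock.lock();            // keep cache flushes out ...
    try {
      synchronized (this) {           // ... then keep appends out
        // close the current log file and open a new one
      }
    } finally {
      cacheFlushLock.unlock();
    }
  }
}
```

The key point, as in the patch, is that rollWriter is not synchronized as a whole; it takes the cacheFlushLock first and only then synchronizes on this for the writer swap.
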
@@ -328,7 +333,9 @@ public class HLog implements HConstants {
* other systems should process the log appropriately upon each startup
* (and prior to initializing HLog).
*
- * We need to seize a lock on the writer so that writes are atomic.
+ * The synchronized modifier prevents appends while a cache flush is being
+ * completed or while the log is being rolled.
+ *
* @param regionName
* @param tableName
* @param row
@@ -337,21 +344,19 @@ public class HLog implements HConstants {
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
- TreeMap<Text, byte []> columns, long timestamp)
- throws IOException {
+ TreeMap<Text, byte []> columns, long timestamp) throws IOException {
if(closed) {
throw new IOException("Cannot append; log is closed");
}
-
- long seqNum[] = obtainSeqNum(columns.size());
- // The 'regionToLastFlush' map holds the sequence id of the
- // most recent flush for every regionName. However, for regions
- // that don't have any flush yet, the relevant operation is the
- // first one that's been added.
- if (regionToLastFlush.get(regionName) == null) {
- regionToLastFlush.put(regionName, seqNum[0]);
- }
+ long seqNum[] = obtainSeqNum(columns.size());
+
+ // The 'lastSeqWritten' map holds the sequence number of the most recent
+ // write for each region. When the cache is flushed, the entry for the
+ // region being flushed is removed if the sequence number of the flush
+ // is greater than or equal to the value in lastSeqWritten
+
+ lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);
int counter = 0;
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
@@ -363,29 +368,39 @@ public class HLog implements HConstants {
}
}
- /** @return How many items have been added to the log */
+ /**
+ * Because numEntries is an AtomicInteger, no locking is required.
+ *
+ * @return How many items have been added to the log
+ */
int getNumEntries() {
return numEntries.get();
}
/**
- * Obtain a log sequence number. This seizes the whole HLog
- * lock, but it shouldn't last too long.
+ * Obtain a log sequence number.
+ *
+ * Because it is only called from a synchronized method, no additional locking
+ * is required.
*/
- synchronized long obtainSeqNum() {
- return logSeqNum++;
+ private long obtainSeqNum() {
+ return logSeqNum.getAndIncrement();
}
/**
* Obtain a specified number of sequence numbers
*
+ * Because it is only called from a synchronized method, no additional locking
+ * is required.
+ *
* @param num - number of sequence numbers to obtain
* @return - array of sequence numbers
*/
- synchronized long[] obtainSeqNum(int num) {
+ private long[] obtainSeqNum(int num) {
+ long sequenceNumber = logSeqNum.getAndAdd(num);
long[] results = new long[num];
for (int i = 0; i < num; i++) {
- results[i] = logSeqNum++;
+ results[i] = sequenceNumber++;
}
return results;
}
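
Since the sequence counter is now an AtomicLong, obtainSeqNum(int) can reserve a whole block of ids with one getAndAdd. A small standalone demo (not part of the patch) of why concurrent callers always receive disjoint ranges:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

// Standalone demo: getAndAdd(num) reserves the contiguous block
// [first, first + num) in one atomic step, so racing callers never overlap.
public class SeqNumBlockDemo {
  private static final AtomicLong logSeqNum = new AtomicLong(0);

  static long[] obtainSeqNum(int num) {
    long first = logSeqNum.getAndAdd(num);
    long[] results = new long[num];
    for (int i = 0; i < num; i++) {
      results[i] = first + i;
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(obtainSeqNum(3))); // [0, 1, 2]
    System.out.println(Arrays.toString(obtainSeqNum(2))); // [3, 4]
  }
}
```
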
@@ -394,54 +409,50 @@ public class HLog implements HConstants {
* By acquiring a log sequence ID, we can allow log messages
* to continue while we flush the cache.
*
- * Set a flag so that we do not roll the log between the start
- * and complete of a cache-flush. Otherwise the log-seq-id for
+ * Acquire a lock so that we do not roll the log between the start
+ * and completion of a cache-flush. Otherwise the log-seq-id for
* the flush will not appear in the correct logfile.
+ *
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
* @see #completeCacheFlush(Text, Text, long)
* @see #abortCacheFlush()
*/
synchronized long startCacheFlush() {
- while (this.insideCacheFlush) {
- try {
- wait();
- } catch (InterruptedException ie) {
- // continue
- }
- }
- this.insideCacheFlush = true;
- notifyAll();
+ cacheFlushLock.lock();
return obtainSeqNum();
}
- /** Complete the cache flush
+ /**
+ * Complete the cache flush.
+ *
+ * Synchronizing on this blocks appends while the flush marker is written; the
+ * cacheFlushLock acquired in startCacheFlush is released in the finally block.
+ *
* @param regionName
* @param tableName
* @param logSeqId
* @throws IOException
*/
synchronized void completeCacheFlush(final Text regionName,
- final Text tableName, final long logSeqId)
- throws IOException {
- if(this.closed) {
- return;
- }
-
- if (!this.insideCacheFlush) {
- throw new IOException("Impossible situation: inside " +
- "completeCacheFlush(), but 'insideCacheFlush' flag is false");
- }
- HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId);
- this.writer.append(key,
- new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
- System.currentTimeMillis()));
- this.numEntries.getAndIncrement();
+ final Text tableName, final long logSeqId) throws IOException {
- // Remember the most-recent flush for each region.
- // This is used to delete obsolete log files.
- this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId));
+ try {
+ if(this.closed) {
+ return;
+ }
+
+ writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
+ new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
+ System.currentTimeMillis()));
+
+ numEntries.getAndIncrement();
+ Long seq = lastSeqWritten.get(regionName);
+ if (seq != null && logSeqId >= seq) {
+ lastSeqWritten.remove(regionName);
+ }
- cleanup();
+ } finally {
+ cacheFlushLock.unlock();
+ }
}
/**
@@ -451,23 +462,8 @@ public class HLog implements HConstants {
* is a restart of the regionserver so the snapshot content dropped by the
* failure gets restored to the memcache.
*/
- synchronized void abortCacheFlush() {
- cleanup();
- }
-
- private synchronized void cleanup() {
- this.insideCacheFlush = false;
- notifyAll();
- }
-
- /**
- * Abort a cache flush.
- * This method will clear waits on {@link #insideCacheFlush} but if this
- * method is called, we are losing data. TODO: Fix.
- */
- synchronized void abort() {
- this.insideCacheFlush = false;
- notifyAll();
+ void abortCacheFlush() {
+ this.cacheFlushLock.unlock();
}
private static void usage() {
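
Taken together, lastSeqWritten and outputfiles are what finally bound the number of hlogs (the subject of HADOOP-1820): a closed log file becomes deletable once every region's oldest unflushed edit is newer than the file's last sequence number. A standalone illustration of that rule follows; plain strings stand in for the Text region names and Path log files used in the patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Standalone illustration of the cleanup rule applied in rollWriter().
public class OldLogCleanupSketch {
  public static void main(String[] args) {
    // Most recent unflushed edit per region (the lastSeqWritten map).
    Map<String, Long> lastSeqWritten = new HashMap<String, Long>();
    lastSeqWritten.put("regionA", 150L);
    lastSeqWritten.put("regionB", 420L);

    // Closed log files keyed by the highest sequence number each contains.
    TreeMap<Long, String> outputfiles = new TreeMap<Long, String>();
    outputfiles.put(99L, "hlog.0");   // edits   0..99
    outputfiles.put(199L, "hlog.1");  // edits 100..199
    outputfiles.put(399L, "hlog.2");  // edits 200..399

    // Oldest edit that some region still holds only in the log.
    long oldestOutstandingSeqNum = Collections.min(lastSeqWritten.values()); // 150

    // headMap is strictly less-than: hlog.0 is deletable, but hlog.1 is not,
    // because regionA's unflushed edit 150 lives in it.
    SortedMap<Long, String> deletable =
        outputfiles.headMap(oldestOutstandingSeqNum);
    System.out.println("deletable: " + deletable.values()); // [hlog.0]
  }
}
```
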
diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java
index 69911669920..c02e6a73db9 100644
--- a/src/java/org/apache/hadoop/hbase/HRegion.java
+++ b/src/java/org/apache/hadoop/hbase/HRegion.java
@@ -210,6 +210,7 @@ public class HRegion implements HConstants {
final int memcacheFlushSize;
final int blockingMemcacheSize;
protected final long threadWakeFrequency;
+ protected final int optionalFlushCount;
private final HLocking lock = new HLocking();
private long desiredMaxFileSize;
private final long maxSequenceId;
@@ -247,6 +248,8 @@ public class HRegion implements HConstants {
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.optionalFlushCount =
+ conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
@@ -728,11 +731,13 @@ public class HRegion implements HConstants {
void optionallyFlush() throws IOException {
if(this.memcache.getSize() > this.memcacheFlushSize) {
flushcache(false);
- } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) {
- LOG.info("Optional flush called " + this.noFlushCount +
- " times when data present without flushing. Forcing one.");
- flushcache(false);
- if (this.memcache.getSize() > 0) {
+ } else if (this.memcache.getSize() > 0) {
+ if (this.noFlushCount >= this.optionalFlushCount) {
+ LOG.info("Optional flush called " + this.noFlushCount +
+ " times when data present without flushing. Forcing one.");
+ flushcache(false);
+
+ } else {
// Only increment if something in the cache.
// Gets zero'd when a flushcache is called.
this.noFlushCount++;
@@ -864,25 +869,31 @@ public class HRegion implements HConstants {
retval.memcacheSnapshot.size());
}
- // A. Flush memcache to all the HStores.
- // Keep running vector of all store files that includes both old and the
- // just-made new flush store file.
- for (HStore hstore: stores.values()) {
- hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ try {
+ // A. Flush memcache to all the HStores.
+ // Keep running vector of all store files that includes both old and the
+ // just-made new flush store file.
+ for (HStore hstore: stores.values()) {
+ hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+ }
+ } catch (IOException e) {
+ // An exception here means that the snapshot was not persisted.
+ // The hlog needs to be replayed so its content is restored to memcache.
+ // Currently, only a server restart will do this.
+ this.log.abortCacheFlush();
+ throw new DroppedSnapshotException(e.getMessage());
}
+ // If we get to here, the HStores have been written. If completeCacheFlush
+ // fails, it releases the cacheFlushLock in its own finally block.
+
// B. Write a FLUSHCACHE-COMPLETE message to the log.
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(this.regionInfo.regionName,
- regionInfo.tableDesc.getName(), logCacheFlushId);
- } catch (IOException e) {
- // An exception here means that the snapshot was not persisted.
- // The hlog needs to be replayed so its content is restored to memcache.
- // Currently, only a server restart will do this.
- this.log.abortCacheFlush();
- throw new DroppedSnapshotException(e.getMessage());
+ regionInfo.tableDesc.getName(), logCacheFlushId);
+
} finally {
// C. Delete the now-irrelevant memcache snapshot; its contents have been
// dumped to disk-based HStores or, if error, clear aborted snapshot.
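
The region-side contract established by this HRegion change is that whoever calls startCacheFlush must guarantee that either completeCacheFlush or abortCacheFlush runs, so the cacheFlushLock is never leaked. A simplified sketch of that control flow, using illustrative interfaces rather than the real HLog/HStore signatures:

```java
import java.io.IOException;

// Simplified control flow only; the interfaces below are illustrative,
// not the real HLog/HStore APIs.
class FlushProtocolSketch {
  interface WriteAheadLog {
    long startCacheFlush();                                  // takes cacheFlushLock
    void completeCacheFlush(long seqId) throws IOException;  // releases it (finally)
    void abortCacheFlush();                                  // releases it on failure
  }

  interface Store {
    void flushCache(long sequenceId) throws IOException;
  }

  static void flushcache(WriteAheadLog log, Iterable<Store> stores)
      throws IOException {
    long sequenceId = log.startCacheFlush();
    try {
      // A. Persist the memcache snapshot to every store.
      for (Store store : stores) {
        store.flushCache(sequenceId);
      }
    } catch (IOException e) {
      // Snapshot was not persisted; give up the flush so the log can roll again.
      log.abortCacheFlush();
      throw e;
    }
    // B. Mark the flush complete. If this append fails, completeCacheFlush
    // releases the cacheFlushLock itself before the exception propagates.
    log.completeCacheFlush(sequenceId);
  }
}
```
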
diff --git a/src/test/org/apache/hadoop/hbase/TestDFSAbort.java b/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
index 64e5ca1cb8b..47bb56624e1 100644
--- a/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
+++ b/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
@@ -33,8 +33,8 @@ public class TestDFSAbort extends HBaseClusterTestCase {
/** constructor */
public TestDFSAbort() {
super();
- conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
- conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
+// conf.setInt("ipc.client.timeout", 5000); // reduce ipc client timeout
+// conf.setInt("ipc.client.connect.max.retries", 5); // and number of retries
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
}
diff --git a/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java b/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
index f4a2151c22a..e2777e93577 100644
--- a/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
+++ b/src/test/org/apache/hadoop/hbase/TestHBaseCluster.java
@@ -25,6 +25,8 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
/**
* Test HBase Master and Region servers, client API
@@ -41,6 +43,14 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
this.desc = null;
this.admin = null;
this.table = null;
+ Logger.getRootLogger().setLevel(Level.INFO);
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
}
/**
diff --git a/src/test/org/apache/hadoop/hbase/TestLogRolling.java b/src/test/org/apache/hadoop/hbase/TestLogRolling.java
new file mode 100644
index 00000000000..aea2f8764ee
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/TestLogRolling.java
@@ -0,0 +1,174 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Test log deletion as logs are rolled.
+ */
+public class TestLogRolling extends HBaseTestCase {
+ private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
+ private MiniDFSCluster dfs;
+ private MiniHBaseCluster cluster;
+ private Path logdir;
+ private String tableName;
+ private byte[] value;
+
+ /**
+ * constructor
+ * @throws Exception
+ */
+ public TestLogRolling() throws Exception {
+ super();
+ this.dfs = null;
+ this.cluster = null;
+ this.logdir = null;
+ this.tableName = null;
+ this.value = null;
+
+ // We roll the log after every 256 writes
+ conf.setInt("hbase.regionserver.maxlogentries", 256);
+
+ // For less frequently updated regions, force a flush after every 2 optional
+ // flush calls that find unflushed data
+ conf.setInt("hbase.hregion.memcache.optionalflushcount", 2);
+
+ // Flush the cache once it holds 8192 bytes
+ conf.setInt("hbase.hregion.memcache.flush.size", 8192);
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
+
+ String className = this.getClass().getName();
+ StringBuilder v = new StringBuilder(className);
+ while (v.length() < 1000) {
+ v.append(className);
+ }
+ value = v.toString().getBytes(HConstants.UTF8_ENCODING);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ if (cluster != null) { // shutdown mini HBase cluster
+ cluster.shutdown();
+ }
+
+ assertEquals(0, countLogFiles(true));
+
+ if (dfs != null) { // shutdown mini DFS cluster
+ FileSystem fs = dfs.getFileSystem();
+ try {
+ dfs.shutdown();
+ } finally {
+ fs.close();
+ }
+ }
+ }
+
+ private void startAndWriteData() throws Exception {
+ cluster = new MiniHBaseCluster(conf, 1, dfs);
+ logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
+
+ // When the META table can be opened, the region servers are running
+ @SuppressWarnings("unused")
+ HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
+
+ // Create the test table and open it
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ HTable table = new HTable(conf, new Text(tableName));
+
+ for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls
+ long lockid =
+ table.startUpdate(new Text("row" + String.format("%1$04d", i)));
+ table.put(lockid, HConstants.COLUMN_FAMILY, value);
+ table.commit(lockid);
+
+ if (i % 256 == 0) {
+ // After every 256 writes sleep to let the log roller run
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
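+ // Interrupt is harmless here; just resume writing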
+ }
+ }
+ }
+ }
+
+ private int countLogFiles(boolean print) throws IOException {
+ Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
+ if (print) {
+ for (int i = 0; i < logfiles.length; i++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("logfile: " + logfiles[i].toString());
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("number of log files: " + logfiles.length);
+ }
+ return logfiles.length;
+ }
+
+ /**
+ * Tests that logs are deleted
+ *
+ * @throws Exception
+ */
+ public void testLogRolling() throws Exception {
+ tableName = getName();
+ // Force a region split once a region grows past 768KB
+ conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+ startAndWriteData();
+ LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
+ try {
+ // Wait for log roller and cache flusher to run a few times...
+ Thread.sleep(30L * 1000L);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted", e);
+ }
+ LOG.info("Wake from sleep");
+ assertTrue(countLogFiles(true) <= 2);
+ }
+
+}
diff --git a/src/test/org/apache/hadoop/hbase/TestSplit.java b/src/test/org/apache/hadoop/hbase/TestSplit.java
index dab41a8f3f7..405415aa2ee 100644
--- a/src/test/org/apache/hadoop/hbase/TestSplit.java
+++ b/src/test/org/apache/hadoop/hbase/TestSplit.java
@@ -38,6 +38,15 @@ public class TestSplit extends MultiRegionTable {
/** constructor */
public TestSplit() {
+ super();
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
+
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).
setLevel(Level.DEBUG);
diff --git a/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
index 632f9305e71..2d5a8ada072 100644
--- a/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
+++ b/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
@@ -76,6 +76,15 @@ public class TestTableMapReduce extends MultiRegionTable {
fail();
}
}
+
+ /** constructor */
+ public TestTableMapReduce() {
+ super();
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+ }
/**
* {@inheritDoc}