diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 2af14b88518..bf6ad394c58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -1125,6 +1125,7 @@ class FSHLog implements HLog, Syncable { pending = logSyncer.getPendingWrites(); try { logSyncer.hlogFlush(tempWriter, pending); + postAppend(pending); } catch(IOException io) { ioe = io; LOG.error("syncer encountered error, will retry. txid=" + txid, ioe); @@ -1136,6 +1137,7 @@ class FSHLog implements HLog, Syncable { // HBASE-4387, HBASE-5623, retry with updateLock held tempWriter = this.writer; logSyncer.hlogFlush(tempWriter, pending); + postAppend(pending); } } } @@ -1144,14 +1146,20 @@ class FSHLog implements HLog, Syncable { return; } try { - if (tempWriter != null) tempWriter.sync(); + if (tempWriter != null) { + tempWriter.sync(); + postSync(); + } } catch(IOException ex) { synchronized (this.updateLock) { // HBASE-4387, HBASE-5623, retry with updateLock held // TODO: we don't actually need to do it for concurrent close - what is the point // of syncing new unrelated writer? Keep behavior for now. tempWriter = this.writer; - if (tempWriter != null) tempWriter.sync(); + if (tempWriter != null) { + tempWriter.sync(); + postSync(); + } } } this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); @@ -1177,6 +1185,12 @@ class FSHLog implements HLog, Syncable { } } + @Override + public void postSync() {} + + @Override + public void postAppend(List entries) {} + private void checkLowReplication() { // if the number of replicas in HDFS has fallen below the configured // value, then roll logs. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ccf77e16dca..ca5a812ad45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -266,6 +266,17 @@ public interface HLog { public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException; + /** + * For notification post append to the writer. + * @param entries + */ + void postAppend(final List entries); + + /** + * For notification post writer sync. + */ + void postSync(); + /** * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and * log-sequence-id. The HLog is not flushed after this transaction is written to the log. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 64acde68388..d92fac48078 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,6 +54,10 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import com.yammer.metrics.core.Meter; +import com.yammer.metrics.core.MetricsRegistry; +import com.yammer.metrics.reporting.ConsoleReporter; + /** * This class runs performance benchmarks for {@link HLog}. * See usage for this tool by running: @@ -61,6 +66,11 @@ import org.apache.hadoop.util.ToolRunner; @InterfaceAudience.Private public final class HLogPerformanceEvaluation extends Configured implements Tool { static final Log LOG = LogFactory.getLog(HLogPerformanceEvaluation.class.getName()); + private final MetricsRegistry metrics = new MetricsRegistry(); + private final Meter syncMeter = + metrics.newMeter(HLogPerformanceEvaluation.class, "syncMeter", "syncs", TimeUnit.MILLISECONDS); + private final Meter appendMeter = + metrics.newMeter(HLogPerformanceEvaluation.class, "append", "bytes", TimeUnit.MILLISECONDS); private HBaseTestingUtility TEST_UTIL; @@ -88,12 +98,14 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool private final int numFamilies; private final boolean noSync; private final HRegion region; + private final int syncInterval; private final HTableDescriptor htd; HLogPutBenchmark(final HRegion region, final HTableDescriptor htd, - final long numIterations, final boolean noSync) { + final long numIterations, final boolean noSync, final int syncInterval) { this.numIterations = numIterations; this.noSync = noSync; + this.syncInterval = syncInterval; this.numFamilies = htd.getColumnFamilies().length; this.region = region; this.htd = htd; @@ -110,17 +122,20 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool try { long startTime = System.currentTimeMillis(); + int lastSync = 0; for (int i = 0; i < numIterations; ++i) { Put put = setupPut(rand, key, value, numFamilies); long now = System.currentTimeMillis(); WALEdit walEdit = new WALEdit(); addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); HRegionInfo hri = region.getRegionInfo(); - if (this.noSync) { - hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, - region.getSequenceId(), true, nonce, nonce); - } else { - hlog.append(hri, hri.getTable(), walEdit, now, htd, region.getSequenceId()); + hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, + region.getSequenceId(), true, nonce, nonce); + if (!this.noSync) { + if (++lastSync >= this.syncInterval) { + hlog.sync(); + lastSync = 0; + } } } long totalTime = (System.currentTimeMillis() - startTime); @@ -135,8 +150,9 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool public int run(String[] args) throws Exception { Path rootRegionDir = null; int numThreads = 1; - long numIterations = 10000; + long numIterations = 1000000; int numFamilies = 1; + int syncInterval = 0; boolean noSync = false; boolean verify = false; boolean verbose = false; @@ -163,6 +179,8 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool keySize = Integer.parseInt(args[++i]); } else if (cmd.equals("-valueSize")) { valueSize = Integer.parseInt(args[++i]); + } else if (cmd.equals("-syncInterval")) { + syncInterval = Integer.parseInt(args[++i]); } else if (cmd.equals("-nosync")) { noSync = true; } else if (cmd.equals("-verify")) { @@ -237,14 +255,32 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool } super.doWrite(info, logKey, logEdit, htd); }; + + @Override + public void postSync() { + super.postSync(); + syncMeter.mark(); + } + + @Override + public void postAppend(List entries) { + super.postAppend(entries); + int size = 0; + for (Entry e: entries) size += e.getEdit().heapSize(); + appendMeter.mark(size); + } }; hlog.rollWriter(); HRegion region = null; try { region = openRegion(fs, rootRegionDir, htd, hlog); - long putTime = runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync), numThreads); - logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations, - numIterations * numThreads, putTime); + ConsoleReporter.enable(this.metrics, 1, TimeUnit.SECONDS); + long putTime = + runBenchmark(new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval), + numThreads); + logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations + + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime); + if (region != null) { closeRegion(region); region = null; @@ -327,6 +363,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool private static void logBenchmarkResult(String testName, long numTests, long totalTime) { float tsec = totalTime / 1000.0f; LOG.info(String.format("%s took %.3fs %.3fops/s", testName, tsec, numTests / tsec)); + } private void printUsageAndExit() { @@ -343,6 +380,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool System.err.println(" -nocleanup Do NOT remove test data when done."); System.err.println(" -noclosefs Do NOT close the filesystem when done."); System.err.println(" -nosync Append without syncing"); + System.err.println(" -syncInterval Append N edits and then sync. Default=0, i.e. sync every edit."); System.err.println(" -verify Verify edits written in sequence"); System.err.println(" -verbose Output extra info; e.g. all edit seq ids when verifying"); System.err.println(" -roll Roll the way every N appends"); @@ -425,4 +463,4 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool public static void main(String[] args) throws Exception { System.exit(innerMain(HBaseConfiguration.create(), args)); } -} +} \ No newline at end of file