diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 4c99e01e2e4..f5ff01dd5e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -210,8 +210,8 @@ class ScannerCallableWithReplicas implements RetryingCallable { currentScannerCallable = scanner; // store where to start the replica scanner from if we need to. if (result != null && result.length != 0) this.lastResult = result[result.length - 1]; - if (LOG.isDebugEnabled()) { - LOG.debug("Setting current scanner as " + currentScannerCallable.scannerId + + if (LOG.isTraceEnabled()) { + LOG.trace("Setting current scanner as " + currentScannerCallable.scannerId + " associated with " + currentScannerCallable.getHRegionInfo().getReplicaId()); } // close all outstanding replica scanners but the one we heard back from diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 0963ad3ff29..1634b55edc3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -41,9 +41,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.google.common.base.Objects; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -90,16 +87,17 @@ import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.codehaus.jackson.map.ObjectMapper; - -import com.yammer.metrics.core.Histogram; -import com.yammer.metrics.stats.UniformSample; -import com.yammer.metrics.stats.Snapshot; - import org.htrace.Sampler; import org.htrace.Trace; import org.htrace.TraceScope; import org.htrace.impl.ProbabilitySampler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.stats.Snapshot; +import com.yammer.metrics.stats.UniformSample; + /** * Script used evaluating HBase performance and scalability. Runs a HBase * client that steps through one of a set of hardcoded tests or 'experiments' @@ -472,34 +470,47 @@ public class PerformanceEvaluation extends Configured implements Tool { return job; } + /** + * Per client, how many tasks will we run? We divide number of rows by this number and have the + * client do the resulting count in a map task. + */ + static int TASKS_PER_CLIENT = 10; + + static String JOB_INPUT_FILENAME = "input.txt"; + /* * Write input file of offsets-per-client for the mapreduce job. * @param c Configuration - * @return Directory that contains file written. + * @return Directory that contains file written whose name is JOB_INPUT_FILENAME * @throws IOException */ - private static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { + static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException { + return writeInputFile(c, opts, new Path(".")); + } + + static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir) + throws IOException { SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss"); - Path jobdir = new Path(PERF_EVAL_DIR, formatter.format(new Date())); + Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date())); Path inputDir = new Path(jobdir, "inputs"); FileSystem fs = FileSystem.get(c); fs.mkdirs(inputDir); - Path inputFile = new Path(inputDir, "input.txt"); + Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME); PrintStream out = new PrintStream(fs.create(inputFile)); // Make input random. Map m = new TreeMap(); Hash h = MurmurHash.getInstance(); int perClientRows = (opts.totalRows / opts.numClientThreads); try { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < TASKS_PER_CLIENT; i++) { for (int j = 0; j < opts.numClientThreads; j++) { TestOptions next = new TestOptions(opts); next.startRow = (j * perClientRows) + (i * (perClientRows/10)); next.perClientRunRows = perClientRows / 10; String s = MAPPER.writeValueAsString(next); - LOG.info("maptask input=" + s); + LOG.info("Client=" + j + ", maptask=" + i + ", input=" + s); int hash = h.hash(Bytes.toBytes(s)); m.put(hash, s); } @@ -579,6 +590,7 @@ public class PerformanceEvaluation extends Configured implements Tool { boolean valueZipf = false; int valueSize = DEFAULT_VALUE_LENGTH; int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10; + int cycles = 1; public TestOptions() {} @@ -588,6 +600,7 @@ public class PerformanceEvaluation extends Configured implements Tool { */ public TestOptions(TestOptions that) { this.cmdName = that.cmdName; + this.cycles = that.cycles; this.nomapred = that.nomapred; this.startRow = that.startRow; this.size = that.size; @@ -620,6 +633,14 @@ public class PerformanceEvaluation extends Configured implements Tool { this.randomSleep = that.randomSleep; } + public int getCycles() { + return this.cycles; + } + + public void setCycles(final int cycles) { + this.cycles = cycles; + } + public boolean isValueZipf() { return valueZipf; } @@ -904,11 +925,11 @@ public class PerformanceEvaluation extends Configured implements Tool { */ Test(final HConnection con, final TestOptions options, final Status status) { this.connection = con; - this.conf = con.getConfiguration(); + this.conf = con == null? null: this.connection.getConfiguration(); + this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf); this.opts = options; this.status = status; this.testName = this.getClass().getSimpleName(); - receiverHost = SpanReceiverHost.getInstance(conf); if (options.traceRate >= 1.0) { this.traceSampler = Sampler.ALWAYS; } else if (options.traceRate > 0.0) { @@ -918,7 +939,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate)); if (options.isValueZipf()) { - this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.1); + this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2); } LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows."); } @@ -1016,18 +1037,21 @@ public class PerformanceEvaluation extends Configured implements Tool { void testTimed() throws IOException, InterruptedException { int lastRow = opts.startRow + opts.perClientRunRows; // Report on completion of 1/10th of total. - for (int i = opts.startRow; i < lastRow; i++) { - if (i % everyN != 0) continue; - long startTime = System.nanoTime(); - TraceScope scope = Trace.startSpan("test row", traceSampler); - try { - testRow(i); - } finally { - scope.close(); - } - latency.update((System.nanoTime() - startTime) / 1000); - if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { - status.setStatus(generateStatus(opts.startRow, i, lastRow)); + for (int ii = 0; ii < opts.cycles; ii++) { + if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles); + for (int i = opts.startRow; i < lastRow; i++) { + if (i % everyN != 0) continue; + long startTime = System.nanoTime(); + TraceScope scope = Trace.startSpan("test row", traceSampler); + try { + testRow(i); + } finally { + scope.close(); + } + latency.update((System.nanoTime() - startTime) / 1000); + if (status != null && i > 0 && (i % getReportingPeriod()) == 0) { + status.setStatus(generateStatus(opts.startRow, i, lastRow)); + } } } } @@ -1598,6 +1622,7 @@ public class PerformanceEvaluation extends Configured implements Tool { System.err.println(" multiGet Batch gets together into groups of N. Only supported " + "by randomRead. Default: disabled"); System.err.println(" replicas Enable region replica testing. Defaults: 1."); + System.err.println(" cycles How many times to cycle the test. Defaults: 1."); System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table."); System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0"); System.err.println(); @@ -1650,6 +1675,12 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } + final String cycles = "--cycles="; + if (cmd.startsWith(cycles)) { + opts.cycles = Integer.parseInt(cmd.substring(cycles.length())); + continue; + } + final String sampleRate = "--sampleRate="; if (cmd.startsWith(sampleRate)) { opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length())); @@ -1761,6 +1792,7 @@ public class PerformanceEvaluation extends Configured implements Tool { final String size = "--size="; if (cmd.startsWith(size)) { opts.size = Float.parseFloat(cmd.substring(size.length())); + if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB"); continue; } @@ -1815,26 +1847,36 @@ public class PerformanceEvaluation extends Configured implements Tool { if (isCommandClass(cmd)) { opts.cmdName = cmd; opts.numClientThreads = Integer.parseInt(args.remove()); - int rowsPerGB = ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); if (opts.size != DEFAULT_OPTS.size && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { - throw new IllegalArgumentException(rows + " and " + size + " are mutually exclusive arguments."); - } - if (opts.size != DEFAULT_OPTS.size) { - // total size in GB specified - opts.totalRows = (int) opts.size * rowsPerGB; - opts.perClientRunRows = opts.totalRows / opts.numClientThreads; - } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { - // number of rows specified - opts.totalRows = opts.perClientRunRows * opts.numClientThreads; - opts.size = opts.totalRows / rowsPerGB; + throw new IllegalArgumentException(rows + " and " + size + + " are mutually exclusive options"); } + opts = calculateRowsAndSize(opts); break; } } return opts; } + static TestOptions calculateRowsAndSize(final TestOptions opts) { + int rowsPerGB = getRowsPerGB(opts); + if (opts.size != DEFAULT_OPTS.size) { + // total size in GB specified + opts.totalRows = (int) opts.size * rowsPerGB; + opts.perClientRunRows = opts.totalRows / opts.numClientThreads; + } else if (opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) { + // number of rows specified + opts.totalRows = opts.perClientRunRows * opts.numClientThreads; + opts.size = opts.totalRows / rowsPerGB; + } + return opts; + } + + static int getRowsPerGB(final TestOptions opts) { + return ONE_GB / (opts.valueRandom? opts.valueSize/2: opts.valueSize); + } + @Override public int run(String[] args) throws Exception { // Process command-line args. TODO: Better cmd-line processing diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java index a5bfcbee772..6bce51d2891 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerformanceEvaluation.java @@ -17,18 +17,35 @@ */ package org.apache.hadoop.hbase; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.PerformanceEvaluation.RandomReadTest; +import org.apache.hadoop.hbase.PerformanceEvaluation.TestOptions; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.stats.Snapshot; +import com.yammer.metrics.stats.UniformSample; + @Category(SmallTests.class) public class TestPerformanceEvaluation { + private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); + @Test public void testSerialization() throws JsonGenerationException, JsonMappingException, IOException { @@ -41,4 +58,82 @@ public class TestPerformanceEvaluation { mapper.readValue(optionsString, PerformanceEvaluation.TestOptions.class); assertTrue(optionsDeserialized.isAutoFlush()); } + + /** + * Exercise the mr spec writing. Simple assertions to make sure it is basically working. + * @throws IOException + */ + @Ignore @Test + public void testWriteInputFile() throws IOException { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + final int clients = 10; + opts.setNumClientThreads(clients); + opts.setPerClientRunRows(10); + Path dir = + PerformanceEvaluation.writeInputFile(HTU.getConfiguration(), opts, HTU.getDataTestDir()); + FileSystem fs = FileSystem.get(HTU.getConfiguration()); + Path p = new Path(dir, PerformanceEvaluation.JOB_INPUT_FILENAME); + long len = fs.getFileStatus(p).getLen(); + assertTrue(len > 0); + byte [] content = new byte[(int)len]; + FSDataInputStream dis = fs.open(p); + try { + dis.readFully(content); + BufferedReader br = + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(content))); + int count = 0; + while (br.readLine() != null) { + count++; + } + assertEquals(clients * PerformanceEvaluation.TASKS_PER_CLIENT, count); + } finally { + dis.close(); + } + } + + @Test + public void testSizeCalculation() { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + int rows = opts.getPerClientRunRows(); + // Default row count + final int defaultPerClientRunRows = 1024 * 1024; + assertEquals(defaultPerClientRunRows, rows); + // If size is 2G, then twice the row count. + opts.setSize(2.0f); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows()); + // If two clients, then they get half the rows each. + opts.setNumClientThreads(2); + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows, opts.getPerClientRunRows()); + // What if valueSize is 'random'? Then half of the valueSize so twice the rows. + opts.valueRandom = true; + opts = PerformanceEvaluation.calculateRowsAndSize(opts); + assertEquals(defaultPerClientRunRows * 2, opts.getPerClientRunRows()); + } + + @Test + public void testZipfian() + throws NoSuchMethodException, SecurityException, InstantiationException, IllegalAccessException, + IllegalArgumentException, InvocationTargetException { + TestOptions opts = new PerformanceEvaluation.TestOptions(); + opts.setValueZipf(true); + final int valueSize = 1024; + opts.setValueSize(valueSize); + RandomReadTest rrt = new RandomReadTest(null, opts, null); + Constructor ctor = + Histogram.class.getDeclaredConstructor(com.yammer.metrics.stats.Sample.class); + ctor.setAccessible(true); + Histogram histogram = (Histogram)ctor.newInstance(new UniformSample(1024 * 500)); + for (int i = 0; i < 100; i++) { + histogram.update(rrt.getValueLength(null)); + } + double stddev = histogram.stdDev(); + assertTrue(stddev != 0 && stddev != 1.0); + assertTrue(histogram.stdDev() != 0); + Snapshot snapshot = histogram.getSnapshot(); + double median = snapshot.getMedian(); + assertTrue(median != 0 && median != 1 && median != valueSize); + } } \ No newline at end of file