diff --git a/hbase-assembly/src/main/assembly/client-components.xml b/hbase-assembly/src/main/assembly/client-components.xml
index 2369f28d20c..fced18d5962 100644
--- a/hbase-assembly/src/main/assembly/client-components.xml
+++ b/hbase-assembly/src/main/assembly/client-components.xml
@@ -127,5 +127,13 @@
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../hbase-procedure/target/</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>${procedure.test.jar}</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
diff --git a/hbase-assembly/src/main/assembly/components.xml b/hbase-assembly/src/main/assembly/components.xml
index 346cc8ce8ad..18dd4955903 100644
--- a/hbase-assembly/src/main/assembly/components.xml
+++ b/hbase-assembly/src/main/assembly/components.xml
@@ -168,5 +168,13 @@
<fileMode>0644</fileMode>
</fileSet>
+ <fileSet>
+ <directory>${project.basedir}/../hbase-procedure/target/</directory>
+ <outputDirectory>lib</outputDirectory>
+ <includes>
+ <include>${procedure.test.jar}</include>
+ </includes>
+ <fileMode>0644</fileMode>
+ </fileSet>
</fileSets>
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
new file mode 100644
index 00000000000..b4888c5b162
--- /dev/null
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStorePerformanceEvaluation.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+
+/**
+ * Base class for testing procedure store performance.
+ */
+public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
+ extends AbstractHBaseTool {
+
+ // Command line options and defaults.
+ public static String DEFAULT_OUTPUT_PATH = "proc-store";
+
+ public static Option OUTPUT_PATH_OPTION =
+ new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);
+
+ public static int DEFAULT_NUM_THREADS = 20;
+
+ public static Option NUM_THREADS_OPTION = new Option("threads", true,
+ "Number of parallel threads which will write inserts/updates to the store. Default: " +
+ DEFAULT_NUM_THREADS);
+
+ public static int DEFAULT_NUM_PROCS = 1000000; // 1M
+
+ public static Option NUM_PROCS_OPTION = new Option("procs", true,
+ "Total number of procedures. Each procedure writes one insert and one update. Default: " +
+ DEFAULT_NUM_PROCS);
+
+ public static int DEFAULT_STATE_SIZE = 1024; // 1KB
+
+ public static Option STATE_SIZE_OPTION = new Option("state_size", true,
+ "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE +
+ " bytes");
+
+ public static Option SYNC_OPTION = new Option("sync", true,
+ "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " +
+ "hsync, nosync. Default: hflush");
+
+ public static String DEFAULT_SYNC_OPTION = "hflush";
+
+ protected String outputPath;
+ protected int numThreads;
+ protected long numProcs;
+ protected String syncType;
+ protected int stateSize;
+ protected static byte[] SERIALIZED_STATE;
+
+ protected T store;
+
+ /** Used by {@link Worker}. */
+ private AtomicLong procIds = new AtomicLong(0);
+ private AtomicBoolean workersFailed = new AtomicBoolean(false);
+
+ // Timeout for worker threads.
+ private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
+
+ @Override
+ protected void addOptions() {
+ addOption(OUTPUT_PATH_OPTION);
+ addOption(NUM_THREADS_OPTION);
+ addOption(NUM_PROCS_OPTION);
+ addOption(SYNC_OPTION);
+ addOption(STATE_SIZE_OPTION);
+ }
+
+ @Override
+ protected void processOptions(CommandLine cmd) {
+ outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
+ numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
+ numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
+ syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
+ assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(
+ syncType) : "sync argument can only accept one of these three values: hsync, hflush, nosync";
+ stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
+ SERIALIZED_STATE = new byte[stateSize];
+ new Random(12345).nextBytes(SERIALIZED_STATE);
+ }
+
+ private void setUpProcedureStore() throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ Path storeDir = fs.makeQualified(new Path(outputPath));
+ System.out.println("Procedure store directory : " + storeDir.toString());
+ fs.delete(storeDir, true);
+ store = createProcedureStore(storeDir);
+ store.start(numThreads);
+ store.recoverLease();
+ store.load(new ProcedureTestingUtility.LoadCounter());
+ System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
+ }
+
+ protected abstract T createProcedureStore(Path storeDir) throws IOException;
+
+ private void tearDownProcedureStore() {
+ Path storeDir = null;
+ try {
+ if (store != null) {
+ store.stop(false);
+ }
+ FileSystem fs = FileSystem.get(conf);
+ storeDir = fs.makeQualified(new Path(outputPath));
+ fs.delete(storeDir, true);
+ } catch (IOException e) {
+ System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " +
+ "disk space. Location: " + storeDir);
+ e.printStackTrace();
+ }
+ }
+
+ protected abstract void printRawFormatResult(long timeTakenNs);
+
+ @Override
+ protected int doWork() throws Exception {
+ try {
+ setUpProcedureStore();
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ Future<?>[] futures = new Future<?>[numThreads];
+ // Start worker threads.
+ long start = System.nanoTime();
+ for (int i = 0; i < numThreads; i++) {
+ futures[i] = executor.submit(new Worker(start));
+ }
+ boolean failure = false;
+ try {
+ for (Future<?> future : futures) {
+ // start came from System.nanoTime(), so compute the remaining timeout in millis.
+ long timeout = TimeUnit.SECONDS.toMillis(WORKER_THREADS_TIMEOUT_SEC) -
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
+ }
+ } catch (Exception e) {
+ System.err.println("Exception in worker thread.");
+ e.printStackTrace();
+ executor.shutdownNow();
+ return EXIT_FAILURE;
+ }
+ executor.shutdown();
+ if (failure) {
+ return EXIT_FAILURE;
+ }
+ long timeTakenNs = System.nanoTime() - start;
+ System.out.println("******************************************");
+ System.out.println("Num threads : " + numThreads);
+ System.out.println("Num procedures : " + numProcs);
+ System.out.println("Sync type : " + syncType);
+ System.out.println("Time taken : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
+ System.out.println("******************************************");
+ System.out.println("Raw format for scripts");
+ printRawFormatResult(timeTakenNs);
+ return EXIT_SUCCESS;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return EXIT_FAILURE;
+ } finally {
+ tearDownProcedureStore();
+ }
+ }
+
+ ///////////////////////////////
+ // HELPER CLASSES
+ ///////////////////////////////
+
+ /**
+ * Callable to generate load for the store by inserting/updating procedures. If the pre-write
+ * hook fails (throws IOException), all threads quit, and at least one returns value of
+ * {@link AbstractHBaseTool#EXIT_FAILURE}.
+ */
+ private final class Worker implements Callable<Integer> {
+ private final long start;
+
+ public Worker(long start) {
+ this.start = start;
+ }
+
+ // TODO: Can also collect #procs, time taken by each thread to measure fairness.
+ @Override
+ public Integer call() throws IOException {
+ while (true) {
+ if (workersFailed.get()) {
+ return EXIT_FAILURE;
+ }
+ long procId = procIds.getAndIncrement();
+ if (procId >= numProcs) {
+ break;
+ }
+ if (procId != 0 && procId % 10000 == 0) {
+ long ns = System.nanoTime() - start;
+ System.out.println("Wrote " + procId + " procedures in " +
+ StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
+ }
+ try {
+ preWrite(procId);
+ } catch (IOException ioe) {
+ // Ask other threads to quit too.
+ workersFailed.set(true);
+ System.err.println("Exception when rolling log file. Current procId = " + procId);
+ ioe.printStackTrace();
+ return EXIT_FAILURE;
+ }
+ ProcedureTestingUtility.TestProcedure proc =
+ new ProcedureTestingUtility.TestProcedure(procId);
+ proc.setData(SERIALIZED_STATE);
+ store.insert(proc, null);
+ store.update(proc);
+ }
+ return EXIT_SUCCESS;
+ }
+ }
+
+ protected abstract void preWrite(long procId) throws IOException;
+}
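Note: the class above is a template. Subclasses supply store construction (createProcedureStore), a per-write hook (preWrite), and raw-format reporting (printRawFormatResult), while doWork and Worker drive the load. A minimal sketch of that contract, assuming a hypothetical NoopProcedureStore implementation of ProcedureStore (not part of this patch); the two concrete subclasses in this patch follow exactly this shape:

    package org.apache.hadoop.hbase.procedure2.store;

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class NoopStorePerformanceEvaluation
        extends ProcedureStorePerformanceEvaluation<NoopProcedureStore> {

      @Override
      protected NoopProcedureStore createProcedureStore(Path storeDir) throws IOException {
        // NoopProcedureStore is a hypothetical stand-in for any ProcedureStore impl.
        return new NoopProcedureStore(storeDir);
      }

      @Override
      protected void preWrite(long procId) throws IOException {
        // Called before every insert/update; a WAL-backed store rolls its log here.
      }

      @Override
      protected void printRawFormatResult(long timeTakenNs) {
        System.out.println(String.format("RESULT [procs=%s, total_time_ns=%s]",
          numProcs, timeTakenNs));
      }

      public static void main(String[] args) throws IOException {
        NoopStorePerformanceEvaluation tool = new NoopStorePerformanceEvaluation();
        tool.setConf(HBaseConfiguration.create());
        tool.run(args);
      }
    }
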
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 7ad26d7590d..cab44264f29 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -18,66 +18,29 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
-import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.util.AbstractHBaseTool;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
- protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+public class ProcedureWALPerformanceEvaluation
+ extends ProcedureStorePerformanceEvaluation<WALProcedureStore> {
// Command line options and defaults.
- public static int DEFAULT_NUM_THREADS = 20;
- public static Option NUM_THREADS_OPTION = new Option("threads", true,
- "Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
- + DEFAULT_NUM_THREADS);
- public static int DEFAULT_NUM_PROCS = 1000000; // 1M
- public static Option NUM_PROCS_OPTION = new Option("procs", true,
- "Total number of procedures. Each procedure writes one insert and one update. Default: "
- + DEFAULT_NUM_PROCS);
public static int DEFAULT_NUM_WALS = 0;
public static Option NUM_WALS_OPTION = new Option("wals", true,
- "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
- " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
- public static int DEFAULT_STATE_SIZE = 1024; // 1KB
- public static Option STATE_SIZE_OPTION = new Option("state_size", true,
- "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
- + "bytes");
- public static Option SYNC_OPTION = new Option("sync", true,
- "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
- + "hsync, nosync. Default: hflush");
- public static String DEFAULT_SYNC_OPTION = "hflush";
+ "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
+ " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
- public int numThreads;
- public long numProcs;
- public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
- public int numWals;
- public String syncType;
- public int stateSize;
- static byte[] serializedState;
- private WALProcedureStore store;
-
- /** Used by {@link Worker}. */
- private AtomicLong procIds = new AtomicLong(0);
- private AtomicBoolean workersFailed = new AtomicBoolean(false);
- // Timeout for worker threads.
- private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
+ private long numProcsPerWal = Long.MAX_VALUE; // never roll WAL based on this value.
+ private int numWals;
// Non-default configurations.
private void setupConf() {
@@ -88,163 +51,52 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
}
}
- private void setupProcedureStore() throws IOException {
- Path testDir = UTIL.getDataTestDir();
- FileSystem fs = testDir.getFileSystem(conf);
- Path logDir = new Path(testDir, "proc-logs");
- System.out.println("Logs directory : " + logDir.toString());
- fs.delete(logDir, true);
- if ("nosync".equals(syncType)) {
- store = new NoSyncWalProcedureStore(conf, logDir);
- } else {
- store = ProcedureTestingUtility.createWalStore(conf, logDir);
- }
- store.start(numThreads);
- store.recoverLease();
- store.load(new ProcedureTestingUtility.LoadCounter());
- System.out.println("Starting new log : "
- + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
- }
-
- private void tearDownProcedureStore() {
- store.stop(false);
- try {
- store.getFileSystem().delete(store.getWALDir(), true);
- } catch (IOException e) {
- System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
- + "disk space. Location: " + store.getWALDir().toString());
- e.printStackTrace();
- }
- }
-
/**
* Processes and validates command line options.
*/
@Override
public void processOptions(CommandLine cmd) {
- numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
- numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
+ super.processOptions(cmd);
numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
- syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
- assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
- "sync argument can only accept one of these three values: hsync, hflush, nosync";
- stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
- serializedState = new byte[stateSize];
setupConf();
}
@Override
public void addOptions() {
- addOption(NUM_THREADS_OPTION);
- addOption(NUM_PROCS_OPTION);
+ super.addOptions();
addOption(NUM_WALS_OPTION);
- addOption(SYNC_OPTION);
- addOption(STATE_SIZE_OPTION);
}
@Override
- public int doWork() {
- try {
- setupProcedureStore();
- ExecutorService executor = Executors.newFixedThreadPool(numThreads);
- Future<?>[] futures = new Future<?>[numThreads];
- // Start worker threads.
- long start = System.currentTimeMillis();
- for (int i = 0; i < numThreads; i++) {
- futures[i] = executor.submit(new Worker(start));
- }
- boolean failure = false;
- try {
- for (Future<?> future : futures) {
- long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
- failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
- }
- } catch (Exception e) {
- System.err.println("Exception in worker thread.");
- e.printStackTrace();
- return EXIT_FAILURE;
- }
- executor.shutdown();
- if (failure) {
- return EXIT_FAILURE;
- }
- long timeTaken = System.currentTimeMillis() - start;
- System.out.println("******************************************");
- System.out.println("Num threads : " + numThreads);
- System.out.println("Num procedures : " + numProcs);
- System.out.println("Sync type : " + syncType);
- System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec");
- System.out.println("******************************************");
- System.out.println("Raw format for scripts");
- System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
- + "total_time_ms=%s]",
- NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
- SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
- NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
- return EXIT_SUCCESS;
- } catch (IOException e) {
- e.printStackTrace();
- return EXIT_FAILURE;
- } finally {
- tearDownProcedureStore();
+ protected WALProcedureStore createProcedureStore(Path storeDir) throws IOException {
+ if ("nosync".equals(syncType)) {
+ return new NoSyncWalProcedureStore(conf, storeDir);
+ } else {
+ return ProcedureTestingUtility.createWalStore(conf, storeDir);
}
}
+ @Override
+ protected void printRawFormatResult(long timeTakenNs) {
+ System.out.println(String.format(
+ "RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, total_time_ns=%s]",
+ NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
+ SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
+ NUM_WALS_OPTION.getOpt(), numWals, timeTakenNs));
+ }
+
+ @Override
+ protected void preWrite(long procId) throws IOException {
+ if (procId > 0 && procId % numProcsPerWal == 0) {
+ store.rollWriterForTesting();
+ System.out.println(
+ "Starting new log : " + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
+ }
+ }
///////////////////////////////
// HELPER CLASSES
///////////////////////////////
- /**
- * Callable to generate load for wal by inserting/deleting/updating procedures.
- * If procedure store fails to roll log file (throws IOException), all threads quit, and at
- * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
- */
- private final class Worker implements Callable<Integer> {
- private final long start;
-
- public Worker(long start) {
- this.start = start;
- }
-
- // TODO: Can also collect #procs, time taken by each thread to measure fairness.
- @Override
- public Integer call() throws IOException {
- while (true) {
- if (workersFailed.get()) {
- return EXIT_FAILURE;
- }
- long procId = procIds.getAndIncrement();
- if (procId >= numProcs) {
- break;
- }
- if (procId != 0 && procId % 10000 == 0) {
- long ms = System.currentTimeMillis() - start;
- System.out.println("Wrote " + procId + " procedures in "
- + StringUtils.humanTimeDiff(ms));
- }
- try{
- if (procId > 0 && procId % numProcsPerWal == 0) {
- store.rollWriterForTesting();
- System.out.println("Starting new log : "
- + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
- }
- } catch (IOException ioe) {
- // Ask other threads to quit too.
- workersFailed.set(true);
- System.err.println("Exception when rolling log file. Current procId = " + procId);
- ioe.printStackTrace();
- return EXIT_FAILURE;
- }
- ProcedureTestingUtility.TestProcedure proc =
- new ProcedureTestingUtility.TestProcedure(procId);
- proc.setData(serializedState);
- store.insert(proc, null);
- store.update(proc);
- }
- return EXIT_SUCCESS;
- }
- }
-
private static class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
super(conf, logDir, null, new LeaseRecovery() {
@@ -263,7 +115,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
public static void main(String[] args) throws IOException {
ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
- tool.setConf(UTIL.getConfiguration());
+ tool.setConf(HBaseConfiguration.create());
tool.run(args);
}
}
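For reference, the refactor keeps the tool's command line surface (the base-class options plus -wals). A hedged sketch of driving it programmatically with the documented defaults spelled out; the flag names come from the Option definitions above, and RunWalPerfEval is just an illustrative wrapper:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALPerformanceEvaluation;

    public class RunWalPerfEval {
      public static void main(String[] args) throws IOException {
        ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
        tool.setConf(HBaseConfiguration.create());
        // -wals 0 means rolling is driven by WALProcedureStore.ROLL_THRESHOLD_CONF_KEY.
        tool.run(new String[] { "-threads", "20", "-procs", "1000000",
            "-state_size", "1024", "-sync", "hflush", "-wals", "0" });
      }
    }
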
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
index fb24802a9b2..53bf66b49df 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionFlusherAndCompactor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -58,7 +59,7 @@ class RegionFlusherAndCompactor implements Closeable {
static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
- private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024;
+ static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
index 531a9531818..169a194d608 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
@@ -65,6 +65,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@@ -91,11 +92,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* --<master-server-name>-dead <---- The WAL dir dead master
*
*
- * We use p:d column to store the serialized protobuf format procedure, and when deleting we
- * will first fill the info:proc column with an empty byte array, and then actually delete them in
- * the {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we
- * can not directly delete a procedure row as we do not know if it is the one with the max procedure
- * id.
+ * We use p:d column to store the serialized protobuf format procedure, and when deleting we will
+ * first fill the info:proc column with an empty byte array, and then actually delete them in the
+ * {@link #cleanup()} method. This is because we need to retain the max procedure id, so we cannot
+ * directly delete a procedure row as we do not know if it is the one with the max procedure id.
*/
@InterfaceAudience.Private
public class RegionProcedureStore extends ProcedureStoreBase {
@@ -155,7 +155,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (!setRunning(true)) {
return;
}
- LOG.info("Starting the Region Procedure Store...");
+ LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
this.numThreads = numThreads;
}
@@ -381,13 +381,15 @@ public class RegionProcedureStore extends ProcedureStoreBase {
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, rootDir);
RegionFlusherAndCompactor.setupConf(conf);
-
- walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
- walRoller.start();
conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
if (conf.get(USE_HSYNC_KEY) != null) {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
}
+ conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
+
+ walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
+ walRoller.start();
+
walFactory = new WALFactory(conf, server.getServerName().toString(), false);
Path dataDir = new Path(rootDir, DATA_DIR);
if (fs.exists(dataDir)) {
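The RING_BUFFER_SLOT_COUNT line above sizes the WAL's disruptor ring to the next power of two at or above 16 slots per writer thread; the ring buffer requires a power-of-two capacity (see the highestOneBit check in AbstractFSWAL below). A small sketch of the arithmetic, using the same shaded Guava call; the thread counts are illustrative:

    import org.apache.hbase.thirdparty.com.google.common.math.IntMath;

    public class RingBufferSizing {
      public static void main(String[] args) {
        for (int numThreads : new int[] { 20, 64, 100 }) {
          // Round 16 slots/thread up to a power of two: 320 -> 512, 1024 -> 1024, 1600 -> 2048.
          int slots = IntMath.ceilingPowerOfTwo(16 * numThreads);
          System.out.println(numThreads + " threads -> " + slots + " ring slots");
        }
      }
    }
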
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
index fc84c277204..7dd4d39e242 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStoreWALRoller.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
@@ -115,6 +117,11 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
// we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
+ long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
+ RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
+ // make the roll size the same as the flush size, as we only have one region here
+ conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
+ conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
}
}
\ No newline at end of file
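Together with the AbstractFSWAL change below, the two settings above make the store's WAL roll at roughly the flush size: the WAL block size is pinned to 2 * flushSize and the roll multiplier to 0.5, so logrollsize = blocksize * multiplier = flushSize. A sketch of the arithmetic under the default 128 MB flush size (TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE); RollSizeMath is illustrative only:

    public class RollSizeMath {
      public static void main(String[] args) {
        long flushSize = 128L * 1024 * 1024; // DEFAULT_FLUSH_SIZE in this patch
        long blockSize = flushSize * 2;      // WALUtil.WAL_BLOCK_SIZE as set by the roller
        float multiplier = 0.5f;             // AbstractFSWAL.WAL_ROLL_MULTIPLIER as set by the roller
        long logRollSize = (long) (blockSize * multiplier);
        System.out.println("WAL rolls at " + logRollSize + " bytes; region flushes at " + flushSize);
      }
    }
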
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 0c8991061b2..52986981d20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -136,8 +136,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
+ public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
+
public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
+ public static final String RING_BUFFER_SLOT_COUNT =
+ "hbase.regionserver.wal.disruptor.event.count";
+
/**
* file system instance
*/
@@ -358,9 +363,8 @@ public abstract class AbstractFSWAL implements WAL {
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion
// before they return.
int preallocatedEventCount =
- this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
- checkArgument(preallocatedEventCount >= 0,
- "hbase.regionserver.wal.disruptor.event.count must > 0");
+ this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
+ checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must be > 0");
int floor = Integer.highestOneBit(preallocatedEventCount);
if (floor == preallocatedEventCount) {
return floor;
@@ -450,8 +454,8 @@ public abstract class AbstractFSWAL implements WAL {
// 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
// make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
- float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
- this.logrollsize = (long)(this.blocksize * multiplier);
+ float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
+ this.logrollsize = (long) (this.blocksize * multiplier);
this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 152602e1bce..e8247875746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
public class WALUtil {
private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
+ public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
+
private WALUtil() {
// Shut down construction of this class.
}
@@ -193,7 +195,7 @@ public class WALUtil {
if (isRecoverEdits) {
return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
}
- return conf.getLong("hbase.regionserver.hlog.blocksize", defaultBlockSize);
+ return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
}
public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
new file mode 100644
index 00000000000..f3ab2e52fa6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStorePerformanceEvaluation.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import java.io.IOException;
+import java.lang.management.MemoryType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+
+public class RegionProcedureStorePerformanceEvaluation
+ extends ProcedureStorePerformanceEvaluation<RegionProcedureStore> {
+
+ private static final class MockServer implements Server {
+
+ private final Configuration conf;
+
+ private final ServerName serverName =
+ ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
+
+ public MockServer(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ }
+
+ @Override
+ public boolean isAborted() {
+ return false;
+ }
+
+ @Override
+ public void stop(String why) {
+ }
+
+ @Override
+ public boolean isStopped() {
+ return false;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ @Override
+ public ZKWatcher getZooKeeper() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Connection getConnection() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Connection createConnection(Configuration conf) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public AsyncClusterConnection getAsyncClusterConnection() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ServerName getServerName() {
+ return serverName;
+ }
+
+ @Override
+ public CoordinatedStateManager getCoordinatedStateManager() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ChoreService getChoreService() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
+ Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
+ long globalMemStoreSize = pair.getFirst();
+ boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
+ float poolSizePercentage = offheap ? 1.0F :
+ conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
+ float initialCountPercentage =
+ conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
+ int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
+ ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
+ initialCountPercentage, null);
+ conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
+ CommonFSUtils.setRootDir(conf, storeDir);
+ return new RegionProcedureStore(new MockServer(conf), (fs, path) -> {
+ // no-op lease recovery; the benchmark always starts from a clean store
+ });
+ }
+
+ @Override
+ protected void printRawFormatResult(long timeTakenNs) {
+ System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, total_time_ns=%s]",
+ NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
+ SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads, timeTakenNs));
+ }
+
+ @Override
+ protected void preWrite(long procId) throws IOException {
+ // nothing to do; the region store rolls its WAL via RegionProcedureStoreWALRoller
+ }
+
+ public static void main(String[] args) throws IOException {
+ RegionProcedureStorePerformanceEvaluation tool =
+ new RegionProcedureStorePerformanceEvaluation();
+ tool.setConf(HBaseConfiguration.create());
+ tool.run(args);
+ }
+}
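Both tools emit one RESULT line intended for harness scripts. A minimal sketch of consuming it; the sample line below is illustrative, not captured output, and ResultLineParser is a hypothetical helper:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ResultLineParser {
      public static void main(String[] args) {
        String line = "RESULT [procs=1000000, state_size=1024, sync=hflush, threads=20, "
            + "total_time_ns=123456789]";
        // Pull out each key=value pair emitted by printRawFormatResult.
        Matcher m = Pattern.compile("(\\w+)=(\\w+)").matcher(line);
        while (m.find()) {
          System.out.println(m.group(1) + " = " + m.group(2));
        }
      }
    }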