HBASE-23617 Add a stress test tool for region based procedure store (#962)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-12-27 22:28:12 +08:00
parent de157b975b
commit cfe6ccc755
10 changed files with 469 additions and 198 deletions

View File

@ -127,5 +127,13 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-procedure/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${procedure.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
</component>

View File

@ -168,5 +168,13 @@
</includes>
<fileMode>0644</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-procedure/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${procedure.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets>
</component>

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
/**
* Base class for testing procedure store performance.
*/
public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
extends AbstractHBaseTool {
// Command line options and defaults.
public static String DEFAULT_OUTPUT_PATH = "proc-store";
public static Option OUTPUT_PATH_OPTION =
new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);
public static int DEFAULT_NUM_THREADS = 20;
public static Option NUM_THREADS_OPTION = new Option("threads", true,
"Number of parallel threads which will write insert/updates/deletes to store. Default: " +
DEFAULT_NUM_THREADS);
public static int DEFAULT_NUM_PROCS = 1000000; // 1M
public static Option NUM_PROCS_OPTION = new Option("procs", true,
"Total number of procedures. Each procedure writes one insert and one update. Default: " +
DEFAULT_NUM_PROCS);
public static int DEFAULT_STATE_SIZE = 1024; // 1KB
public static Option STATE_SIZE_OPTION = new Option("state_size", true,
"Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE +
"bytes");
public static Option SYNC_OPTION = new Option("sync", true,
"Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " +
"hsync, nosync. Default: hflush");
public static String DEFAULT_SYNC_OPTION = "hflush";
protected String outputPath;
protected int numThreads;
protected long numProcs;
protected String syncType;
protected int stateSize;
protected static byte[] SERIALIZED_STATE;
protected T store;
/** Used by {@link Worker}. */
private AtomicLong procIds = new AtomicLong(0);
private AtomicBoolean workersFailed = new AtomicBoolean(false);
// Timeout for worker threads.
private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
@Override
protected void addOptions() {
addOption(OUTPUT_PATH_OPTION);
addOption(NUM_THREADS_OPTION);
addOption(NUM_PROCS_OPTION);
addOption(SYNC_OPTION);
addOption(STATE_SIZE_OPTION);
}
@Override
protected void processOptions(CommandLine cmd) {
outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(
syncType) : "sync argument can only accept one of these three values: hsync, hflush, nosync";
stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
SERIALIZED_STATE = new byte[stateSize];
new Random(12345).nextBytes(SERIALIZED_STATE);
}
private void setUpProcedureStore() throws IOException {
FileSystem fs = FileSystem.get(conf);
Path storeDir = fs.makeQualified(new Path(outputPath));
System.out.println("Procedure store directory : " + storeDir.toString());
fs.delete(storeDir, true);
store = createProcedureStore(storeDir);
store.start(numThreads);
store.recoverLease();
store.load(new ProcedureTestingUtility.LoadCounter());
System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
}
protected abstract T createProcedureStore(Path storeDir) throws IOException;
private void tearDownProcedureStore() {
Path storeDir = null;
try {
if (store != null) {
store.stop(false);
}
FileSystem fs = FileSystem.get(conf);
storeDir = fs.makeQualified(new Path(outputPath));
fs.delete(storeDir, true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " +
"disk space. Location: " + storeDir);
e.printStackTrace();
}
}
protected abstract void printRawFormatResult(long timeTakenNs);
@Override
protected int doWork() throws Exception {
try {
setUpProcedureStore();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Future<?>[] futures = new Future<?>[numThreads];
// Start worker threads.
long start = System.nanoTime();
for (int i = 0; i < numThreads; i++) {
futures[i] = executor.submit(new Worker(start));
}
boolean failure = false;
try {
for (Future<?> future : futures) {
long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
}
} catch (Exception e) {
System.err.println("Exception in worker thread.");
e.printStackTrace();
return EXIT_FAILURE;
}
executor.shutdown();
if (failure) {
return EXIT_FAILURE;
}
long timeTakenNs = System.nanoTime() - start;
System.out.println("******************************************");
System.out.println("Num threads : " + numThreads);
System.out.println("Num procedures : " + numProcs);
System.out.println("Sync type : " + syncType);
System.out.println("Time taken : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
System.out.println("******************************************");
System.out.println("Raw format for scripts");
printRawFormatResult(timeTakenNs);
return EXIT_SUCCESS;
} catch (IOException e) {
e.printStackTrace();
return EXIT_FAILURE;
} finally {
tearDownProcedureStore();
}
}
///////////////////////////////
// HELPER CLASSES
///////////////////////////////
/**
* Callable to generate load for wal by inserting/deleting/updating procedures. If procedure store
* fails to roll log file (throws IOException), all threads quit, and at least one returns value
* of {@link AbstractHBaseTool#EXIT_FAILURE}.
*/
private final class Worker implements Callable<Integer> {
private final long start;
public Worker(long start) {
this.start = start;
}
// TODO: Can also collect #procs, time taken by each thread to measure fairness.
@Override
public Integer call() throws IOException {
while (true) {
if (workersFailed.get()) {
return EXIT_FAILURE;
}
long procId = procIds.getAndIncrement();
if (procId >= numProcs) {
break;
}
if (procId != 0 && procId % 10000 == 0) {
long ns = System.nanoTime() - start;
System.out.println("Wrote " + procId + " procedures in " +
StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
}
try {
preWrite(procId);
} catch (IOException ioe) {
// Ask other threads to quit too.
workersFailed.set(true);
System.err.println("Exception when rolling log file. Current procId = " + procId);
ioe.printStackTrace();
return EXIT_FAILURE;
}
ProcedureTestingUtility.TestProcedure proc =
new ProcedureTestingUtility.TestProcedure(procId);
proc.setData(SERIALIZED_STATE);
store.insert(proc, null);
store.update(proc);
}
return EXIT_SUCCESS;
}
}
protected abstract void preWrite(long procId) throws IOException;
}

View File

@ -18,66 +18,29 @@
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
public class ProcedureWALPerformanceEvaluation
extends ProcedureStorePerformanceEvaluation<WALProcedureStore> {
// Command line options and defaults.
public static int DEFAULT_NUM_THREADS = 20;
public static Option NUM_THREADS_OPTION = new Option("threads", true,
"Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
+ DEFAULT_NUM_THREADS);
public static int DEFAULT_NUM_PROCS = 1000000; // 1M
public static Option NUM_PROCS_OPTION = new Option("procs", true,
"Total number of procedures. Each procedure writes one insert and one update. Default: "
+ DEFAULT_NUM_PROCS);
public static int DEFAULT_NUM_WALS = 0;
public static Option NUM_WALS_OPTION = new Option("wals", true,
"Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
" conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
public static int DEFAULT_STATE_SIZE = 1024; // 1KB
public static Option STATE_SIZE_OPTION = new Option("state_size", true,
"Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+ "bytes");
public static Option SYNC_OPTION = new Option("sync", true,
"Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
+ "hsync, nosync. Default: hflush");
public static String DEFAULT_SYNC_OPTION = "hflush";
"Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
" conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
public int numThreads;
public long numProcs;
public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
public int numWals;
public String syncType;
public int stateSize;
static byte[] serializedState;
private WALProcedureStore store;
/** Used by {@link Worker}. */
private AtomicLong procIds = new AtomicLong(0);
private AtomicBoolean workersFailed = new AtomicBoolean(false);
// Timeout for worker threads.
private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
private long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
private int numWals;
// Non-default configurations.
private void setupConf() {
@ -88,163 +51,52 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
}
}
private void setupProcedureStore() throws IOException {
Path testDir = UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
Path logDir = new Path(testDir, "proc-logs");
System.out.println("Logs directory : " + logDir.toString());
fs.delete(logDir, true);
if ("nosync".equals(syncType)) {
store = new NoSyncWalProcedureStore(conf, logDir);
} else {
store = ProcedureTestingUtility.createWalStore(conf, logDir);
}
store.start(numThreads);
store.recoverLease();
store.load(new ProcedureTestingUtility.LoadCounter());
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
private void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getWALDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getWALDir().toString());
e.printStackTrace();
}
}
/**
* Processes and validates command line options.
*/
@Override
public void processOptions(CommandLine cmd) {
numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
super.processOptions(cmd);
numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
"sync argument can only accept one of these three values: hsync, hflush, nosync";
stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
serializedState = new byte[stateSize];
setupConf();
}
@Override
public void addOptions() {
addOption(NUM_THREADS_OPTION);
addOption(NUM_PROCS_OPTION);
super.addOptions();
addOption(NUM_WALS_OPTION);
addOption(SYNC_OPTION);
addOption(STATE_SIZE_OPTION);
}
@Override
public int doWork() {
try {
setupProcedureStore();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Future<?>[] futures = new Future<?>[numThreads];
// Start worker threads.
long start = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++) {
futures[i] = executor.submit(new Worker(start));
}
boolean failure = false;
try {
for (Future<?> future : futures) {
long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
}
} catch (Exception e) {
System.err.println("Exception in worker thread.");
e.printStackTrace();
return EXIT_FAILURE;
}
executor.shutdown();
if (failure) {
return EXIT_FAILURE;
}
long timeTaken = System.currentTimeMillis() - start;
System.out.println("******************************************");
System.out.println("Num threads : " + numThreads);
System.out.println("Num procedures : " + numProcs);
System.out.println("Sync type : " + syncType);
System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec");
System.out.println("******************************************");
System.out.println("Raw format for scripts");
System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
+ "total_time_ms=%s]",
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
return EXIT_SUCCESS;
} catch (IOException e) {
e.printStackTrace();
return EXIT_FAILURE;
} finally {
tearDownProcedureStore();
protected WALProcedureStore createProcedureStore(Path storeDir) throws IOException {
if ("nosync".equals(syncType)) {
return new NoSyncWalProcedureStore(conf, storeDir);
} else {
return ProcedureTestingUtility.createWalStore(conf, storeDir);
}
}
@Override
protected void printRawFormatResult(long timeTakenNs) {
System.out
.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]",
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
NUM_WALS_OPTION.getOpt(), numWals, timeTakenNs));
}
@Override
protected void preWrite(long procId) throws IOException {
if (procId > 0 && procId % numProcsPerWal == 0) {
store.rollWriterForTesting();
System.out.println(
"Starting new log : " + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
}
///////////////////////////////
// HELPER CLASSES
///////////////////////////////
/**
* Callable to generate load for wal by inserting/deleting/updating procedures.
* If procedure store fails to roll log file (throws IOException), all threads quit, and at
* least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
*/
private final class Worker implements Callable<Integer> {
private final long start;
public Worker(long start) {
this.start = start;
}
// TODO: Can also collect #procs, time taken by each thread to measure fairness.
@Override
public Integer call() throws IOException {
while (true) {
if (workersFailed.get()) {
return EXIT_FAILURE;
}
long procId = procIds.getAndIncrement();
if (procId >= numProcs) {
break;
}
if (procId != 0 && procId % 10000 == 0) {
long ms = System.currentTimeMillis() - start;
System.out.println("Wrote " + procId + " procedures in "
+ StringUtils.humanTimeDiff(ms));
}
try{
if (procId > 0 && procId % numProcsPerWal == 0) {
store.rollWriterForTesting();
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
} catch (IOException ioe) {
// Ask other threads to quit too.
workersFailed.set(true);
System.err.println("Exception when rolling log file. Current procId = " + procId);
ioe.printStackTrace();
return EXIT_FAILURE;
}
ProcedureTestingUtility.TestProcedure proc =
new ProcedureTestingUtility.TestProcedure(procId);
proc.setData(serializedState);
store.insert(proc, null);
store.update(proc);
}
return EXIT_SUCCESS;
}
}
private static class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
super(conf, logDir, null, new LeaseRecovery() {
@ -263,7 +115,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
public static void main(String[] args) throws IOException {
ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
tool.setConf(UTIL.getConfiguration());
tool.setConf(HBaseConfiguration.create());
tool.run(args);
}
}

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -58,7 +59,7 @@ class RegionFlusherAndCompactor implements Closeable {
static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024;
static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";

View File

@ -65,6 +65,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@ -91,11 +92,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
* </pre>
*
* We use p:d column to store the serialized protobuf format procedure, and when deleting we
* will first fill the info:proc column with an empty byte array, and then actually delete them in
* the {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we
* can not directly delete a procedure row as we do not know if it is the one with the max procedure
* id.
* We use p:d column to store the serialized protobuf format procedure, and when deleting we will
* first fill the info:proc column with an empty byte array, and then actually delete them in the
* {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
* not directly delete a procedure row as we do not know if it is the one with the max procedure id.
*/
@InterfaceAudience.Private
public class RegionProcedureStore extends ProcedureStoreBase {
@ -155,7 +155,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (!setRunning(true)) {
return;
}
LOG.info("Starting the Region Procedure Store...");
LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
this.numThreads = numThreads;
}
@ -381,13 +381,15 @@ public class RegionProcedureStore extends ProcedureStoreBase {
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, rootDir);
RegionFlusherAndCompactor.setupConf(conf);
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
if (conf.get(USE_HSYNC_KEY) != null) {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
}
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
walFactory = new WALFactory(conf, server.getServerName().toString());
Path dataDir = new Path(rootDir, DATA_DIR);
if (fs.exists(dataDir)) {
@ -581,4 +583,4 @@ public class RegionProcedureStore extends ProcedureStoreBase {
LOG.warn("Failed to clean up delete procedures", e);
}
}
}
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
@ -115,6 +117,11 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
// we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
// make the roll size the same with the flush size, as we only have one region here
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
}
}

View File

@ -132,8 +132,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
public static final String RING_BUFFER_SLOT_COUNT =
"hbase.regionserver.wal.disruptor.event.count";
/**
* file system instance
*/
@ -342,9 +347,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion
// before they return.
int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
checkArgument(preallocatedEventCount >= 0,
"hbase.regionserver.wal.disruptor.event.count must > 0");
this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0");
int floor = Integer.highestOneBit(preallocatedEventCount);
if (floor == preallocatedEventCount) {
return floor;
@ -434,8 +438,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
// make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
this.logrollsize = (long)(this.blocksize * multiplier);
float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
this.logrollsize = (long) (this.blocksize * multiplier);
this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +

View File

@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
public class WALUtil {
private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
private WALUtil() {
// Shut down construction of this class.
}
@ -191,6 +193,6 @@ public class WALUtil {
if (isRecoverEdits) {
return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
}
return conf.getLong("hbase.regionserver.hlog.blocksize", defaultBlockSize);
return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.IOException;
import java.lang.management.MemoryType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
public class RegionProcedureStorePerformanceEvaluation
extends ProcedureStorePerformanceEvaluation<RegionProcedureStore> {
private static final class MockServer implements Server {
private final Configuration conf;
private final ServerName serverName =
ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
public MockServer(Configuration conf) {
this.conf = conf;
}
@Override
public void abort(String why, Throwable e) {
}
@Override
public boolean isAborted() {
return false;
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZKWatcher getZooKeeper() {
throw new UnsupportedOperationException();
}
@Override
public Connection getConnection() {
throw new UnsupportedOperationException();
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public ClusterConnection getClusterConnection() {
throw new UnsupportedOperationException();
}
@Override
public ServerName getServerName() {
return serverName;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
throw new UnsupportedOperationException();
}
@Override
public ChoreService getChoreService() {
throw new UnsupportedOperationException();
}
}
@Override
protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
long globalMemStoreSize = pair.getFirst();
boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
float poolSizePercentage = offheap ? 1.0F :
conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
float initialCountPercentage =
conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
initialCountPercentage, null);
conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
CommonFSUtils.setRootDir(conf, storeDir);
return new RegionProcedureStore(new MockServer(conf), (fs, apth) -> {
});
}
@Override
protected void printRawFormatResult(long timeTakenNs) {
System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]",
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads, timeTakenNs));
}
@Override
protected void preWrite(long procId) throws IOException {
}
public static void main(String[] args) throws IOException {
RegionProcedureStorePerformanceEvaluation tool =
new RegionProcedureStorePerformanceEvaluation();
tool.setConf(HBaseConfiguration.create());
tool.run(args);
}
}