HBASE-23617 Add a stress test tool for region based procedure store (#962)

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-12-27 22:28:12 +08:00 committed by GitHub
parent 684a68e945
commit 0ba84d8e95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 468 additions and 197 deletions

View File

@ -127,5 +127,13 @@
</includes> </includes>
<fileMode>0644</fileMode> <fileMode>0644</fileMode>
</fileSet> </fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-procedure/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${procedure.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets> </fileSets>
</component> </component>

View File

@ -168,5 +168,13 @@
</includes> </includes>
<fileMode>0644</fileMode> <fileMode>0644</fileMode>
</fileSet> </fileSet>
<fileSet>
<directory>${project.basedir}/../hbase-procedure/target/</directory>
<outputDirectory>lib</outputDirectory>
<includes>
<include>${procedure.test.jar}</include>
</includes>
<fileMode>0644</fileMode>
</fileSet>
</fileSets> </fileSets>
</component> </component>

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
/**
 * Base class for testing procedure store performance.
 * <p>
 * Spins up {@code -threads} workers which together write {@code -procs} procedures (one insert
 * plus one update each) to the store created by {@link #createProcedureStore(Path)}, then reports
 * the total time taken.
 */
public abstract class ProcedureStorePerformanceEvaluation<T extends ProcedureStore>
  extends AbstractHBaseTool {

  // Command line options and defaults.
  public static String DEFAULT_OUTPUT_PATH = "proc-store";

  public static Option OUTPUT_PATH_OPTION =
    new Option("output", true, "The output path. Default: " + DEFAULT_OUTPUT_PATH);

  public static int DEFAULT_NUM_THREADS = 20;

  public static Option NUM_THREADS_OPTION = new Option("threads", true,
    "Number of parallel threads which will write insert/updates/deletes to store. Default: " +
      DEFAULT_NUM_THREADS);

  public static int DEFAULT_NUM_PROCS = 1000000; // 1M

  public static Option NUM_PROCS_OPTION = new Option("procs", true,
    "Total number of procedures. Each procedure writes one insert and one update. Default: " +
      DEFAULT_NUM_PROCS);

  public static int DEFAULT_STATE_SIZE = 1024; // 1KB

  public static Option STATE_SIZE_OPTION = new Option("state_size", true,
    "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE +
      "bytes");

  public static Option SYNC_OPTION = new Option("sync", true,
    "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, " +
      "hsync, nosync. Default: hflush");

  public static String DEFAULT_SYNC_OPTION = "hflush";

  protected String outputPath;

  protected int numThreads;

  protected long numProcs;

  protected String syncType;

  protected int stateSize;

  // Dummy serialized state written on every insert/update; filled once in processOptions.
  protected static byte[] SERIALIZED_STATE;

  protected T store;

  /** Used by {@link Worker}. */
  private AtomicLong procIds = new AtomicLong(0);
  private AtomicBoolean workersFailed = new AtomicBoolean(false);

  // Timeout for worker threads.
  private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds

  @Override
  protected void addOptions() {
    addOption(OUTPUT_PATH_OPTION);
    addOption(NUM_THREADS_OPTION);
    addOption(NUM_PROCS_OPTION);
    addOption(SYNC_OPTION);
    addOption(STATE_SIZE_OPTION);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    outputPath = cmd.getOptionValue(OUTPUT_PATH_OPTION.getOpt(), DEFAULT_OUTPUT_PATH);
    numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
    numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
    syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
    assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(
      syncType) : "sync argument can only accept one of these three values: hsync, hflush, nosync";
    stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
    SERIALIZED_STATE = new byte[stateSize];
    // Fixed seed so separate runs write identical payloads and are comparable.
    new Random(12345).nextBytes(SERIALIZED_STATE);
  }

  /**
   * Deletes any leftover output directory, then creates, starts and loads a fresh store.
   */
  private void setUpProcedureStore() throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path storeDir = fs.makeQualified(new Path(outputPath));
    System.out.println("Procedure store directory : " + storeDir.toString());
    fs.delete(storeDir, true);
    store = createProcedureStore(storeDir);
    store.start(numThreads);
    store.recoverLease();
    store.load(new ProcedureTestingUtility.LoadCounter());
    System.out.println("Starting new procedure store: " + store.getClass().getSimpleName());
  }

  /**
   * Creates (but does not start) the concrete store implementation under test.
   * @param storeDir qualified directory the store should write into
   */
  protected abstract T createProcedureStore(Path storeDir) throws IOException;

  /**
   * Stops the store and best-effort deletes its on-disk data. Failures are reported but not
   * rethrown, since cleanup must not mask the benchmark result.
   */
  private void tearDownProcedureStore() {
    Path storeDir = null;
    try {
      if (store != null) {
        store.stop(false);
      }
      FileSystem fs = FileSystem.get(conf);
      storeDir = fs.makeQualified(new Path(outputPath));
      fs.delete(storeDir, true);
    } catch (IOException e) {
      System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up " +
        "disk space. Location: " + storeDir);
      e.printStackTrace();
    }
  }

  /**
   * Prints a single machine-parseable RESULT line for scripts.
   * @param timeTakenNs total wall-clock time of the run, in nanoseconds
   */
  protected abstract void printRawFormatResult(long timeTakenNs);

  @Override
  protected int doWork() throws Exception {
    try {
      setUpProcedureStore();
      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
      Future<?>[] futures = new Future<?>[numThreads];
      // Start worker threads.
      long start = System.nanoTime();
      // Absolute deadline on the same monotonic clock as 'start'. The previous code mixed
      // System.nanoTime() with System.currentTimeMillis(), which produced a bogus (effectively
      // infinite) timeout.
      long deadlineNs = start + TimeUnit.SECONDS.toNanos(WORKER_THREADS_TIMEOUT_SEC);
      for (int i = 0; i < numThreads; i++) {
        futures[i] = executor.submit(new Worker(start));
      }
      boolean failure = false;
      try {
        for (Future<?> future : futures) {
          long timeoutNs = deadlineNs - System.nanoTime();
          failure |= (future.get(timeoutNs, TimeUnit.NANOSECONDS).equals(EXIT_FAILURE));
        }
      } catch (Exception e) {
        System.err.println("Exception in worker thread.");
        e.printStackTrace();
        // Do not leak the pool when a worker fails or times out.
        executor.shutdownNow();
        return EXIT_FAILURE;
      }
      executor.shutdown();
      if (failure) {
        return EXIT_FAILURE;
      }
      long timeTakenNs = System.nanoTime() - start;
      System.out.println("******************************************");
      System.out.println("Num threads : " + numThreads);
      System.out.println("Num procedures : " + numProcs);
      System.out.println("Sync type : " + syncType);
      System.out.println("Time taken : " + TimeUnit.NANOSECONDS.toSeconds(timeTakenNs) + "sec");
      System.out.println("******************************************");
      System.out.println("Raw format for scripts");
      printRawFormatResult(timeTakenNs);
      return EXIT_SUCCESS;
    } catch (IOException e) {
      e.printStackTrace();
      return EXIT_FAILURE;
    } finally {
      tearDownProcedureStore();
    }
  }

  ///////////////////////////////
  // HELPER CLASSES
  ///////////////////////////////

  /**
   * Callable to generate load for wal by inserting/deleting/updating procedures. If procedure store
   * fails to roll log file (throws IOException), all threads quit, and at least one returns value
   * of {@link AbstractHBaseTool#EXIT_FAILURE}.
   */
  private final class Worker implements Callable<Integer> {
    // Run start time from System.nanoTime(), used only for progress reporting.
    private final long start;

    public Worker(long start) {
      this.start = start;
    }

    // TODO: Can also collect #procs, time taken by each thread to measure fairness.
    @Override
    public Integer call() throws IOException {
      while (true) {
        if (workersFailed.get()) {
          return EXIT_FAILURE;
        }
        // Claim the next procedure id; the shared counter partitions work across threads.
        long procId = procIds.getAndIncrement();
        if (procId >= numProcs) {
          break;
        }
        if (procId != 0 && procId % 10000 == 0) {
          long ns = System.nanoTime() - start;
          System.out.println("Wrote " + procId + " procedures in " +
            StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(ns)));
        }
        try {
          preWrite(procId);
        } catch (IOException ioe) {
          // Ask other threads to quit too.
          workersFailed.set(true);
          System.err.println("Exception when rolling log file. Current procId = " + procId);
          ioe.printStackTrace();
          return EXIT_FAILURE;
        }
        ProcedureTestingUtility.TestProcedure proc =
          new ProcedureTestingUtility.TestProcedure(procId);
        proc.setData(SERIALIZED_STATE);
        store.insert(proc, null);
        store.update(proc);
      }
      return EXIT_SUCCESS;
    }
  }

  /**
   * Hook invoked by each worker before writing the given procedure; implementations may use it to
   * roll logs, for example. An IOException aborts the whole run.
   */
  protected abstract void preWrite(long procId) throws IOException;
}

View File

@ -18,66 +18,29 @@
package org.apache.hadoop.hbase.procedure2.store.wal; package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery; import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { public class ProcedureWALPerformanceEvaluation
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); extends ProcedureStorePerformanceEvaluation<WALProcedureStore> {
// Command line options and defaults. // Command line options and defaults.
public static int DEFAULT_NUM_THREADS = 20;
public static Option NUM_THREADS_OPTION = new Option("threads", true,
"Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
+ DEFAULT_NUM_THREADS);
public static int DEFAULT_NUM_PROCS = 1000000; // 1M
public static Option NUM_PROCS_OPTION = new Option("procs", true,
"Total number of procedures. Each procedure writes one insert and one update. Default: "
+ DEFAULT_NUM_PROCS);
public static int DEFAULT_NUM_WALS = 0; public static int DEFAULT_NUM_WALS = 0;
public static Option NUM_WALS_OPTION = new Option("wals", true, public static Option NUM_WALS_OPTION = new Option("wals", true,
"Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY + "Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
" conf to roll the logs. Default: " + DEFAULT_NUM_WALS); " conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
public static int DEFAULT_STATE_SIZE = 1024; // 1KB
public static Option STATE_SIZE_OPTION = new Option("state_size", true,
"Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+ "bytes");
public static Option SYNC_OPTION = new Option("sync", true,
"Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
+ "hsync, nosync. Default: hflush");
public static String DEFAULT_SYNC_OPTION = "hflush";
public int numThreads; private long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
public long numProcs; private int numWals;
public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
public int numWals;
public String syncType;
public int stateSize;
static byte[] serializedState;
private WALProcedureStore store;
/** Used by {@link Worker}. */
private AtomicLong procIds = new AtomicLong(0);
private AtomicBoolean workersFailed = new AtomicBoolean(false);
// Timeout for worker threads.
private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
// Non-default configurations. // Non-default configurations.
private void setupConf() { private void setupConf() {
@ -88,163 +51,52 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
} }
} }
private void setupProcedureStore() throws IOException {
Path testDir = UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
Path logDir = new Path(testDir, "proc-logs");
System.out.println("Logs directory : " + logDir.toString());
fs.delete(logDir, true);
if ("nosync".equals(syncType)) {
store = new NoSyncWalProcedureStore(conf, logDir);
} else {
store = ProcedureTestingUtility.createWalStore(conf, logDir);
}
store.start(numThreads);
store.recoverLease();
store.load(new ProcedureTestingUtility.LoadCounter());
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
private void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getWALDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getWALDir().toString());
e.printStackTrace();
}
}
/** /**
* Processes and validates command line options. * Processes and validates command line options.
*/ */
@Override @Override
public void processOptions(CommandLine cmd) { public void processOptions(CommandLine cmd) {
numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS); super.processOptions(cmd);
numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS); numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
"sync argument can only accept one of these three values: hsync, hflush, nosync";
stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
serializedState = new byte[stateSize];
setupConf(); setupConf();
} }
@Override @Override
public void addOptions() { public void addOptions() {
addOption(NUM_THREADS_OPTION); super.addOptions();
addOption(NUM_PROCS_OPTION);
addOption(NUM_WALS_OPTION); addOption(NUM_WALS_OPTION);
addOption(SYNC_OPTION);
addOption(STATE_SIZE_OPTION);
} }
@Override @Override
public int doWork() { protected WALProcedureStore createProcedureStore(Path storeDir) throws IOException {
try { if ("nosync".equals(syncType)) {
setupProcedureStore(); return new NoSyncWalProcedureStore(conf, storeDir);
ExecutorService executor = Executors.newFixedThreadPool(numThreads); } else {
Future<?>[] futures = new Future<?>[numThreads]; return ProcedureTestingUtility.createWalStore(conf, storeDir);
// Start worker threads.
long start = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++) {
futures[i] = executor.submit(new Worker(start));
}
boolean failure = false;
try {
for (Future<?> future : futures) {
long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
}
} catch (Exception e) {
System.err.println("Exception in worker thread.");
e.printStackTrace();
return EXIT_FAILURE;
}
executor.shutdown();
if (failure) {
return EXIT_FAILURE;
}
long timeTaken = System.currentTimeMillis() - start;
System.out.println("******************************************");
System.out.println("Num threads : " + numThreads);
System.out.println("Num procedures : " + numProcs);
System.out.println("Sync type : " + syncType);
System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec");
System.out.println("******************************************");
System.out.println("Raw format for scripts");
System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
+ "total_time_ms=%s]",
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
return EXIT_SUCCESS;
} catch (IOException e) {
e.printStackTrace();
return EXIT_FAILURE;
} finally {
tearDownProcedureStore();
} }
} }
@Override
protected void printRawFormatResult(long timeTakenNs) {
System.out
.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]",
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
NUM_WALS_OPTION.getOpt(), numWals, timeTakenNs));
}
@Override
protected void preWrite(long procId) throws IOException {
if (procId > 0 && procId % numProcsPerWal == 0) {
store.rollWriterForTesting();
System.out.println(
"Starting new log : " + store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
}
/////////////////////////////// ///////////////////////////////
// HELPER CLASSES // HELPER CLASSES
/////////////////////////////// ///////////////////////////////
/**
* Callable to generate load for wal by inserting/deleting/updating procedures.
* If procedure store fails to roll log file (throws IOException), all threads quit, and at
* least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
*/
private final class Worker implements Callable<Integer> {
private final long start;
public Worker(long start) {
this.start = start;
}
// TODO: Can also collect #procs, time taken by each thread to measure fairness.
@Override
public Integer call() throws IOException {
while (true) {
if (workersFailed.get()) {
return EXIT_FAILURE;
}
long procId = procIds.getAndIncrement();
if (procId >= numProcs) {
break;
}
if (procId != 0 && procId % 10000 == 0) {
long ms = System.currentTimeMillis() - start;
System.out.println("Wrote " + procId + " procedures in "
+ StringUtils.humanTimeDiff(ms));
}
try{
if (procId > 0 && procId % numProcsPerWal == 0) {
store.rollWriterForTesting();
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
} catch (IOException ioe) {
// Ask other threads to quit too.
workersFailed.set(true);
System.err.println("Exception when rolling log file. Current procId = " + procId);
ioe.printStackTrace();
return EXIT_FAILURE;
}
ProcedureTestingUtility.TestProcedure proc =
new ProcedureTestingUtility.TestProcedure(procId);
proc.setData(serializedState);
store.insert(proc, null);
store.update(proc);
}
return EXIT_SUCCESS;
}
}
private static class NoSyncWalProcedureStore extends WALProcedureStore { private static class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException { public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
super(conf, logDir, null, new LeaseRecovery() { super(conf, logDir, null, new LeaseRecovery() {
@ -263,7 +115,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation(); ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
tool.setConf(UTIL.getConfiguration()); tool.setConf(HBaseConfiguration.create());
tool.run(args); tool.run(args);
} }
} }

View File

@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -58,7 +59,7 @@ class RegionFlusherAndCompactor implements Closeable {
static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size"; static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024; static final long DEFAULT_FLUSH_SIZE = TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE;
static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes"; static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";

View File

@ -65,6 +65,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.math.IntMath;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@ -91,11 +92,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* --&lt;master-server-name&gt;-dead <---- The WAL dir dead master * --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
* </pre> * </pre>
* *
* We use p:d column to store the serialized protobuf format procedure, and when deleting we * We use p:d column to store the serialized protobuf format procedure, and when deleting we will
* will first fill the info:proc column with an empty byte array, and then actually delete them in * first fill the info:proc column with an empty byte array, and then actually delete them in the
* the {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we * {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we can
* can not directly delete a procedure row as we do not know if it is the one with the max procedure * not directly delete a procedure row as we do not know if it is the one with the max procedure id.
* id.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RegionProcedureStore extends ProcedureStoreBase { public class RegionProcedureStore extends ProcedureStoreBase {
@ -155,7 +155,7 @@ public class RegionProcedureStore extends ProcedureStoreBase {
if (!setRunning(true)) { if (!setRunning(true)) {
return; return;
} }
LOG.info("Starting the Region Procedure Store..."); LOG.info("Starting the Region Procedure Store, number threads={}", numThreads);
this.numThreads = numThreads; this.numThreads = numThreads;
} }
@ -381,13 +381,15 @@ public class RegionProcedureStore extends ProcedureStoreBase {
CommonFSUtils.setRootDir(conf, rootDir); CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, rootDir); CommonFSUtils.setWALRootDir(conf, rootDir);
RegionFlusherAndCompactor.setupConf(conf); RegionFlusherAndCompactor.setupConf(conf);
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS)); conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
if (conf.get(USE_HSYNC_KEY) != null) { if (conf.get(USE_HSYNC_KEY) != null) {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY)); conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
} }
conf.setInt(AbstractFSWAL.RING_BUFFER_SLOT_COUNT, IntMath.ceilingPowerOfTwo(16 * numThreads));
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
walFactory = new WALFactory(conf, server.getServerName().toString(), false); walFactory = new WALFactory(conf, server.getServerName().toString(), false);
Path dataDir = new Path(rootDir, DATA_DIR); Path dataDir = new Path(rootDir, DATA_DIR);
if (fs.exists(dataDir)) { if (fs.exists(dataDir)) {

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALRoller; import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -115,6 +117,11 @@ final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
// we do not need this feature, so force disable it. // we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false); conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS)); conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
long flushSize = conf.getLong(RegionFlusherAndCompactor.FLUSH_SIZE_KEY,
RegionFlusherAndCompactor.DEFAULT_FLUSH_SIZE);
// make the roll size the same with the flush size, as we only have one region here
conf.setLong(WALUtil.WAL_BLOCK_SIZE, flushSize * 2);
conf.setFloat(AbstractFSWAL.WAL_ROLL_MULTIPLIER, 0.5f);
return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir); return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
} }
} }

View File

@ -136,8 +136,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout"; protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
public static final String WAL_ROLL_MULTIPLIER = "hbase.regionserver.logroll.multiplier";
public static final String MAX_LOGS = "hbase.regionserver.maxlogs"; public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
public static final String RING_BUFFER_SLOT_COUNT =
"hbase.regionserver.wal.disruptor.event.count";
/** /**
* file system instance * file system instance
*/ */
@ -358,9 +363,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// sync. If no sync, then the handlers will be outstanding just waiting on sync completion // sync. If no sync, then the handlers will be outstanding just waiting on sync completion
// before they return. // before they return.
int preallocatedEventCount = int preallocatedEventCount =
this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); this.conf.getInt(RING_BUFFER_SLOT_COUNT, 1024 * 16);
checkArgument(preallocatedEventCount >= 0, checkArgument(preallocatedEventCount >= 0, RING_BUFFER_SLOT_COUNT + " must > 0");
"hbase.regionserver.wal.disruptor.event.count must > 0");
int floor = Integer.highestOneBit(preallocatedEventCount); int floor = Integer.highestOneBit(preallocatedEventCount);
if (floor == preallocatedEventCount) { if (floor == preallocatedEventCount) {
return floor; return floor;
@ -450,7 +454,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
// 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally // 2 times the DFS default: i.e. 2 * DFS default block size rolling at 50% full will generally
// make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148. // make similar size logs to 1 * DFS default block size rolling at 95% full. See HBASE-19148.
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir); this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f); float multiplier = conf.getFloat(WAL_ROLL_MULTIPLIER, 0.5f);
this.logrollsize = (long) (this.blocksize * multiplier); this.logrollsize = (long) (this.blocksize * multiplier);
this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize))); this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));

View File

@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
public class WALUtil { public class WALUtil {
private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class); private static final Logger LOG = LoggerFactory.getLogger(WALUtil.class);
public static final String WAL_BLOCK_SIZE = "hbase.regionserver.hlog.blocksize";
private WALUtil() { private WALUtil() {
// Shut down construction of this class. // Shut down construction of this class.
} }
@ -193,7 +195,7 @@ public class WALUtil {
if (isRecoverEdits) { if (isRecoverEdits) {
return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize); return conf.getLong("hbase.regionserver.recoverededits.blocksize", defaultBlockSize);
} }
return conf.getLong("hbase.regionserver.hlog.blocksize", defaultBlockSize); return conf.getLong(WAL_BLOCK_SIZE, defaultBlockSize);
} }
public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) { public static void filterCells(WALEdit edit, Function<Cell, Cell> mapper) {

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.IOException;
import java.lang.management.MemoryType;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStorePerformanceEvaluation;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
/**
 * Stress/performance tool for the region based procedure store. Creates a standalone
 * {@link RegionProcedureStore} backed by a mock server and drives it through the shared
 * {@link ProcedureStorePerformanceEvaluation} harness.
 */
public class RegionProcedureStorePerformanceEvaluation
  extends ProcedureStorePerformanceEvaluation<RegionProcedureStore> {

  /**
   * Minimal {@link Server} implementation: supplies only the configuration and a fixed server
   * name; every other accessor is unsupported since the store under test does not need them.
   */
  private static final class MockServer implements Server {

    private final Configuration conf;

    private final ServerName serverName =
      ServerName.valueOf("localhost", 12345, System.currentTimeMillis());

    public MockServer(Configuration conf) {
      this.conf = conf;
    }

    @Override
    public void abort(String why, Throwable e) {
      // no-op: nothing to abort in this standalone tool
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public void stop(String why) {
      // no-op: lifecycle is managed by the evaluation harness
    }

    @Override
    public boolean isStopped() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
    public ZKWatcher getZooKeeper() {
      throw new UnsupportedOperationException();
    }

    @Override
    public Connection getConnection() {
      throw new UnsupportedOperationException();
    }

    @Override
    public Connection createConnection(Configuration conf) throws IOException {
      throw new UnsupportedOperationException();
    }

    @Override
    public AsyncClusterConnection getAsyncClusterConnection() {
      throw new UnsupportedOperationException();
    }

    @Override
    public ServerName getServerName() {
      return serverName;
    }

    @Override
    public CoordinatedStateManager getCoordinatedStateManager() {
      throw new UnsupportedOperationException();
    }

    @Override
    public ChoreService getChoreService() {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  protected RegionProcedureStore createProcedureStore(Path storeDir) throws IOException {
    Pair<Long, MemoryType> pair = MemorySizeUtil.getGlobalMemStoreSize(conf);
    long globalMemStoreSize = pair.getFirst();
    boolean offheap = pair.getSecond() == MemoryType.NON_HEAP;
    float poolSizePercentage = offheap ? 1.0F :
      conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT);
    float initialCountPercentage =
      conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT);
    int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
    // The store's memstore allocates from the global ChunkCreator, so it must be initialized
    // up front, just like a real region server would do.
    ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage,
      initialCountPercentage, null);
    conf.setBoolean(RegionProcedureStore.USE_HSYNC_KEY, "hsync".equals(syncType));
    CommonFSUtils.setRootDir(conf, storeDir);
    // No-op lease recovery: this is a fresh standalone store with no dead-master WALs.
    return new RegionProcedureStore(new MockServer(conf), (fs, path) -> {
    });
  }

  @Override
  protected void printRawFormatResult(long timeTakenNs) {
    // Convert to milliseconds so the value matches the "total_time_ms" label; the previous code
    // printed raw nanoseconds under that label.
    System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, " + "total_time_ms=%s]",
      NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), stateSize,
      SYNC_OPTION.getOpt(), syncType, NUM_THREADS_OPTION.getOpt(), numThreads,
      TimeUnit.NANOSECONDS.toMillis(timeTakenNs)));
  }

  @Override
  protected void preWrite(long procId) throws IOException {
    // Nothing to do before a write: the region store rolls its WAL internally.
  }

  public static void main(String[] args) throws IOException {
    RegionProcedureStorePerformanceEvaluation tool =
      new RegionProcedureStorePerformanceEvaluation();
    tool.setConf(HBaseConfiguration.create());
    tool.run(args);
  }
}