HBASE-16101 Tool to microbenchmark procedure WAL performance.

Change-Id: I8ec158319395d2ec8e36641a3beab2694f7b6aef
This commit is contained in:
Apekshit Sharma 2016-08-29 19:23:09 -07:00
parent 4a4f8e7049
commit c66bb48ce8
4 changed files with 567 additions and 37 deletions

View File

@ -165,6 +165,17 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-thrift</artifactId>
</dependency>
<!-- To dump tools in hbase-procedure into cached_classpath.txt. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-procedure</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>

View File

@ -17,13 +17,11 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
@ -40,12 +38,11 @@ import org.apache.hadoop.util.ToolRunner;
*/
@InterfaceAudience.Private
public abstract class AbstractHBaseTool implements Tool {
protected static final int EXIT_SUCCESS = 0;
protected static final int EXIT_FAILURE = 1;
private static final String SHORT_HELP_OPTION = "h";
private static final String LONG_HELP_OPTION = "help";
private static final Option HELP_OPTION = new Option("h", "help", false,
"Prints help for this tool.");
private static final Log LOG = LogFactory.getLog(AbstractHBaseTool.class);
@ -53,8 +50,6 @@ public abstract class AbstractHBaseTool implements Tool {
protected Configuration conf = null;
private static final Set<String> requiredOptions = new TreeSet<String>();
protected String[] cmdLineArgs = null;
/**
@ -83,6 +78,7 @@ public abstract class AbstractHBaseTool implements Tool {
@Override
public final int run(String[] args) throws IOException {
cmdLineArgs = args;
if (conf == null) {
LOG.error("Tool configuration is not initialized");
throw new NullPointerException("conf");
@ -90,24 +86,22 @@ public abstract class AbstractHBaseTool implements Tool {
CommandLine cmd;
try {
addOptions();
if (isHelpCommand(args)) {
printUsage();
return EXIT_SUCCESS;
}
// parse the command line arguments
cmd = parseArgs(args);
cmdLineArgs = args;
cmd = new BasicParser().parse(options, args);
} catch (ParseException e) {
LOG.error("Error when parsing command-line arguments", e);
printUsage();
return EXIT_FAILURE;
}
if (cmd.hasOption(SHORT_HELP_OPTION) || cmd.hasOption(LONG_HELP_OPTION) ||
!sanityCheckOptions(cmd)) {
printUsage();
return EXIT_FAILURE;
}
processOptions(cmd);
int ret = EXIT_FAILURE;
int ret;
try {
ret = doWork();
} catch (Exception e) {
@ -117,22 +111,11 @@ public abstract class AbstractHBaseTool implements Tool {
return ret;
}
private boolean sanityCheckOptions(CommandLine cmd) {
boolean success = true;
for (String reqOpt : requiredOptions) {
if (!cmd.hasOption(reqOpt)) {
LOG.error("Required option -" + reqOpt + " is missing");
success = false;
}
}
return success;
}
protected CommandLine parseArgs(String[] args) throws ParseException {
options.addOption(SHORT_HELP_OPTION, LONG_HELP_OPTION, false, "Show usage");
addOptions();
CommandLineParser parser = new BasicParser();
return parser.parse(options, args);
private boolean isHelpCommand(String[] args) throws ParseException {
Options helpOption = new Options().addOption(HELP_OPTION);
// this parses the command line but doesn't throw an exception on unknown options
CommandLine cl = new BasicParser().parse(helpOption, args, true);
return cl.getOptions().length != 0;
}
protected void printUsage() {
@ -146,14 +129,20 @@ public abstract class AbstractHBaseTool implements Tool {
helpFormatter.printHelp(usageStr, usageHeader, options, usageFooter);
}
protected void addOption(Option option) {
options.addOption(option);
}
protected void addRequiredOptWithArg(String opt, String description) {
requiredOptions.add(opt);
addOptWithArg(opt, description);
Option option = new Option(opt, true, description);
option.setRequired(true);
options.addOption(option);
}
protected void addRequiredOptWithArg(String shortOpt, String longOpt, String description) {
requiredOptions.add(longOpt);
addOptWithArg(shortOpt, longOpt, description);
Option option = new Option(shortOpt, longOpt, true, description);
option.setRequired(true);
options.addOption(option);
}
protected void addOptNoArg(String opt, String description) {
@ -172,6 +161,21 @@ public abstract class AbstractHBaseTool implements Tool {
options.addOption(shortOpt, longOpt, true, description);
}
public int getOptionAsInt(CommandLine cmd, String opt, int defaultValue) {
if (cmd.hasOption(opt)) {
return Integer.parseInt(cmd.getOptionValue(opt));
} else {
return defaultValue;
}
}
public double getOptionAsDouble(CommandLine cmd, String opt, double defaultValue) {
if (cmd.hasOption(opt)) {
return Double.parseDouble(cmd.getOptionValue(opt));
} else {
return defaultValue;
}
}
/**
* Parse a number and enforce a range.
*/

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import static java.lang.System.currentTimeMillis;
public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
// Command line options and defaults.
public static int DEFAULT_NUM_PROCS = 1000000; // 1M
public static Option NUM_PROCS_OPTION = new Option("procs", true,
"Total number of procedures. Default: " + DEFAULT_NUM_PROCS);
public static int DEFAULT_NUM_WALS = 0;
public static Option NUM_WALS_OPTION = new Option("wals", true,
"Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
" conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
public static int DEFAULT_STATE_SIZE = 1024; // 1KB
public static Option STATE_SIZE_OPTION = new Option("size", true,
"Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+ " bytes");
public static int DEFAULT_UPDATES_PER_PROC = 5;
public static Option UPDATES_PER_PROC_OPTION = new Option("updates_per_proc", true,
"Number of update states to write for each proc. Default: " + DEFAULT_UPDATES_PER_PROC);
public static double DEFAULT_DELETE_PROCS_FRACTION = 0.50;
public static Option DELETE_PROCS_FRACTION_OPTION = new Option("delete_procs_fraction", true,
"Fraction of procs for which to write delete state. Distribution of procs chosen for "
+ "delete is uniform across all procs. Default: " + DEFAULT_DELETE_PROCS_FRACTION);
public int numProcs;
public int updatesPerProc;
public double deleteProcsFraction;
public int numWals;
private WALProcedureStore store;
static byte[] serializedState;
private class LoadCounter implements ProcedureStore.ProcedureLoader {
public LoadCounter() {}
@Override
public void setMaxProcId(long maxProcId) {
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
if (procIter.isNextCompleted()) {
ProcedureInfo proc = procIter.nextAsProcedureInfo();
} else {
Procedure proc = procIter.nextAsProcedure();
}
}
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
Procedure proc = procIter.nextAsProcedure();
}
}
}
@Override
protected void addOptions() {
addOption(NUM_PROCS_OPTION);
addOption(UPDATES_PER_PROC_OPTION);
addOption(DELETE_PROCS_FRACTION_OPTION);
addOption(NUM_WALS_OPTION);
addOption(STATE_SIZE_OPTION);
}
@Override
protected void processOptions(CommandLine cmd) {
numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
int stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
serializedState = new byte[stateSize];
updatesPerProc = getOptionAsInt(cmd, UPDATES_PER_PROC_OPTION.getOpt(),
DEFAULT_UPDATES_PER_PROC);
deleteProcsFraction = getOptionAsDouble(cmd, DELETE_PROCS_FRACTION_OPTION.getOpt(),
DEFAULT_DELETE_PROCS_FRACTION);
setupConf();
}
private void setupConf() {
if (numWals > 0) {
conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
}
}
public void setUpProcedureStore() throws IOException {
Path testDir = UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
Path logDir = new Path(testDir, "proc-logs");
System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
fs.delete(logDir, true);
store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
store.start(1);
store.recoverLease();
store.load(new LoadCounter());
}
/**
* @return a list of shuffled integers which represent state of proc id. First occurrence of a
* number denotes insert state, consecutive occurrences denote update states, and -ve value
* denotes delete state.
*/
private List<Integer> shuffleProcWriteSequence() {
Random rand = new Random();
List<Integer> procStatesSequence = new ArrayList<>();
Set<Integer> toBeDeletedProcs = new HashSet<>();
// Add n + 1 entries of the proc id for insert + updates. If proc is chosen for delete, add
// extra entry which is marked -ve in the loop after shuffle.
for (int procId = 1; procId <= numProcs; ++procId) {
procStatesSequence.addAll(Collections.nCopies(updatesPerProc + 1, procId));
if (rand.nextFloat() < deleteProcsFraction) {
procStatesSequence.add(procId);
toBeDeletedProcs.add(procId);
}
}
Collections.shuffle(procStatesSequence);
// Mark last occurrences of proc ids in toBeDeletedProcs with -ve to denote it's a delete state.
for (int i = procStatesSequence.size() - 1; i >= 0; --i) {
int procId = procStatesSequence.get(i);
if (toBeDeletedProcs.contains(procId)) {
procStatesSequence.set(i, -1 * procId);
toBeDeletedProcs.remove(procId);
}
}
return procStatesSequence;
}
private void writeWals() throws IOException {
List<Integer> procStates = shuffleProcWriteSequence();
TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used.
int numProcsPerWal = numWals > 0 ? (int)Math.ceil(procStates.size() / numWals)
: Integer.MAX_VALUE;
long startTime = currentTimeMillis();
long lastTime = startTime;
for (int i = 0; i < procStates.size(); ++i) {
int procId = procStates.get(i);
if (procId < 0) {
store.delete(procs[-procId].getProcId());
procs[-procId] = null;
} else if (procs[procId] == null) {
procs[procId] = new TestProcedure(procId, 0);
procs[procId].setData(serializedState);
store.insert(procs[procId], null);
} else {
store.update(procs[procId]);
}
if (i > 0 && i % numProcsPerWal == 0) {
long currentTime = currentTimeMillis();
System.out.println("Forcing wall roll. Time taken on last WAL: " +
(currentTime - lastTime) / 1000.0f + " sec");
store.rollWriterForTesting();
lastTime = currentTime;
}
}
long timeTaken = currentTimeMillis() - startTime;
System.out.println("\n\nDone writing WALs.\nNum procs : " + numProcs + "\nTotal time taken : "
+ StringUtils.humanTimeDiff(timeTaken) + "\n\n");
}
private void storeRestart(ProcedureStore.ProcedureLoader loader) throws IOException {
System.out.println("Restarting procedure store to read back the WALs");
store.stop(false);
store.start(1);
store.recoverLease();
long startTime = currentTimeMillis();
store.load(loader);
long timeTaken = System.currentTimeMillis() - startTime;
System.out.println("******************************************");
System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
System.out.println("******************************************");
}
public void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getLogDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getLogDir().toString());
System.err.println(e.toString());
}
}
@Override
protected int doWork() {
try {
setUpProcedureStore();
writeWals();
storeRestart(new LoadCounter());
return EXIT_SUCCESS;
} catch (IOException e) {
e.printStackTrace();
return EXIT_FAILURE;
} finally {
tearDownProcedureStore();
}
}
public static void main(String[] args) throws IOException {
ProcedureWALLoaderPerformanceEvaluation tool = new ProcedureWALLoaderPerformanceEvaluation();
tool.setConf(UTIL.getConfiguration());
tool.run(args);
}
}

View File

@ -0,0 +1,267 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.util.*;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
// Command line options and defaults.
public static int DEFAULT_NUM_THREADS = 20;
public static Option NUM_THREADS_OPTION = new Option("threads", true,
"Number of parallel threads which will write insert/updates/deletes to WAL. Default: "
+ DEFAULT_NUM_THREADS);
public static int DEFAULT_NUM_PROCS = 1000000; // 1M
public static Option NUM_PROCS_OPTION = new Option("procs", true,
"Total number of procedures. Each procedure writes one insert and one update. Default: "
+ DEFAULT_NUM_PROCS);
public static int DEFAULT_NUM_WALS = 0;
public static Option NUM_WALS_OPTION = new Option("wals", true,
"Number of WALs to write. If -ve or 0, uses " + WALProcedureStore.ROLL_THRESHOLD_CONF_KEY +
" conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
public static int DEFAULT_STATE_SIZE = 1024; // 1KB
public static Option STATE_SIZE_OPTION = new Option("size", true,
"Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE
+ "bytes");
public static Option SYNC_OPTION = new Option("sync", true,
"Type of sync to use when writing WAL contents to file system. Accepted values: hflush, "
+ "hsync, nosync. Default: hflush");
public static String DEFAULT_SYNC_OPTION = "hflush";
public int numThreads;
public long numProcs;
public long numProcsPerWal = Long.MAX_VALUE; // never roll wall based on this value.
public int numWals;
public String syncType;
public int stateSize;
static byte[] serializedState;
private WALProcedureStore store;
/** Used by {@link Worker}. */
private AtomicLong procIds = new AtomicLong(0);
private AtomicBoolean workersFailed = new AtomicBoolean(false);
// Timeout for worker threads.
private static final int WORKER_THREADS_TIMEOUT_SEC = 600; // in seconds
// Non-default configurations.
private void setupConf() {
conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, "hsync".equals(syncType));
if (numWals > 0) {
conf.setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, Long.MAX_VALUE);
numProcsPerWal = numProcs / numWals;
}
}
private void setupProcedureStore() throws IOException {
Path testDir = UTIL.getDataTestDir();
FileSystem fs = testDir.getFileSystem(conf);
Path logDir = new Path(testDir, "proc-logs");
System.out.println("Logs directory : " + logDir.toString());
fs.delete(logDir, true);
if ("nosync".equals(syncType)) {
store = new NoSyncWalProcedureStore(conf, fs, logDir);
} else {
store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
}
store.start(numThreads);
store.recoverLease();
store.load(new ProcedureTestingUtility.LoadCounter());
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
private void tearDownProcedureStore() {
store.stop(false);
try {
store.getFileSystem().delete(store.getLogDir(), true);
} catch (IOException e) {
System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up "
+ "disk space. Location: " + store.getLogDir().toString());
e.printStackTrace();
}
}
/**
* Processes and validates command line options.
*/
@Override
public void processOptions(CommandLine cmd) {
numThreads = getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
numProcs = getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
numWals = getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
assert "hsync".equals(syncType) || "hflush".equals(syncType) || "nosync".equals(syncType):
"sync argument can only accept one of these three values: hsync, hflush, nosync";
stateSize = getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);
serializedState = new byte[stateSize];
setupConf();
}
@Override
public void addOptions() {
addOption(NUM_THREADS_OPTION);
addOption(NUM_PROCS_OPTION);
addOption(NUM_WALS_OPTION);
addOption(SYNC_OPTION);
addOption(STATE_SIZE_OPTION);
}
@Override
public int doWork() {
try {
setupProcedureStore();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
Future<Integer>[] futures = (Future<Integer>[]) new Object[numThreads];
// Start worker threads.
long start = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++) {
futures[i] = executor.submit(this.new Worker(start));
}
boolean failure = false;
try {
for (Future<Integer> future : futures) {
long timeout = start + WORKER_THREADS_TIMEOUT_SEC * 1000 - System.currentTimeMillis();
failure |= (future.get(timeout, TimeUnit.MILLISECONDS).equals(EXIT_FAILURE));
}
} catch (Exception e) {
System.err.println("Exception in worker thread.");
e.printStackTrace();
return EXIT_FAILURE;
}
executor.shutdown();
if (failure) {
return EXIT_FAILURE;
}
long timeTaken = System.currentTimeMillis() - start;
System.out.println("******************************************");
System.out.println("Num threads : " + numThreads);
System.out.println("Num procedures : " + numProcs);
System.out.println("Sync type : " + syncType);
System.out.println("Time taken : " + (timeTaken / 1000.0f) + "sec");
System.out.println("******************************************");
return EXIT_SUCCESS;
} catch (IOException e) {
e.printStackTrace();
return EXIT_FAILURE;
} finally {
tearDownProcedureStore();
}
}
///////////////////////////////
// HELPER CLASSES
///////////////////////////////
/**
* Callable to generate load for wal by inserting/deleting/updating procedures.
* If procedure store fails to roll log file (throws IOException), all threads quit, and at
* least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}.
*/
class Worker implements Callable<Integer> {
final long start;
public Worker(long start) {
this.start = start;
}
// TODO: Can also collect #procs, time taken by each thread to measure fairness.
@Override
public Integer call() throws IOException {
while (true) {
if (workersFailed.get()) {
return EXIT_FAILURE;
}
long procId = procIds.getAndIncrement();
if (procId >= numProcs) {
break;
}
if (procId != 0 && procId % 10000 == 0) {
long ms = System.currentTimeMillis() - start;
System.out.println("Wrote " + procId + " procedures in "
+ StringUtils.humanTimeDiff(ms));
}
try{
if (procId > 0 && procId % numProcsPerWal == 0) {
store.rollWriterForTesting();
System.out.println("Starting new log : "
+ store.getActiveLogs().get(store.getActiveLogs().size() - 1));
}
} catch (IOException ioe) {
// Ask other threads to quit too.
workersFailed.set(true);
System.err.println("Exception when rolling log file. Current procId = " + procId);
ioe.printStackTrace();
return EXIT_FAILURE;
}
ProcedureTestingUtility.TestProcedure proc =
new ProcedureTestingUtility.TestProcedure(procId);
proc.setData(serializedState);
store.insert(proc, null);
store.update(proc);
}
return EXIT_SUCCESS;
}
}
public class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
final Path logDir) {
super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
}
});
}
@Override
protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count)
throws IOException {
long totalSynced = 0;
for (int i = 0; i < count; ++i) {
totalSynced += slots[offset + i].size();
}
return totalSynced;
}
}
public static void main(String[] args) throws IOException {
ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
tool.setConf(UTIL.getConfiguration());
tool.run(args);
}
}