HBASE-4487 The increment operation can release the rowlock before sync-ing the Hlog (dhruba borthakur)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1177815 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-30 20:22:22 +00:00
parent 1d621be994
commit 43f173cf44
7 changed files with 796 additions and 46 deletions

CHANGES.txt

@ -568,6 +568,8 @@ Release 0.92.0 - Unreleased
(Chris Trezzo via JD)
HBASE-2794 Utilize ROWCOL bloom filter if multiple columns within same family
are requested in a Get (Mikhail Bautin)
HBASE-4487 The increment operation can release the rowlock before sync-ing
the Hlog (dhruba borthakur)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel

src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@ -3526,6 +3526,7 @@ public class HRegion implements HeapSize { // , Writable{
List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
long now = EnvironmentEdgeManager.currentTimeMillis();
long size = 0;
long txid = 0;
// Lock row
startRegionOperation();
@ -3584,7 +3585,7 @@ public class HRegion implements HeapSize { // , Writable{
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
this.htableDescriptor);
}
@ -3595,6 +3596,9 @@ public class HRegion implements HeapSize { // , Writable{
this.updatesLock.readLock().unlock();
releaseRowLock(lid);
}
if (writeToWAL) {
this.log.sync(txid); // sync the transaction log outside the rowlock
}
} finally {
closeRegionOperation();
}
@ -3622,6 +3626,7 @@ public class HRegion implements HeapSize { // , Writable{
checkRow(row, "increment");
boolean flush = false;
boolean wrongLength = false;
long txid = 0;
// Lock row
long result = amount;
startRegionOperation();
@ -3665,7 +3670,7 @@ public class HRegion implements HeapSize { // , Writable{
// Using default cluster id, as this can only happen in the
// originating cluster. A slave cluster receives the final value (not
// the delta) as a Put.
txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
this.htableDescriptor);
}
@ -3682,6 +3687,9 @@ public class HRegion implements HeapSize { // , Writable{
this.updatesLock.readLock().unlock();
releaseRowLock(lid);
}
if (writeToWAL) {
this.log.sync(txid); // sync the transaction log outside the rowlock
}
} finally {
closeRegionOperation();
}
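
The two HRegion hunks above carry the whole fix: the WAL append still happens while the row lock is held, but the expensive HDFS sync is deferred until after the lock is released, so concurrent increments on the same row no longer serialize behind each other's syncs. A minimal sketch of that ordering, with a hypothetical Wal interface standing in for HLog (not HBase's real API):

// Sketch of the release-rowlock-before-sync pattern from this commit.
// Wal and its methods are hypothetical stand-ins for HBase's HLog.
import java.util.concurrent.locks.ReentrantLock;

public class ReleaseLockBeforeSync {
  interface Wal {
    long appendNoSync(byte[] edit);  // buffer the edit, return its txid
    void sync(long txid);            // block until txid is durable
  }

  private final ReentrantLock rowLock = new ReentrantLock();

  long increment(Wal wal, byte[] edit, boolean writeToWAL) {
    long txid = 0;
    rowLock.lock();
    try {
      // 1. apply the increment to the in-memory store (elided)
      // 2. append the WAL edit, but do not pay for an HDFS sync yet
      if (writeToWAL) {
        txid = wal.appendNoSync(edit);
      }
    } finally {
      rowLock.unlock();  // other writers to this row may proceed now
    }
    // 3. durability point: sync the transaction log outside the rowlock
    if (writeToWAL) {
      wal.sync(txid);
    }
    return txid;
  }
}

The txid handed back by the unsynced append is what lets each caller later wait for exactly its own edit to become durable, no matter which thread's sync actually flushes it.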

src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java

@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedMap;
@ -130,6 +131,8 @@ public class HLog implements Syncable {
private final long optionalFlushInterval;
private final long blocksize;
private final String prefix;
private final AtomicLong unflushedEntries = new AtomicLong(0);
private volatile long syncedTillHere = 0;
private final Path oldLogDir;
private boolean logRollRunning;
@ -256,8 +259,9 @@ public class HLog implements Syncable {
private static volatile long writeOps;
private static volatile long writeTime;
// For measuring latency of syncs
private static AtomicLong syncOps = new AtomicLong();
private static AtomicLong syncTime = new AtomicLong();
private static AtomicLong syncBatchSize = new AtomicLong();
public static long getWriteOps() {
long ret = writeOps;
@ -272,15 +276,15 @@ public class HLog implements Syncable {
}
public static long getSyncOps() {
return syncOps.getAndSet(0);
}
public static long getSyncTime() {
return syncTime.getAndSet(0);
}
public static long getSyncBatchSize() {
return syncBatchSize.getAndSet(0);
}
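A supporting change in the hunk above: the sync counters move from volatile long to AtomicLong, because a read-modify-write such as syncOps++, or the getters' read-then-zero, is not atomic on a volatile and a racing reset can silently drop samples. A tiny self-contained illustration of the read-and-reset idiom (hypothetical class name, not HBase code):

import java.util.concurrent.atomic.AtomicLong;

// With a volatile long, "counter += delta" and a racing "counter = 0" are
// separate reads and writes; AtomicLong reads and clears in one step.
public class ReadAndResetCounter {
  private static final AtomicLong syncTime = new AtomicLong();

  static void recordSync(long millis) {
    syncTime.addAndGet(millis);        // atomic read-modify-write
  }

  static long getAndResetSyncTime() {
    return syncTime.getAndSet(0);      // no update can be lost in between
  }

  public static void main(String[] args) {
    recordSync(12);
    recordSync(30);
    System.out.println(getAndResetSyncTime()); // 42
    System.out.println(getAndResetSyncTime()); // 0
  }
}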
/**
@ -795,6 +799,15 @@ public class HLog implements Syncable {
if (this.writer != null) {
// Close the current writer, get a new one.
try {
// Wait till all current transactions are written to the hlog.
// No new transactions can occur because we have the updatelock.
if (this.unflushedEntries.get() != this.syncedTillHere) {
LOG.debug("cleanupCurrentWriter " +
" waiting for transactions to get synced " +
" total " + this.unflushedEntries.get() +
" synced till here " + syncedTillHere);
sync();
}
this.writer.close();
closeErrorCount.set(0);
} catch (IOException e) {
@ -953,14 +966,17 @@ public class HLog implements Syncable {
* @param regionInfo
* @param logEdit
* @param logKey
* @param doSync shall we sync after writing the transaction
* @return The txid of this transaction
* @throws IOException
*/
public long append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
HTableDescriptor htd, boolean doSync)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
long txid = 0;
synchronized (updateLock) {
long seqNum = obtainSeqNum();
logKey.setLogSeqNum(seqNum);
@ -972,16 +988,19 @@ public class HLog implements Syncable {
this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
Long.valueOf(seqNum));
doWrite(regionInfo, logKey, logEdit, htd);
txid = this.unflushedEntries.incrementAndGet();
this.numEntries.incrementAndGet();
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
if (doSync &&
(regionInfo.isMetaRegion() ||
!htd.isDeferredLogFlush())) {
// sync txn to file system
this.sync(txid);
}
return txid;
}
/**
@ -1022,15 +1041,18 @@ public class HLog implements Syncable {
* @param edits
* @param clusterId The originating clusterId for this edit (for replication)
* @param now
* @param doSync shall we sync?
* @return txid of this transaction
* @throws IOException
*/
private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
final long now, HTableDescriptor htd, boolean doSync)
throws IOException {
if (edits.isEmpty()) return this.unflushedEntries.get();
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
long txid = 0;
synchronized (this.updateLock) {
long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
@ -1045,16 +1067,57 @@ public class HLog implements Syncable {
HLogKey logKey = makeKey(hriKey, tableName, seqNum, now, clusterId);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
txid = this.unflushedEntries.incrementAndGet();
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
if (doSync &&
(info.isMetaRegion() ||
!htd.isDeferredLogFlush())) {
// sync txn to file system
this.sync(txid);
}
return txid;
}
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is not flushed
* after this transaction is written to the log.
*
* @param info
* @param tableName
* @param edits
* @param clusterId The originating clusterId for this edit (for replication)
* @param now
* @return txid of this transaction
* @throws IOException
*/
public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, false);
}
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is flushed
* after this transaction is written to the log.
*
* @param info
* @param tableName
* @param edits
* @param clusterId The originating clusterId for this edit (for replication)
* @param now
* @return txid of this transaction
* @throws IOException
*/
public long append(HRegionInfo info, byte [] tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, true);
}
/**
* This thread is responsible to call syncFs and buffer up the writers while
* it happens.
@ -1063,6 +1126,14 @@ public class HLog implements Syncable {
private final long optionalFlushInterval;
// List of pending writes to the HLog. These correspond to transactions
// that have not yet returned to the client. We keep them cached here
// instead of writing them to HDFS piecemeal, because the HDFS write
// method is pretty heavyweight as far as locking is concerned. The
// goal is to increase the batchsize for writing-to-hdfs as well as
// sync-to-hdfs, so that we can get better system throughput.
private List<Entry> pendingWrites = new LinkedList<Entry>();
LogSyncer(long optionalFlushInterval) {
this.optionalFlushInterval = optionalFlushInterval;
}
@ -1075,7 +1146,9 @@ public class HLog implements Syncable {
while(!this.isInterrupted()) {
try {
if (unflushedEntries.get() <= syncedTillHere) {
Thread.sleep(this.optionalFlushInterval);
}
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
@ -1088,38 +1161,85 @@ public class HLog implements Syncable {
LOG.info(getName() + " exiting");
}
}
// appends new writes to the pendingWrites. It is better to keep it in
// our own queue rather than writing it to the HDFS output stream because
// HDFSOutputStream.writeChunk is not lightweight at all.
synchronized void append(Entry e) throws IOException {
pendingWrites.add(e);
}
// Returns all currently pending writes. New writes
// will accumulate in a new list.
synchronized List<Entry> getPendingWrites() {
List<Entry> save = this.pendingWrites;
this.pendingWrites = new LinkedList<Entry>();
return save;
}
// writes out pending entries to the HLog
void hlogFlush(Writer writer) throws IOException {
// Atomically fetch all existing pending writes. New writes
// will start accumulating in a new list.
List<Entry> pending = getPendingWrites();
// write out all accumulated Entries to hdfs.
for (Entry e : pending) {
writer.append(e);
}
}
}
// sync all known transactions
private void syncer() throws IOException {
syncer(this.unflushedEntries.get()); // sync all pending items
}
// sync all transactions upto the specified txid
private void syncer(long txid) throws IOException {
synchronized (this.updateLock) {
if (this.closed) return;
}
// if the transaction that we are interested in is already
// synced, then return immediately.
if (txid <= this.syncedTillHere) {
return;
}
try {
long doneUpto = this.unflushedEntries.get();
long now = System.currentTimeMillis();
// Done in parallel for all writer threads, thanks to HDFS-895
boolean syncSuccessful = true;
try {
// First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number have been successfully synced.
logSyncerThread.hlogFlush(this.writer);
this.writer.sync();
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
this.syncedTillHere = doneUpto;
} catch(IOException io) {
syncSuccessful = false;
}
if (!syncSuccessful) {
synchronized (this.updateLock) {
// HBASE-4387, retry with updateLock held
this.writer.sync();
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
this.syncedTillHere = doneUpto;
}
}
// We try not to acquire the updateLock just to update statistics.
// Hence these statistics are kept as AtomicLongs.
syncTime.addAndGet(System.currentTimeMillis() - now);
syncOps.incrementAndGet();
if (!this.logRollRunning) {
checkLowReplication();
if (this.writer.getLength() > this.logrollsize) {
requestLogRoll();
}
}
} catch (IOException e) {
LOG.fatal("Could not sync. Requesting close of hlog", e);
requestLogRoll();
@ -1212,6 +1332,10 @@ public class HLog implements Syncable {
syncer();
}
public void sync(long txid) throws IOException {
syncer(txid);
}
private void requestLogRoll() {
if (!this.listeners.isEmpty()) {
for (WALActionsListener i: this.listeners) {
@ -1235,8 +1359,8 @@ public class HLog implements Syncable {
long now = System.currentTimeMillis();
// coprocessor hook:
if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
// write to our buffer for the Hlog file.
logSyncerThread.append(new HLog.Entry(logKey, logEdit));
}
long took = System.currentTimeMillis() - now;
coprocessorHost.postWALWrite(info, logKey, logEdit);
@ -1357,18 +1481,20 @@ public class HLog implements Syncable {
if (this.closed) {
return;
}
long txid = 0;
synchronized (updateLock) {
long now = System.currentTimeMillis();
WALEdit edit = completeCacheFlushLogEdit();
HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
logSyncerThread.append(new Entry(key, edit));
txid = this.unflushedEntries.incrementAndGet();
writeTime += System.currentTimeMillis() - now;
writeOps++;
this.numEntries.incrementAndGet();
}
// sync txn to file system
this.sync(txid);
} finally {
// updateLock not needed for removing snapshot's entry
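
Taken together, the LogSyncer changes above implement group commit: each writer buffers its entry in pendingWrites and receives a txid; the first caller of sync drains the whole buffer, writes it out, issues a single HDFS sync, and advances syncedTillHere so that every other waiter with an equal-or-smaller txid returns immediately (that fast path is what the new syncBatchSize metric measures). A condensed sketch of the batching idea, with illustrative names in place of HBase's Writer and Entry types:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Group commit in miniature: append() buffers an entry and hands back a
// txid; sync(txid) flushes the whole buffer at once and marks every
// buffered transaction durable. Names are illustrative, not HBase's API.
public class GroupCommitLog {
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;
  private List<String> pendingWrites = new LinkedList<String>();
  private final List<String> durable = new ArrayList<String>(); // stand-in for HDFS

  public synchronized long append(String entry) {
    pendingWrites.add(entry);
    return unflushedEntries.incrementAndGet(); // txid of this entry
  }

  public void sync(long txid) {
    if (txid <= syncedTillHere) {
      return; // an earlier caller's batch already covered this txid
    }
    synchronized (this) {
      long doneUpto = unflushedEntries.get();
      durable.addAll(pendingWrites);       // one bulk write + one "sync" per batch
      pendingWrites = new LinkedList<String>();
      syncedTillHere = doneUpto;           // all txids <= doneUpto are now durable
    }
  }
}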

src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java

@ -0,0 +1,265 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
* Testing of HRegion.incrementColumnValue
*
*/
public class TestIncrement extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestIncrement.class);
HRegion region = null;
private final String DIR = HBaseTestingUtility.getTestDir() +
"/TestIncrement/";
private final int MAX_VERSIONS = 2;
// Test names
static final byte[] tableName = Bytes.toBytes("testtable");
static final byte[] qual1 = Bytes.toBytes("qual1");
static final byte[] qual2 = Bytes.toBytes("qual2");
static final byte[] qual3 = Bytes.toBytes("qual3");
static final byte[] value1 = Bytes.toBytes("value1");
static final byte[] value2 = Bytes.toBytes("value2");
static final byte [] row = Bytes.toBytes("rowA");
static final byte [] row2 = Bytes.toBytes("rowB");
/**
* @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
*/
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
EnvironmentEdgeManagerTestHelper.reset();
}
//////////////////////////////////////////////////////////////////////////////
// New tests that don't spin up a mini cluster but rather just test the
// individual code pieces in the HRegion.
//////////////////////////////////////////////////////////////////////////////
/**
* Test one increment command.
*/
public void testIncrementColumnValue() throws IOException {
LOG.info("Starting test testIncrementColumnValue");
initHRegion(tableName, getName(), fam1);
long value = 1L;
long amount = 3L;
Put put = new Put(row);
put.add(fam1, qual1, Bytes.toBytes(value));
region.put(put);
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
assertEquals(value+amount, result);
Store store = region.getStore(fam1);
// ICV removes any extra values floating around in there.
assertEquals(1, store.memstore.kvset.size());
assertTrue(store.memstore.snapshot.isEmpty());
assertICV(row, fam1, qual1, value+amount);
}
/**
* Test multi-threaded increments.
*/
public void testIncrementMultiThreads() throws IOException {
LOG.info("Starting test testIncrementMultiThreads");
initHRegion(tableName, getName(), fam1);
// create 100 threads, each will increment by its own quantity
int numThreads = 100;
int incrementsPerThread = 1000;
Incrementer[] all = new Incrementer[numThreads];
int expectedTotal = 0;
// create all threads
for (int i = 0; i < numThreads; i++) {
all[i] = new Incrementer(region, i, i, incrementsPerThread);
expectedTotal += (i * incrementsPerThread);
}
// run all threads
for (int i = 0; i < numThreads; i++) {
all[i].start();
}
// wait for all threads to finish
for (int i = 0; i < numThreads; i++) {
try {
all[i].join();
} catch (InterruptedException e) {
// ignore the interrupt and move on to joining the next thread
}
}
assertICV(row, fam1, qual1, expectedTotal);
LOG.info("testIncrementMultiThreads successfully verified that total is " +
expectedTotal);
}
private void assertICV(byte [] row,
byte [] family,
byte[] qualifier,
long amount) throws IOException {
// run a get and see?
Get get = new Get(row);
get.addColumn(family, qualifier);
Result result = region.get(get, null);
assertEquals(1, result.size());
KeyValue kv = result.raw()[0];
long r = Bytes.toLong(kv.getValue());
assertEquals(amount, r);
}
private void initHRegion (byte [] tableName, String callingMethod,
byte[] ... families)
throws IOException {
initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
}
private void initHRegion (byte [] tableName, String callingMethod,
Configuration conf, byte [] ... families)
throws IOException{
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
Path path = new Path(DIR + callingMethod);
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw new IOException("Failed delete of " + path);
}
}
region = HRegion.createHRegion(info, path, conf, htd);
}
/**
* A thread that makes a few increment calls
*/
public static class Incrementer extends Thread {
private final HRegion region;
private final int threadNumber;
private final int numIncrements;
private final int amount;
private int count;
public Incrementer(HRegion region,
int threadNumber, int amount, int numIncrements) {
this.region = region;
this.threadNumber = threadNumber;
this.numIncrements = numIncrements;
this.count = 0;
this.amount = amount;
setDaemon(true);
}
@Override
public void run() {
for (int i=0; i<numIncrements; i++) {
try {
long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
// LOG.info("thread:" + threadNumber + " iter:" + i);
} catch (IOException e) {
e.printStackTrace();
}
count++;
}
}
}
}

src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java

@ -0,0 +1,349 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.Random;
import java.text.NumberFormat;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.log4j.Level;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHLogBench extends Configured implements Tool {
static final Log LOG = LogFactory.getLog(TestHLogBench.class);
private static final Random r = new Random();
private static final byte [] FAMILY = Bytes.toBytes("hlogbenchFamily");
// accumulate time here
private static int totalTime = 0;
private static Object lock = new Object();
// the file system where to create the Hlog file
protected FileSystem fs;
// the number of threads and the number of iterations per thread
private int numThreads = 300;
private int numIterationsPerThread = 10000;
private Path regionRootDir = new Path(HBaseTestingUtility.getTestDir() +
"/TestHLogBench/");
private boolean appendNoSync = false;
public TestHLogBench() {
this(null);
}
private TestHLogBench(Configuration conf) {
super(conf);
fs = null;
}
/**
* Initialize file system object
*/
public void init() throws IOException {
getConf().setQuietMode(true);
if (this.fs == null) {
this.fs = FileSystem.get(getConf());
}
}
/**
* Close down file system
*/
public void close() throws IOException {
if (fs != null) {
fs.close();
fs = null;
}
}
/**
* The main run method of TestHLogBench
*/
public int run(String argv[]) throws Exception {
int exitCode = -1;
int i = 0;
// verify that we have enough command line parameters
if (argv.length < 4) {
printUsage("");
return exitCode;
}
// initialize LogBench
try {
init();
} catch (HBaseRPC.VersionMismatch v) {
LOG.warn("Version Mismatch between client and server" +
"... command aborted.");
return exitCode;
} catch (IOException e) {
LOG.warn("Bad connection to FS. command aborted.");
return exitCode;
}
try {
for (; i < argv.length; i++) {
if ("-numThreads".equals(argv[i])) {
i++;
this.numThreads = Integer.parseInt(argv[i]);
} else if ("-numIterationsPerThread".equals(argv[i])) {
i++;
this.numIterationsPerThread = Integer.parseInt(argv[i]);
} else if ("-path".equals(argv[i])) {
// get an absolute path using the default file system
i++;
this.regionRootDir = new Path(argv[i]);
this.regionRootDir = regionRootDir.makeQualified(this.fs);
} else if ("-nosync".equals(argv[i])) {
this.appendNoSync = true;
} else {
printUsage(argv[i]);
return exitCode;
}
}
} catch (NumberFormatException nfe) {
LOG.warn("Illegal numThreads or numIterationsPerThread, " +
" a positive integer expected");
throw nfe;
}
go();
return 0;
}
private void go() throws IOException, InterruptedException {
long start = System.currentTimeMillis();
log("Running TestHLogBench with " + numThreads + " threads each doing " +
numIterationsPerThread + " HLog appends " +
(appendNoSync ? "nosync" : "sync") +
" at rootDir " + regionRootDir);
// Mock an HRegion
byte [] tableName = Bytes.toBytes("table");
byte [][] familyNames = new byte [][] { FAMILY };
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
HRegion region = mockRegion(tableName, familyNames, regionRootDir);
HLog hlog = region.getLog();
// Spin up N threads to each perform M log operations
LogWriter [] incrementors = new LogWriter[numThreads];
for (int i=0; i<numThreads; i++) {
incrementors[i] = new LogWriter(region, tableName, hlog, i,
numIterationsPerThread,
appendNoSync);
incrementors[i].start();
}
// Wait for threads to finish
for (int i=0; i<numThreads; i++) {
//log("Waiting for #" + i + " to finish");
incrementors[i].join();
}
// Output statistics
long totalOps = numThreads * numIterationsPerThread;
log("Operations per second " + ((totalOps * 1000L)/totalTime));
log("Average latency in ms " + ((totalTime * 1000L)/totalOps));
}
/**
* Displays format of commands.
*/
private static void printUsage(String cmd) {
String prefix = "Usage: java " + TestHLogBench.class.getSimpleName();
System.err.println(prefix + cmd +
" [-numThreads <number>] " +
" [-numIterationsPerThread <number>] " +
" [-path <path where region's root directory is created>]" +
" [-nosync]");
}
/**
* A thread that writes data to an HLog
*/
public static class LogWriter extends Thread {
private final HRegion region;
private final int threadNumber;
private final int numIncrements;
private final HLog hlog;
private boolean appendNoSync;
private byte[] tableName;
private int count;
public LogWriter(HRegion region, byte[] tableName,
HLog log, int threadNumber,
int numIncrements, boolean appendNoSync) {
this.region = region;
this.threadNumber = threadNumber;
this.numIncrements = numIncrements;
this.hlog = log;
this.count = 0;
this.appendNoSync = appendNoSync;
this.tableName = tableName;
setDaemon(true);
//log("LogWriter[" + threadNumber + "] instantiated");
}
@Override
public void run() {
long now = System.currentTimeMillis();
byte [] key = Bytes.toBytes("thisisakey");
KeyValue kv = new KeyValue(key, now);
WALEdit walEdit = new WALEdit();
walEdit.add(kv);
HRegionInfo hri = region.getRegionInfo();
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
boolean isMetaRegion = false;
long start = System.currentTimeMillis();
for (int i=0; i<numIncrements; i++) {
try {
if (appendNoSync) {
hlog.appendNoSync(hri, tableName, walEdit,
HConstants.DEFAULT_CLUSTER_ID, now, htd);
} else {
hlog.append(hri, tableName, walEdit, now, htd);
}
} catch (IOException e) {
log("Fatal exception: " + e);
e.printStackTrace();
}
count++;
}
long tot = System.currentTimeMillis() - start;
synchronized (lock) {
totalTime += tot; // update global statistics
}
}
}
private static void log(String string) {
LOG.info(string);
}
private byte[][] makeBytes(int numArrays, int arraySize) {
byte [][] bytes = new byte[numArrays][];
for (int i=0; i<numArrays; i++) {
bytes[i] = new byte[arraySize];
r.nextBytes(bytes[i]);
}
return bytes;
}
/**
* Create a dummy region
*/
private HRegion mockRegion(byte[] tableName, byte[][] familyNames,
Path rootDir) throws IOException {
HBaseTestingUtility htu = new HBaseTestingUtility();
Configuration conf = htu.getConfiguration();
conf.setBoolean("hbase.rs.cacheblocksonwrite", true);
conf.setBoolean("hbase.hregion.use.incrementnew", true);
conf.setBoolean("dfs.support.append", true);
FileSystem fs = FileSystem.get(conf);
int numQualifiers = 10;
byte [][] qualifiers = new byte [numQualifiers][];
for (int i=0; i<numQualifiers; i++) qualifiers[i] = Bytes.toBytes("qf" + i);
int numRows = 10;
byte [][] rows = new byte [numRows][];
for (int i=0; i<numRows; i++) rows[i] = Bytes.toBytes("r" + i);
// switch off debug message from Region server
((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.WARN);
HTableDescriptor htd = new HTableDescriptor(tableName);
for (byte [] family : familyNames)
htd.addFamily(new HColumnDescriptor(family));
HRegionInfo hri = new HRegionInfo(tableName, Bytes.toBytes(0L),
Bytes.toBytes(0xffffffffL));
if (fs.exists(rootDir)) {
if (!fs.delete(rootDir, true)) {
throw new IOException("Failed delete of " + rootDir);
}
}
return HRegion.createHRegion(hri, rootDir, conf, htd);
}
@Test
public void testLogPerformance() throws Exception {
TestHLogBench bench = new TestHLogBench();
int res;
String[] argv = new String[7];
argv[0] = "-numThreads";
argv[1] = Integer.toString(100);
argv[2] = "-numIterationsPerThread";
argv[3] = Integer.toString(1000);
argv[4] = "-path";
argv[5] = HBaseTestingUtility.getTestDir() + "/HlogPerformance";
argv[6] = "-nosync";
try {
res = ToolRunner.run(bench, argv);
} finally {
bench.close();
}
}
public static void main(String[] argv) throws Exception {
TestHLogBench bench = new TestHLogBench();
int res;
try {
res = ToolRunner.run(bench, argv);
} finally {
bench.close();
}
System.exit(res);
}
}

src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java

@ -96,7 +96,7 @@ public class TestWALActionsListener {
htd.addFamily(new HColumnDescriptor(b));
HLogKey key = new HLogKey(b,b, 0, 0, HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd, true);
if (i == 10) {
hlog.registerWALActionsListener(laterobserver);
}

src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

@ -174,7 +174,7 @@ public class TestReplicationSourceManager {
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd, true);
}
// Simulate a rapid insert that's followed
@ -187,7 +187,7 @@ public class TestReplicationSourceManager {
for (int i = 0; i < 3; i++) {
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd, true);
}
assertEquals(6, manager.getHLogs().get(slaveId).size());
@ -199,7 +199,7 @@ public class TestReplicationSourceManager {
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd, true);
assertEquals(1, manager.getHLogs().size());