HADOOP-2139 (phase 1) Increase parallelism in region servers

There are a lot of changes in this patch. The memcache has been changed from a per-region object to a per-column object, and HLocking has been removed since we no longer have to maintain any locks across RPC calls.
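To make the shape of the change concrete, here is a minimal sketch of the new arrangement (names are illustrative, not the patched classes): each store now owns its own in-memory cache plus an ordinary java.util.concurrent lock that is only ever held within a single call, which is what makes HLocking unnecessary.

import java.util.Collections;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: in the patch itself the cache is an inner Memcache class of
// HStore keyed by HStoreKey; plain String keys are used here just to keep
// the example self-contained.
class PerStoreCacheSketch {
  private final SortedMap<String, byte[]> memcache =
      Collections.synchronizedSortedMap(new TreeMap<String, byte[]>());
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void add(String key, byte[] value) {
    lock.readLock().lock();        // updaters may proceed concurrently
    try {
      memcache.put(key, value);    // synchronized map keeps the put safe
    } finally {
      lock.readLock().unlock();    // same thread, same call: no cross-RPC locks
    }
  }

  SortedMap<String, byte[]> snapshotForFlush() {
    lock.writeLock().lock();       // flush excludes updaters while swapping the map
    try {
      SortedMap<String, byte[]> frozen = new TreeMap<String, byte[]>(memcache);
      memcache.clear();
      return frozen;
    } finally {
      lock.writeLock().unlock();
    }
  }
}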

This necessitated major changes to HRegion and HStore.

Additionally, many changes were required in the unit tests, since they tend to exploit private interfaces that were never designed to be public. Some of those interfaces changed, so the test cases changed as well.

This patch is the result of extensive analysis of the multiple threads in HBase that contend for shared resources: updates, reads, scanners, cache flushing, compaction and region splitting.

Many of the tests are timing-sensitive, and since we tend to make "dormant" intervals as short as possible to speed up the Hudson build, we may go through several iterations of getting them right before Hudson is happy. This is especially true since two test cases failed on my dual-CPU Windows machine when running under Ant, but ran fine under Eclipse.

However, now that the tests pass locally, I believe the changes are doing the right thing, though they may still require some parameter tweaks.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@596835 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2007-11-20 21:53:30 +00:00
parent 025ac21ab1
commit 8a5b3da292
32 changed files with 2105 additions and 2414 deletions

@ -40,6 +40,7 @@ Trunk (unreleased changes)
HADOOP-2126 Use Bob Jenkins' hash for bloom filters
HADOOP-2157 Make Scanners implement Iterable
HADOOP-2176 Htable.deleteAll documentation is ambiguous
HADOOP-2139 (phase 1) Increase parallelism in region servers.
Release 0.15.1
Branch 0.15

@ -212,6 +212,9 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
*/
public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
throws IOException {
if (scannerClosed) {
return false;
}
// Find the next row label (and timestamp)
Text chosenRow = null;
long chosenTimestamp = -1;
@ -277,6 +280,7 @@ public abstract class HAbstractScanner implements HInternalScannerInterface {
return insertedItem;
}
/** {@inheritDoc} */
public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
throw new UnsupportedOperationException("Unimplemented serverside. " +
"next(HStoreKey, StortedMap(...) is more efficient");

@ -43,7 +43,8 @@ public interface HConstants {
/** default host address */
static final String DEFAULT_HOST = "0.0.0.0";
/** default port that the master listens on */
static final int DEFAULT_MASTER_PORT = 60000;
/** Default master address */
@ -164,7 +165,7 @@ public interface HConstants {
* commit.
*/
static final long LATEST_TIMESTAMP = Long.MAX_VALUE;
/**
* Define for 'return-all-versions'.
*/

@ -1,107 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.util.concurrent.atomic.AtomicInteger;
/**
* HLocking is a set of lock primitives that does not rely on a
* particular thread holding the monitor for an object. This is
* especially important when a lock must persist over multiple RPC's
* since there is no guarantee that the same Server thread will handle
* all the RPC's until the lock is released. Not requiring that the locker
* thread is same as unlocking thread is the key distinction between this
* class and {@link java.util.concurrent.locks.ReentrantReadWriteLock}.
*
* <p>For each independent entity that needs locking, create a new HLocking
* instance.
*/
public class HLocking {
private Integer mutex;
// If lockers == 0, the lock is unlocked
// If lockers > 0, locked for read
// If lockers == -1 locked for write
private AtomicInteger lockers;
/** Constructor */
public HLocking() {
this.mutex = new Integer(0);
this.lockers = new AtomicInteger(0);
}
/**
* Caller needs the non-exclusive read-lock
*/
public void obtainReadLock() {
synchronized(mutex) {
while(lockers.get() < 0) {
try {
mutex.wait();
} catch(InterruptedException ie) {
// continue
}
}
lockers.incrementAndGet();
mutex.notifyAll();
}
}
/**
* Caller is finished with the non-exclusive read-lock
*/
public void releaseReadLock() {
synchronized(mutex) {
if(lockers.decrementAndGet() < 0) {
throw new IllegalStateException("lockers: " + lockers);
}
mutex.notifyAll();
}
}
/**
* Caller needs the exclusive write-lock
*/
public void obtainWriteLock() {
synchronized(mutex) {
while(!lockers.compareAndSet(0, -1)) {
try {
mutex.wait();
} catch (InterruptedException ie) {
// continue
}
}
mutex.notifyAll();
}
}
/**
* Caller is finished with the write lock
*/
public void releaseWriteLock() {
synchronized(mutex) {
if(!lockers.compareAndSet(-1, 0)) {
throw new IllegalStateException("lockers: " + lockers);
}
mutex.notifyAll();
}
}
}
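For context, a hedged sketch of how callers typically used this now-removed class: the whole point was that the thread releasing a lock did not have to be the thread that obtained it, which ReentrantReadWriteLock forbids. The wrapper class below is hypothetical.

// Hypothetical caller of the HLocking class defined above.
class HLockingUsageSketch {
  private final HLocking lock = new HLocking();

  void readSide() {
    lock.obtainReadLock();      // blocks while a writer holds the lock
    try {
      // reads that must not overlap an exclusive writer
    } finally {
      lock.releaseReadLock();   // may legally run on a different thread than obtainReadLock()
    }
  }

  void writeSide() {
    lock.obtainWriteLock();     // exclusive: waits until there are no readers
    try {
      // structural modifications
    } finally {
      lock.releaseWriteLock();
    }
  }
}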

@ -409,13 +409,13 @@ public class HLog implements HConstants {
* @param timestamp
* @throws IOException
*/
synchronized void append(Text regionName, Text tableName, Text row,
TreeMap<Text, byte[]> columns, long timestamp)
throws IOException {
synchronized void append(Text regionName, Text tableName,
TreeMap<HStoreKey, byte[]> edits) throws IOException {
if (closed) {
throw new IOException("Cannot append; log is closed");
}
long seqNum[] = obtainSeqNum(columns.size());
long seqNum[] = obtainSeqNum(edits.size());
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
@ -424,10 +424,12 @@ public class HLog implements HConstants {
this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
}
int counter = 0;
for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
for (Map.Entry<HStoreKey, byte[]> es : edits.entrySet()) {
HStoreKey key = es.getKey();
HLogKey logKey =
new HLogKey(regionName, tableName, row, seqNum[counter++]);
HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
HLogEdit logEdit =
new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
this.writer.append(logKey, logEdit);
this.numEntries++;
}
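A hedged sketch of a caller adapted to the new append() signature: each cell's row, column, and timestamp now travel inside its HStoreKey, so the separate row and timestamp parameters go away. The helper method and column names below are made up; the pattern mirrors the updated TestHLog.

// Illustrative only; assumes the HLog, HStoreKey and Text classes in this patch.
void appendRowEdits(HLog log, Text regionName, Text tableName, Text row,
    byte[] value) throws IOException {
  TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>();
  long now = System.currentTimeMillis();
  edits.put(new HStoreKey(row, new Text("info:a"), now), value);
  edits.put(new HStoreKey(row, new Text("info:b"), now), value);
  log.append(regionName, tableName, edits);   // obtains one sequence number per edit
}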

@ -47,7 +47,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
@ -2468,10 +2467,12 @@ HMasterRegionInterface {
// for the table we want to create already exists, then table already
// created. Throw already-exists exception.
MetaRegion m = (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
onlineMetaRegions.get(newRegion.getRegionName()) :
onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey()));
MetaRegion m = (onlineMetaRegions.size() == 1 ?
onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
(onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
onlineMetaRegions.get(newRegion.getRegionName()) :
onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey())));
Text metaRegionName = m.getRegionName();
HRegionInterface server = connection.getHRegionConnection(m.getServer());

@ -1,490 +0,0 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
/**
* The HMemcache holds in-memory modifications to the HRegion. This is really a
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
*/
public class HMemcache {
static final Log LOG = LogFactory.getLog(HMemcache.class);
// Note that since these structures are always accessed with a lock held,
// no additional synchronization is required.
volatile SortedMap<HStoreKey, byte []> memcache;
List<SortedMap<HStoreKey, byte []>> history =
Collections.synchronizedList(new ArrayList<SortedMap<HStoreKey, byte []>>());
volatile SortedMap<HStoreKey, byte []> snapshot = null;
final HLocking lock = new HLocking();
/*
* Approximate size in bytes of the payload carried by this memcache.
* Does not consider deletes nor adding again on same key.
*/
private AtomicLong size = new AtomicLong(0);
/**
* Constructor
*/
public HMemcache() {
super();
memcache =
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
}
/** represents the state of the memcache at a specified point in time */
static class Snapshot {
final SortedMap<HStoreKey, byte []> memcacheSnapshot;
final long sequenceId;
Snapshot(final SortedMap<HStoreKey, byte[]> memcache, final Long i) {
super();
this.memcacheSnapshot = memcache;
this.sequenceId = i.longValue();
}
}
/**
* Returns a snapshot of the current HMemcache with a known HLog
* sequence number at the same time.
*
* We need to prevent any writing to the cache during this time,
* so we obtain a write lock for the duration of the operation.
*
* <p>If this method returns non-null, client must call
* {@link #deleteSnapshot()} to clear 'snapshot-in-progress'
* state when finished with the returned {@link Snapshot}.
*
* @return frozen HMemcache TreeMap and HLog sequence number.
*/
Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
this.lock.obtainWriteLock();
try {
if(snapshot != null) {
throw new IOException("Snapshot in progress!");
}
// If no entries in memcache.
if(memcache.size() == 0) {
return null;
}
Snapshot retval =
new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
// From here on, any failure is catastrophic requiring replay of hlog
this.snapshot = memcache;
synchronized (history) {
history.add(memcache);
}
memcache =
Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
// Reset size of this memcache.
this.size.set(0);
return retval;
} finally {
this.lock.releaseWriteLock();
}
}
/**
* Delete the snapshot, remove from history.
*
* Modifying the structure means we need to obtain a writelock.
* @throws IOException
*/
public void deleteSnapshot() throws IOException {
this.lock.obtainWriteLock();
try {
if(snapshot == null) {
throw new IOException("Snapshot not present!");
}
synchronized (history) {
history.remove(snapshot);
}
this.snapshot = null;
} finally {
this.lock.releaseWriteLock();
}
}
/**
* Store a value.
* Operation uses a write lock.
* @param row
* @param columns
* @param timestamp
*/
public void add(final Text row, final TreeMap<Text, byte []> columns,
final long timestamp) {
this.lock.obtainWriteLock();
try {
for (Map.Entry<Text, byte []> es: columns.entrySet()) {
HStoreKey key = new HStoreKey(row, es.getKey(), timestamp);
byte [] value = es.getValue();
this.size.addAndGet(key.getSize());
this.size.addAndGet(((value == null)? 0: value.length));
memcache.put(key, value);
}
} finally {
this.lock.releaseWriteLock();
}
}
/**
* @return Approximate size in bytes of payload carried by this memcache.
* Does not take into consideration deletes nor adding again on same key.
*/
public long getSize() {
return this.size.get();
}
/**
* Look back through all the backlog TreeMaps to find the target.
* @param key
* @param numVersions
* @return An array of byte arrays ordered by timestamp.
*/
public byte [][] get(final HStoreKey key, final int numVersions) {
this.lock.obtainReadLock();
try {
ArrayList<byte []> results = get(memcache, key, numVersions);
synchronized (history) {
for (int i = history.size() - 1; i >= 0; i--) {
if (numVersions > 0 && results.size() >= numVersions) {
break;
}
results.addAll(results.size(),
get(history.get(i), key, numVersions - results.size()));
}
}
return (results.size() == 0) ? null :
ImmutableBytesWritable.toArray(results);
} finally {
this.lock.releaseReadLock();
}
}
/**
* Return all the available columns for the given key. The key indicates a
* row and timestamp, but not a column name.
*
* The returned object should map column names to byte arrays (byte[]).
* @param key
* @return All columns for given key.
*/
public TreeMap<Text, byte []> getFull(HStoreKey key) {
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
this.lock.obtainReadLock();
try {
internalGetFull(memcache, key, results);
synchronized (history) {
for (int i = history.size() - 1; i >= 0; i--) {
SortedMap<HStoreKey, byte []> cur = history.get(i);
internalGetFull(cur, key, results);
}
}
return results;
} finally {
this.lock.releaseReadLock();
}
}
void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key,
TreeMap<Text, byte []> results) {
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
Text itCol = itKey.getColumn();
if (results.get(itCol) == null
&& key.matchesWithoutColumn(itKey)) {
byte [] val = tailMap.get(itKey);
results.put(itCol, val);
} else if (key.getRow().compareTo(itKey.getRow()) < 0) {
break;
}
}
}
/**
* Examine a single map for the desired key.
*
* We assume that all locking is done at a higher-level. No locking within
* this method.
*
* TODO - This is kinda slow. We need a data structure that allows for
* proximity-searches, not just precise-matches.
*
* @param map
* @param key
* @param numVersions
* @return Ordered list of items found in passed <code>map</code>. If no
* matching values, returns an empty list (does not return null).
*/
ArrayList<byte []> get(final SortedMap<HStoreKey, byte []> map,
final HStoreKey key, final int numVersions) {
ArrayList<byte []> result = new ArrayList<byte []>();
// TODO: If get is of a particular version -- numVersions == 1 -- we
// should be able to avoid all of the tailmap creations and iterations
// below.
HStoreKey curKey = new HStoreKey(key);
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(curKey);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
if (!HLogEdit.isDeleted(es.getValue())) {
result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
}
if (numVersions > 0 && result.size() >= numVersions) {
break;
}
}
return result;
}
/**
* Get <code>versions</code> keys matching the origin key's
* row/column/timestamp and those of an older vintage
* Default access so can be accessed out of {@link HRegionServer}.
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants.ALL_VERSIONS} to retrieve all.
* @return Ordered list of <code>versions</code> keys going from newest back.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
this.lock.obtainReadLock();
try {
List<HStoreKey> results = getKeys(this.memcache, origin, versions);
synchronized (history) {
for (int i = history.size() - 1; i >= 0; i--) {
results.addAll(results.size(), getKeys(history.get(i), origin,
versions == HConstants.ALL_VERSIONS ? versions :
(versions - results.size())));
}
}
return results;
} finally {
this.lock.releaseReadLock();
}
}
/*
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants.ALL_VERSIONS} to retrieve all.
* @return List of all keys that are of the same row and column and of
* equal or older timestamp. If no keys, returns an empty List. Does not
* return null.
*/
private List<HStoreKey> getKeys(final SortedMap<HStoreKey, byte []> map,
final HStoreKey origin, final int versions) {
List<HStoreKey> result = new ArrayList<HStoreKey>();
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey key = es.getKey();
if (!key.matchesRowCol(origin)) {
break;
}
if (!HLogEdit.isDeleted(es.getValue())) {
result.add(key);
if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
// We have enough results. Return.
break;
}
}
}
return result;
}
/**
* @param key
* @return True if an entry and its content is {@link HGlobals.deleteBytes}.
* Use checking values in store. On occasion the memcache has the fact that
* the cell has been deleted.
*/
boolean isDeleted(final HStoreKey key) {
return HLogEdit.isDeleted(this.memcache.get(key));
}
/**
* Return a scanner over the keys in the HMemcache
*/
HInternalScannerInterface getScanner(long timestamp,
Text targetCols[], Text firstRow) throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
}
//////////////////////////////////////////////////////////////////////////////
// HMemcacheScanner implements the HScannerInterface.
// It lets the caller scan the contents of the Memcache.
//////////////////////////////////////////////////////////////////////////////
class HMemcacheScanner extends HAbstractScanner {
SortedMap<HStoreKey, byte []> backingMaps[];
final Iterator<HStoreKey> keyIterators[];
@SuppressWarnings("unchecked")
HMemcacheScanner(final long timestamp, final Text targetCols[],
final Text firstRow) throws IOException {
super(timestamp, targetCols);
lock.obtainReadLock();
try {
synchronized (history) {
this.backingMaps = new SortedMap[history.size() + 1];
// Note that since we iterate through the backing maps from 0 to n, we
// need to put the memcache first, the newest history second, ..., etc.
backingMaps[0] = memcache;
for (int i = history.size() - 1; i >= 0; i--) {
backingMaps[i + 1] = history.get(i);
}
}
this.keyIterators = new Iterator[backingMaps.length];
this.keys = new HStoreKey[backingMaps.length];
this.vals = new byte[backingMaps.length][];
// Generate list of iterators
HStoreKey firstKey = new HStoreKey(firstRow);
for (int i = 0; i < backingMaps.length; i++) {
if (firstRow != null && firstRow.getLength() != 0) {
keyIterators[i] =
backingMaps[i].tailMap(firstKey).keySet().iterator();
} else {
keyIterators[i] = backingMaps[i].keySet().iterator();
}
while (getNext(i)) {
if (!findFirstRow(i, firstRow)) {
continue;
}
if (columnMatch(i)) {
break;
}
}
}
} catch (RuntimeException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);
close();
IOException e = new IOException("error initializing HMemcache scanner");
e.initCause(ex);
throw e;
} catch(IOException ex) {
LOG.error("error initializing HMemcache scanner: ", ex);
close();
throw ex;
}
}
/**
* The user didn't want to start scanning at the first row. This method
* seeks to the requested row.
*
* @param i which iterator to advance
* @param firstRow seek to this row
* @return true if this is the first row
*/
@Override
boolean findFirstRow(int i, Text firstRow) {
return firstRow.getLength() == 0 ||
keys[i].getRow().compareTo(firstRow) >= 0;
}
/**
* Get the next value from the specified iterator.
*
* @param i Which iterator to fetch next value from
* @return true if there is more data available
*/
@Override
boolean getNext(int i) {
boolean result = false;
while (true) {
if (!keyIterators[i].hasNext()) {
closeSubScanner(i);
break;
}
// Check key is < than passed timestamp for this scanner.
HStoreKey hsk = keyIterators[i].next();
if (hsk == null) {
throw new NullPointerException("Unexpected null key");
}
if (hsk.getTimestamp() <= this.timestamp) {
this.keys[i] = hsk;
this.vals[i] = backingMaps[i].get(keys[i]);
result = true;
break;
}
}
return result;
}
/** Shut down an individual map iterator. */
@Override
void closeSubScanner(int i) {
keyIterators[i] = null;
keys[i] = null;
vals[i] = null;
backingMaps[i] = null;
}
/** Shut down map iterators, and release the lock */
public void close() {
if (!scannerClosed) {
try {
for (int i = 0; i < keys.length; i++) {
if(keyIterators[i] != null) {
closeSubScanner(i);
}
}
} finally {
lock.releaseReadLock();
scannerClosed = true;
}
}
}
}
}
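Taken together, the snapshot methods of this now-removed class imply the following caller sequence during a cache flush. This is a hedged sketch of the contract, not the actual HRegion flush code; the helper signature is made up.

// Illustrative flush sequence against the HMemcache API defined above.
void flushCache(HMemcache memcache, HLog log, Text regionName, Text tableName)
    throws IOException {
  HMemcache.Snapshot s = memcache.snapshotMemcacheForLog(log);
  if (s == null) {
    return;                                  // nothing in the cache to flush
  }
  // ... persist s.memcacheSnapshot to an on-disk store file here ...
  memcache.deleteSnapshot();                 // clears 'snapshot-in-progress' state
  log.completeCacheFlush(regionName, tableName, s.sequenceId);
}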

@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.TreeMap;
import java.util.TreeSet;
@ -34,6 +35,8 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.BatchUpdate;
/**
* A non-instantiable class that has a static method capable of compacting
* a table by merging adjacent regions that have grown too small.
@ -41,6 +44,7 @@ import org.apache.hadoop.io.Text;
class HMerge implements HConstants {
static final Log LOG = LogFactory.getLog(HMerge.class);
static final Text[] META_COLS = {COL_REGIONINFO};
static final Random rand = new Random();
private HMerge() {
// Not instantiable
@ -366,53 +370,30 @@ class HMerge implements HConstants {
oldRegion2
};
for(int r = 0; r < regionsToDelete.length; r++) {
long lockid = -1L;
try {
lockid = root.startUpdate(regionsToDelete[r]);
root.delete(lockid, COL_REGIONINFO);
root.delete(lockid, COL_SERVER);
root.delete(lockid, COL_STARTCODE);
root.commit(lockid, System.currentTimeMillis());
lockid = -1L;
long lockid = Math.abs(rand.nextLong());
BatchUpdate b = new BatchUpdate(lockid);
lockid = b.startUpdate(regionsToDelete[r]);
b.delete(lockid, COL_REGIONINFO);
b.delete(lockid, COL_SERVER);
b.delete(lockid, COL_STARTCODE);
root.batchUpdate(System.currentTimeMillis(), b);
lockid = -1L;
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + regionsToDelete[r]);
}
} finally {
try {
if(lockid != -1L) {
root.abort(lockid);
}
} catch(IOException iex) {
LOG.error(iex);
}
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + regionsToDelete[r]);
}
}
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
newRegion.getRegionInfo().setOffline(true);
newRegion.getRegionInfo().write(s);
long lockid = -1L;
try {
lockid = root.startUpdate(newRegion.getRegionName());
root.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
root.commit(lockid, System.currentTimeMillis());
lockid = -1L;
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: "
+ newRegion.getRegionName());
}
} finally {
try {
if(lockid != -1L) {
root.abort(lockid);
}
} catch(IOException iex) {
LOG.error(iex);
}
long lockid = Math.abs(rand.nextLong());
BatchUpdate b = new BatchUpdate(lockid);
lockid = b.startUpdate(newRegion.getRegionName());
b.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
root.batchUpdate(System.currentTimeMillis(), b);
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: " + newRegion.getRegionName());
}
}
}
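The rewritten loop above is an instance of the new client-side pattern: assemble a BatchUpdate under a client-chosen id, then apply it to the region in a single batchUpdate() call, replacing the old startUpdate/put/delete/commit sequence. A hedged, generic sketch follows; the helper name and row are made up, and the column constants come from HConstants.

// Illustrative use of the BatchUpdate API introduced by this change.
void clearServerColumns(HRegion root, Text row) throws IOException {
  long lockid = Math.abs(new java.util.Random().nextLong()); // client-side id, not a server lock
  BatchUpdate b = new BatchUpdate(lockid);
  lockid = b.startUpdate(row);
  b.delete(lockid, COL_SERVER);
  b.delete(lockid, COL_STARTCODE);
  root.batchUpdate(System.currentTimeMillis(), b);           // apply all operations in one call
}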

File diff suppressed because it is too large

@ -47,7 +47,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.FSUtils;
@ -194,15 +193,12 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
for(HRegion cur: nonClosedRegionsToCheck) {
try {
if (cur.needsCompaction()) {
cur.compactStores();
}
// After compaction, it probably needs splitting. May also need
// splitting just because one of the memcache flushes was big.
Text midKey = new Text();
if (cur.needsSplit(midKey)) {
split(cur, midKey);
if (cur.compactIfNeeded()) {
// After compaction, it probably needs splitting. May also need
// splitting just because one of the memcache flushes was big.
split(cur);
}
} catch(IOException e) {
//TODO: What happens if this fails? Are we toast?
LOG.error("Split or compaction failed", e);
@ -213,10 +209,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
}
private void split(final HRegion region, final Text midKey)
throws IOException {
private void split(final HRegion region) throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.closeAndSplit(midKey, this);
final HRegion[] newRegions = region.splitRegion(this);
if (newRegions == null) {
return; // Didn't need to be split
}
// When a region is split, the META table needs to updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
@ -302,7 +301,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Flush them, if necessary
for(HRegion cur: nonClosedRegionsToFlush) {
try {
cur.optionallyFlush();
cur.flushcache();
} catch (DroppedSnapshotException e) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
@ -1046,7 +1045,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
try {
HRegion region = getRegion(regionName);
MapWritable result = new MapWritable();
TreeMap<Text, byte[]> map = region.getFull(row);
Map<Text, byte[]> map = region.getFull(row);
for (Map.Entry<Text, byte []> es: map.entrySet()) {
result.put(new HStoreKey(row, es.getKey()),
new ImmutableBytesWritable(es.getValue()));
@ -1100,46 +1099,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
throws IOException {
throws IOException {
checkOpen();
this.requestCount.incrementAndGet();
// If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
// special treatment. For these we need to first find the latest cell so
// when we write the delete, we write it with the latest cells' timestamp
// so the delete record overshadows. This means deletes and puts do not
// happen within the same row lock.
List<Text> deletes = null;
HRegion region = getRegion(regionName);
try {
long lockid = startUpdate(regionName, b.getRow());
for (BatchOperation op: b) {
switch(op.getOp()) {
case PUT:
put(regionName, lockid, op.getColumn(), op.getValue());
break;
case DELETE:
if (timestamp == LATEST_TIMESTAMP) {
// Save off these deletes.
if (deletes == null) {
deletes = new ArrayList<Text>();
}
deletes.add(op.getColumn());
} else {
delete(regionName, lockid, op.getColumn());
}
break;
}
}
commit(regionName, lockid,
(timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp);
if (deletes != null && deletes.size() > 0) {
// We have some LATEST_TIMESTAMP deletes to run.
HRegion r = getRegion(regionName);
for (Text column: deletes) {
r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1);
}
}
region.batchUpdate(timestamp, b);
} catch (IOException e) {
checkFileSystem();
throw e;
@ -1234,24 +1200,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Methods that do the actual work for the remote API
//
protected long startUpdate(Text regionName, Text row) throws IOException {
HRegion region = getRegion(regionName);
return region.startUpdate(row);
}
protected void put(final Text regionName, final long lockid,
final Text column, final byte [] val)
throws IOException {
HRegion region = getRegion(regionName, true);
region.put(lockid, column, val);
}
protected void delete(Text regionName, long lockid, Text column)
throws IOException {
HRegion region = getRegion(regionName);
region.delete(lockid, column);
}
/** {@inheritDoc} */
public void deleteAll(final Text regionName, final Text row,
final Text column, final long timestamp)
@ -1260,13 +1208,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
region.deleteAll(row, column, timestamp);
}
protected void commit(Text regionName, final long lockid,
final long timestamp) throws IOException {
HRegion region = getRegion(regionName, true);
region.commit(lockid, timestamp);
}
/**
* @return Info on this server.
*/
@ -1379,6 +1320,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
*/
protected List<HRegion> getRegionsToCheck() {
ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
//TODO: is this locking necessary?
lock.readLock().lock();
try {
regionsToCheck.addAll(this.onlineRegions.values());

File diff suppressed because it is too large

@ -937,6 +937,7 @@ public class HTable implements HConstants {
this.closed = true;
}
/** {@inheritDoc} */
public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
return new Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>>() {
HStoreKey key = null;

@ -34,7 +34,11 @@ public class BatchOperation implements Writable {
* Operation types.
* @see org.apache.hadoop.io.SequenceFile.Writer
*/
public static enum Operation {PUT, DELETE}
public static enum Operation {
/** update a field */
PUT,
/** delete a field */
DELETE}
private Operation op;
private Text column;
@ -65,7 +69,8 @@ public class BatchOperation implements Writable {
/**
* Creates a put operation
*
*
* @param operation the operation (put or get)
* @param column column name
* @param value column value
*/

@ -102,9 +102,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
meta.getLog().closeAndDelete();
} catch (Exception e) {
if(dfsCluster != null) {
dfsCluster.shutdown();
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
throw e;
}
}
@ -115,16 +113,7 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
@Override
public void tearDown() throws Exception {
super.tearDown();
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (this.fs != null) {
try {
this.fs.close();
} catch (IOException e) {
LOG.info("During tear down got a " + e.getMessage());
}
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
}
private HRegion createAregion(Text startKey, Text endKey, int firstRow,
@ -134,20 +123,21 @@ public abstract class AbstractMergeTestBase extends HBaseTestCase {
System.out.println("created region " + region.getRegionName());
HRegionIncommon r = new HRegionIncommon(region);
for(int i = firstRow; i < firstRow + nrows; i++) {
long lockid = region.startUpdate(new Text("row_"
long lockid = r.startUpdate(new Text("row_"
+ String.format("%1$05d", i)));
region.put(lockid, COLUMN_NAME, value.get());
region.commit(lockid, System.currentTimeMillis());
r.put(lockid, COLUMN_NAME, value.get());
r.commit(lockid, System.currentTimeMillis());
if(i % 10000 == 0) {
System.out.println("Flushing write #" + i);
region.flushcache(false);
r.flushcache();
}
}
System.out.println("Rolling log...");
region.log.rollWriter();
region.compactStores();
region.compactIfNeeded();
region.close();
region.getLog().closeAndDelete();
region.getRegionInfo().setOffline(true);

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.Random;
import junit.framework.TestCase;
@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.io.Text;
/**
@ -219,19 +222,36 @@ public abstract class HBaseTestCase extends TestCase {
&& endKey.compareTo(t) <= 0) {
break EXIT;
}
long lockid = updater.startBatchUpdate(t);
try {
updater.put(lockid, new Text(column), bytes);
if (ts == -1) {
updater.commit(lockid);
} else {
updater.commit(lockid, ts);
}
lockid = -1;
} finally {
if (lockid != -1) {
updater.abort(lockid);
long lockid = updater.startBatchUpdate(t);
try {
updater.put(lockid, new Text(column), bytes);
if (ts == -1) {
updater.commit(lockid);
} else {
updater.commit(lockid, ts);
}
lockid = -1;
} catch (RuntimeException ex) {
ex.printStackTrace();
throw ex;
} catch (IOException ex) {
ex.printStackTrace();
throw ex;
} finally {
if (lockid != -1) {
updater.abort(lockid);
}
}
} catch (RuntimeException ex) {
ex.printStackTrace();
throw ex;
} catch (IOException ex) {
ex.printStackTrace();
throw ex;
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
@ -275,32 +295,56 @@ public abstract class HBaseTestCase extends TestCase {
/**
* A class that makes a {@link Incommon} out of a {@link HRegion}
*/
public static class HRegionIncommon implements Incommon {
public static class HRegionIncommon implements Incommon, FlushCache {
final HRegion region;
private final Random rand = new Random();
private BatchUpdate batch;
private void checkBatch() {
if (batch == null) {
throw new IllegalStateException("No update in progress");
}
}
public HRegionIncommon(final HRegion HRegion) {
super();
this.region = HRegion;
this.batch = null;
}
public void abort(long lockid) throws IOException {
this.region.abort(lockid);
this.batch = null;
}
public void commit(long lockid) throws IOException {
this.region.commit(lockid);
commit(lockid, HConstants.LATEST_TIMESTAMP);
}
public void commit(long lockid, final long ts) throws IOException {
this.region.commit(lockid, ts);
checkBatch();
try {
this.region.batchUpdate(ts, batch);
} finally {
this.batch = null;
}
}
public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val);
checkBatch();
this.batch.put(lockid, column, val);
}
public void delete(long lockid, Text column) throws IOException {
this.region.delete(lockid, column);
checkBatch();
this.batch.delete(lockid, column);
}
public void deleteAll(Text row, Text column, long ts) throws IOException {
this.region.deleteAll(row, column, ts);
}
public long startBatchUpdate(Text row) throws IOException {
return this.region.startUpdate(row);
return startUpdate(row);
}
public long startUpdate(Text row) throws IOException {
if (this.batch != null) {
throw new IllegalStateException("Update already in progress");
}
long lockid = Math.abs(rand.nextLong());
this.batch = new BatchUpdate(lockid);
return batch.startUpdate(row);
}
public HScannerInterface getScanner(Text [] columns, Text firstRow,
long ts)
@ -317,6 +361,12 @@ public abstract class HBaseTestCase extends TestCase {
throws IOException {
return this.region.get(row, column, ts, versions);
}
public Map<Text, byte []> getFull(Text row) throws IOException {
return region.getFull(row);
}
public void flushcache() throws IOException {
this.region.internalFlushcache(this.region.snapshotMemcaches());
}
}
/**

@ -117,9 +117,14 @@ public class MiniHBaseCluster implements HConstants {
this.deleteOnExit = deleteOnExit;
this.shutdownDFS = false;
if (miniHdfsFilesystem) {
this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
this.fs = cluster.getFileSystem();
this.shutdownDFS = true;
try {
this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
this.fs = cluster.getFileSystem();
this.shutdownDFS = true;
} catch (IOException e) {
StaticTestEnvironment.shutdownDfs(cluster);
throw e;
}
} else {
this.cluster = null;
this.fs = FileSystem.get(conf);
@ -224,26 +229,13 @@ public class MiniHBaseCluster implements HConstants {
*/
public void shutdown() {
this.hbaseCluster.shutdown();
try {
if (shutdownDFS && cluster != null) {
FileSystem fs = cluster.getFileSystem();
if (fs != null) {
LOG.info("Shutting down FileSystem");
fs.close();
}
if (this.cluster != null) {
LOG.info("Shutting down Mini DFS ");
cluster.shutdown();
}
}
} catch (IOException e) {
LOG.error("shutdown", e);
} finally {
// Delete all DFS files
if(deleteOnExit) {
deleteFile(new File(System.getProperty(
StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
}
if (shutdownDFS) {
StaticTestEnvironment.shutdownDfs(cluster);
}
// Delete all DFS files
if(deleteOnExit) {
deleteFile(new File(System.getProperty(
StaticTestEnvironment.TEST_DIRECTORY_KEY), "dfs"));
}
}
@ -265,7 +257,7 @@ public class MiniHBaseCluster implements HConstants {
for (LocalHBaseCluster.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
r.flushcache(false);
r.internalFlushcache(r.snapshotMemcaches());
}
}
}

@ -53,11 +53,9 @@ public class MultiRegionTable extends HBaseTestCase {
@SuppressWarnings("null")
public static void makeMultiRegionTable(HBaseConfiguration conf,
MiniHBaseCluster cluster, FileSystem localFs, String tableName,
String columnName)
throws IOException {
String columnName) throws IOException {
final int retries = 10;
final long waitTime =
conf.getLong("hbase.master.meta.thread.rescanfrequency", 10L * 1000L);
final long waitTime = 20L * 1000L;
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M. Should
@ -106,7 +104,7 @@ public class MultiRegionTable extends HBaseTestCase {
}
// Flush will provoke a split next time the split-checker thread runs.
r.flushcache(false);
r.internalFlushcache(r.snapshotMemcaches());
// Now, wait until split makes it into the meta table.
int oldCount = count;
@ -156,15 +154,19 @@ public class MultiRegionTable extends HBaseTestCase {
// Wait till the parent only has reference to remaining split, one that
// still has references.
while (getSplitParentInfo(meta, parent).size() == 3) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
while (true) {
data = getSplitParentInfo(meta, parent);
if (data == null || data.size() == 3) {
try {
Thread.sleep(waitTime);
} catch (InterruptedException e) {
// continue
}
continue;
}
break;
}
LOG.info("Parent split returned " +
getSplitParentInfo(meta, parent).keySet().toString());
LOG.info("Parent split returned " + data.keySet().toString());
// Call second split.

@ -20,8 +20,11 @@
package org.apache.hadoop.hbase;
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
@ -33,6 +36,9 @@ import org.apache.log4j.PatternLayout;
* Initializes test environment
*/
public class StaticTestEnvironment {
private static final Logger LOG =
Logger.getLogger(StaticTestEnvironment.class.getPackage().getName());
private StaticTestEnvironment() {} // Not instantiable
/** configuration parameter name for test directory */
@ -105,7 +111,28 @@ public class StaticTestEnvironment {
consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
}
}
Logger.getLogger(
HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
LOG.setLevel(logLevel);
}
/**
* Common method to close down a MiniDFSCluster and the associated file system
*
* @param cluster
*/
public static void shutdownDfs(MiniDFSCluster cluster) {
if (cluster != null) {
try {
FileSystem fs = cluster.getFileSystem();
if (fs != null) {
LOG.info("Shutting down FileSystem");
fs.close();
}
} catch (IOException e) {
LOG.error("error closing file system", e);
}
LOG.info("Shutting down Mini DFS ");
cluster.shutdown();
}
}
}

@ -43,7 +43,11 @@ public class TestCompaction extends HBaseTestCase {
/** constructor */
public TestCompaction() {
super();
STARTROW = new Text(START_KEY);
// Set cache flush size to 1MB
conf.setInt("hbase.hregion.memcache.flush.size", 1024*1024);
}
/** {@inheritDoc} */
@ -71,11 +75,10 @@ public class TestCompaction extends HBaseTestCase {
*/
public void testCompaction() throws Exception {
createStoreFile(r);
assertFalse(r.needsCompaction());
assertFalse(r.compactIfNeeded());
for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
createStoreFile(r);
}
assertTrue(r.needsCompaction());
// Add more content. Now there are about 5 versions of each column.
// Default is that there only 3 (MAXVERSIONS) versions allowed per column.
// Assert > 3 and then after compaction, assert that only 3 versions
@ -91,7 +94,7 @@ public class TestCompaction extends HBaseTestCase {
@Override
public void run() {
try {
region.flushcache(false);
region.flushcache();
} catch (IOException e) {
e.printStackTrace();
}
@ -101,7 +104,7 @@ public class TestCompaction extends HBaseTestCase {
@Override
public void run() {
try {
assertTrue(region.compactStores());
assertTrue(region.compactIfNeeded());
} catch (IOException e) {
e.printStackTrace();
}
@ -140,16 +143,15 @@ public class TestCompaction extends HBaseTestCase {
// verify that it is removed as we compact.
// Assert all deleted.
assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
this.r.flushcache(false);
this.r.flushcache();
assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
// Add a bit of data and flush it so we for sure have the compaction limit
// for store files. Usually by this time we will have but if compaction
// included the flush that ran 'concurrently', there may be just the
// compacted store and the flush above when we added deletes. Add more
// content to be certain.
createBunchOfSmallStoreFiles(this.r);
assertTrue(this.r.needsCompaction());
this.r.compactStores();
createSmallerStoreFile(this.r);
assertTrue(this.r.compactIfNeeded());
// Assert that the first row is still deleted.
bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
assertNull(bytes);
@ -167,21 +169,14 @@ public class TestCompaction extends HBaseTestCase {
private void createStoreFile(final HRegion region) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
for (int i = 0; i < 1; i++) {
addContent(loader, COLUMN_FAMILY);
}
region.flushcache(false);
addContent(loader, COLUMN_FAMILY);
loader.flushcache();
}
private void createBunchOfSmallStoreFiles(final HRegion region)
throws IOException {
final String xyz = new String("xyz");
byte [] bytes = xyz.getBytes();
for (int i = 0; i < 1; i++) {
long lid = region.startUpdate(new Text(xyz));
region.put(lid, COLUMN_FAMILY_TEXT, bytes);
region.commit(lid);
region.flushcache(false);
}
private void createSmallerStoreFile(final HRegion region) throws IOException {
HRegionIncommon loader = new HRegionIncommon(region);
addContent(loader, COLUMN_FAMILY,
("bbb" + PUNCTUATION).getBytes(), null);
loader.flushcache();
}
}

@ -23,7 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,7 +44,7 @@ public class TestGet extends HBaseTestCase {
private static final String SERVER_ADDRESS = "foo.bar.com:1234";
private void verifyGet(final HRegion r, final String expectedServer)
private void verifyGet(final HRegionIncommon r, final String expectedServer)
throws IOException {
// This should return a value because there is only one family member
byte [] value = r.get(ROW_KEY, CONTENTS);
@ -55,7 +55,7 @@ public class TestGet extends HBaseTestCase {
assertNull(value);
// Find out what getFull returns
TreeMap<Text, byte []> values = r.getFull(ROW_KEY);
Map<Text, byte []> values = r.getFull(ROW_KEY);
// assertEquals(4, values.keySet().size());
for(Iterator<Text> i = values.keySet().iterator(); i.hasNext(); ) {
@ -95,7 +95,8 @@ public class TestGet extends HBaseTestCase {
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
HRegion r = new HRegion(dir, log, fs, conf, info, null);
HRegion region = new HRegion(dir, log, fs, conf, info, null);
HRegionIncommon r = new HRegionIncommon(region);
// Write information to the table
@ -132,9 +133,10 @@ public class TestGet extends HBaseTestCase {
// Close and re-open region, forcing updates to disk
r.close();
region.close();
log.rollWriter();
r = new HRegion(dir, log, fs, conf, info, null);
region = new HRegion(dir, log, fs, conf, info, null);
r = new HRegionIncommon(region);
// Read it back
@ -160,9 +162,10 @@ public class TestGet extends HBaseTestCase {
// Close region and re-open it
r.close();
region.close();
log.rollWriter();
r = new HRegion(dir, log, fs, conf, info, null);
region = new HRegion(dir, log, fs, conf, info, null);
r = new HRegionIncommon(region);
// Read it back
@ -170,13 +173,11 @@ public class TestGet extends HBaseTestCase {
// Close region once and for all
r.close();
region.close();
log.closeAndDelete();
} finally {
if(cluster != null) {
cluster.shutdown();
}
StaticTestEnvironment.shutdownDfs(cluster);
}
}
}

@ -49,13 +49,13 @@ public class TestHLog extends HBaseTestCase implements HConstants {
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
TreeMap<Text, byte []> cols = new TreeMap<Text, byte []>();
long timestamp = System.currentTimeMillis();
TreeMap<HStoreKey, byte []> cols = new TreeMap<HStoreKey, byte []>();
for (int i = 0; i < COL_COUNT; i++) {
cols.put(new Text(Integer.toString(i)),
cols.put(new HStoreKey(row, new Text(Integer.toString(i)), timestamp),
new byte[] { (byte)(i + '0') });
}
long timestamp = System.currentTimeMillis();
log.append(regionName, tableName, row, cols, timestamp);
log.append(regionName, tableName, cols);
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();

@ -22,23 +22,16 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HMemcache.Snapshot;
import org.apache.hadoop.io.Text;
/** memcache test case */
public class TestHMemcache extends TestCase {
private HMemcache hmemcache;
private Configuration conf;
private HStore.Memcache hmemcache;
private static final int ROW_COUNT = 3;
@ -50,10 +43,7 @@ public class TestHMemcache extends TestCase {
@Override
public void setUp() throws Exception {
super.setUp();
this.hmemcache = new HMemcache();
// Set up a configuration that has configuration for a file
// filesystem implementation.
this.conf = new HBaseConfiguration();
this.hmemcache = new HStore.Memcache();
}
private Text getRowName(final int index) {
@ -69,48 +59,26 @@ public class TestHMemcache extends TestCase {
* Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
* @param hmc Instance to add rows to.
*/
private void addRows(final HMemcache hmc) throws UnsupportedEncodingException {
private void addRows(final HStore.Memcache hmc)
throws UnsupportedEncodingException {
for (int i = 0; i < ROW_COUNT; i++) {
TreeMap<Text, byte []> columns = new TreeMap<Text, byte []>();
long timestamp = System.currentTimeMillis();
for (int ii = 0; ii < COLUMNS_COUNT; ii++) {
Text k = getColumnName(i, ii);
columns.put(k, k.toString().getBytes(HConstants.UTF8_ENCODING));
hmc.add(new HStoreKey(getRowName(i), k, timestamp),
k.toString().getBytes(HConstants.UTF8_ENCODING));
}
hmc.add(getRowName(i), columns, System.currentTimeMillis());
}
}
private HLog getLogfile() throws IOException {
// Create a log file.
Path testDir = new Path(conf.get("hadoop.tmp.dir",
System.getProperty("java.tmp.dir")), "hbase");
Path logFile = new Path(testDir, this.getName());
FileSystem fs = testDir.getFileSystem(conf);
// Cleanup any old log file.
if (fs.exists(logFile)) {
fs.delete(logFile);
}
return new HLog(fs, logFile, this.conf);
}
private Snapshot runSnapshot(final HMemcache hmc, final HLog log)
throws IOException {
private void runSnapshot(final HStore.Memcache hmc) {
// Save off old state.
int oldHistorySize = hmc.history.size();
SortedMap<HStoreKey, byte []> oldMemcache = hmc.memcache;
// Run snapshot.
Snapshot s = hmc.snapshotMemcacheForLog(log);
int oldHistorySize = hmc.snapshot.size();
hmc.snapshot();
// Make some assertions about what just happened.
assertEquals("Snapshot equals old memcache", hmc.snapshot,
oldMemcache);
assertEquals("Returned snapshot holds old memcache",
s.memcacheSnapshot, oldMemcache);
assertEquals("History has been incremented",
oldHistorySize + 1, hmc.history.size());
assertEquals("History holds old snapshot",
hmc.history.get(oldHistorySize), oldMemcache);
return s;
assertTrue("History size has not increased",
oldHistorySize < hmc.snapshot.size());
}
/**
@ -119,21 +87,14 @@ public class TestHMemcache extends TestCase {
*/
public void testSnapshotting() throws IOException {
final int snapshotCount = 5;
final Text tableName = new Text(getName());
HLog log = getLogfile();
// Add some rows, run a snapshot. Do it a few times.
for (int i = 0; i < snapshotCount; i++) {
addRows(this.hmemcache);
int historyInitialSize = this.hmemcache.history.size();
Snapshot s = runSnapshot(this.hmemcache, log);
log.completeCacheFlush(new Text(Integer.toString(i)),
tableName, s.sequenceId);
// Clean up snapshot now we are done with it.
this.hmemcache.deleteSnapshot();
assertTrue("History not being cleared",
historyInitialSize == this.hmemcache.history.size());
runSnapshot(this.hmemcache);
this.hmemcache.getSnapshot();
assertEquals("History not being cleared", 0,
this.hmemcache.snapshot.size());
}
log.closeAndDelete();
}
private void isExpectedRow(final int rowIndex, TreeMap<Text, byte []> row)
@ -161,7 +122,8 @@ public class TestHMemcache extends TestCase {
addRows(this.hmemcache);
for (int i = 0; i < ROW_COUNT; i++) {
HStoreKey hsk = new HStoreKey(getRowName(i));
TreeMap<Text, byte []> all = this.hmemcache.getFull(hsk);
TreeMap<Text, byte []> all = new TreeMap<Text, byte[]>();
this.hmemcache.getFull(hsk, all);
isExpectedRow(i, all);
}
}

@ -61,9 +61,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
read();
cleanup();
} finally {
if(cluster != null) {
cluster.shutdown();
}
StaticTestEnvironment.shutdownDfs(cluster);
}
}
@ -78,14 +76,15 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
private static final Text CONTENTS_BODY = new Text("contents:body");
private static final Text CONTENTS_FIRSTCOL = new Text("contents:firstcol");
private static final Text ANCHOR_SECONDCOL = new Text("anchor:secondcol");
private MiniDFSCluster cluster = null;
private FileSystem fs = null;
private Path parentdir = null;
private Path newlogdir = null;
private HLog log = null;
private HTableDescriptor desc = null;
HRegion region = null;
HRegion r = null;
HRegionIncommon region = null;
private static int numInserted = 0;
@ -103,8 +102,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
desc = new HTableDescriptor("test");
desc.addFamily(new HColumnDescriptor("contents:"));
desc.addFamily(new HColumnDescriptor("anchor:"));
region = new HRegion(parentdir, log, fs, conf,
r = new HRegion(parentdir, log, fs, conf,
new HRegionInfo(desc, null, null), null);
region = new HRegionIncommon(r);
}
// Test basic functionality. Writes to contents:basic and anchor:anchornum-*
@ -129,7 +129,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
region.flushcache(false);
region.flushcache();
System.out.println("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -169,7 +169,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
try {
region.put(-1, CONTENTS_BASIC,
"bad input".getBytes(HConstants.UTF8_ENCODING));
} catch (LockException e) {
} catch (Exception e) {
exceptionThrown = true;
}
assertTrue("Bad lock id", exceptionThrown);
@ -182,6 +182,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
String unregisteredColName = "FamilyGroup:FamilyLabel";
region.put(lockid, new Text(unregisteredColName),
unregisteredColName.getBytes(HConstants.UTF8_ENCODING));
region.commit(lockid);
} catch (IOException e) {
exceptionThrown = true;
} finally {
@ -209,8 +210,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
for (int i = 0; i < lockCount; i++) {
try {
Text rowid = new Text(Integer.toString(i));
lockids[i] = region.obtainRowLock(rowid);
rowid.equals(region.getRowFromLock(lockids[i]));
lockids[i] = r.obtainRowLock(rowid);
rowid.equals(r.getRowFromLock(lockids[i]));
LOG.debug(getName() + " locked " + rowid.toString());
} catch (IOException e) {
e.printStackTrace();
@ -221,13 +222,8 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
// Abort outstanding locks.
for (int i = lockCount - 1; i >= 0; i--) {
try {
region.abort(lockids[i]);
LOG.debug(getName() + " unlocked " +
Integer.toString(i));
} catch (IOException e) {
e.printStackTrace();
}
r.releaseRowLock(r.getRowFromLock(lockids[i]));
LOG.debug(getName() + " unlocked " + i);
}
LOG.debug(getName() + " released " +
Integer.toString(lockCount) + " locks");
@ -288,7 +284,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
HInternalScannerInterface s =
region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
int numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -326,7 +322,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
region.flushcache(false);
region.flushcache();
System.out.println("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -335,7 +331,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -390,7 +386,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -428,7 +424,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
region.flushcache(false);
region.flushcache();
System.out.println("Cache flush elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
@ -437,7 +433,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
numFetched = 0;
try {
HStoreKey curKey = new HStoreKey();
@ -473,7 +469,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text("row_vals1_500"),
s = r.getScanner(cols, new Text("row_vals1_500"),
System.currentTimeMillis(), null);
numFetched = 0;
@ -542,7 +538,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
System.out.println("Flushing write #" + k);
long flushStart = System.currentTimeMillis();
region.flushcache(false);
region.flushcache();
long flushEnd = System.currentTimeMillis();
totalFlush += (flushEnd - flushStart);
@ -557,7 +553,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
}
}
long startCompact = System.currentTimeMillis();
if(region.compactStores()) {
if(r.compactIfNeeded()) {
totalCompact = System.currentTimeMillis() - startCompact;
System.out.println("Region compacted - elapsedTime: " + (totalCompact / 1000.0));
@ -583,43 +579,28 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
// NOTE: This test depends on testBatchWrite succeeding
private void splitAndMerge() throws IOException {
Text midKey = new Text();
if(region.needsSplit(midKey)) {
System.out.println("Needs split");
}
// Split it anyway
Text midkey = new Text("row_"
+ (StaticTestEnvironment.debugging ? (N_ROWS / 2) : (NUM_VALS/2)));
Path oldRegionPath = region.getRegionDir();
Path oldRegionPath = r.getRegionDir();
long startTime = System.currentTimeMillis();
HRegion subregions[] = r.splitRegion(this);
if (subregions != null) {
System.out.println("Split region elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
HRegion subregions[] = region.closeAndSplit(midkey, this);
assertEquals("Number of subregions", subregions.length, 2);
System.out.println("Split region elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
// Now merge it back together
assertEquals("Number of subregions", subregions.length, 2);
// Now merge it back together
Path oldRegion1 = subregions[0].getRegionDir();
Path oldRegion2 = subregions[1].getRegionDir();
startTime = System.currentTimeMillis();
region = HRegion.closeAndMerge(subregions[0], subregions[1]);
System.out.println("Merge regions elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
fs.delete(oldRegionPath);
fs.delete(oldRegion1);
fs.delete(oldRegion2);
Path oldRegion1 = subregions[0].getRegionDir();
Path oldRegion2 = subregions[1].getRegionDir();
startTime = System.currentTimeMillis();
r = HRegion.closeAndMerge(subregions[0], subregions[1]);
region = new HRegionIncommon(r);
System.out.println("Merge regions elapsed time: "
+ ((System.currentTimeMillis() - startTime) / 1000.0));
fs.delete(oldRegion1);
fs.delete(oldRegion2);
fs.delete(oldRegionPath);
}
}
/**
@ -650,7 +631,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
long startTime = System.currentTimeMillis();
HInternalScannerInterface s =
region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
@ -706,7 +687,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
int numFetched = 0;
HStoreKey curKey = new HStoreKey();
@ -744,7 +725,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
if(StaticTestEnvironment.debugging) {
startTime = System.currentTimeMillis();
s = region.getScanner(new Text[] { CONTENTS_BODY }, new Text(),
s = r.getScanner(new Text[] { CONTENTS_BODY }, new Text(),
System.currentTimeMillis(), null);
try {
@ -782,7 +763,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
startTime = System.currentTimeMillis();
s = region.getScanner(cols, new Text(), System.currentTimeMillis(), null);
s = r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
try {
int fetched = 0;
@ -817,22 +798,11 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
}
private void cleanup() {
// Shut down the mini cluster
try {
log.closeAndDelete();
} catch (IOException e) {
e.printStackTrace();
}
if (cluster != null) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
cluster.shutdown();
cluster = null;
}
// Delete all the DFS files
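The hunks above swap the old lock bookkeeping (region.obtainRowLock(...) paired with region.abort(lockid)) for the new pair r.obtainRowLock(...) / r.releaseRowLock(...). The following is a minimal sketch of the pattern the test now follows, assuming the signatures implied by the diff (obtainRowLock returns a long lock id, getRowFromLock maps it back to the row); it is illustrative only and not part of the patch.

    // Illustrative only; exception handling omitted for brevity.
    long [] lockids = new long[lockCount];
    for (int i = 0; i < lockCount; i++) {
      Text rowid = new Text(Integer.toString(i));
      lockids[i] = r.obtainRowLock(rowid);              // take the row lock
    }
    for (int i = lockCount - 1; i >= 0; i--) {
      r.releaseRowLock(r.getRowFromLock(lockids[i]));   // release by row instead of abort(lockid)
    }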

View File

@ -45,9 +45,13 @@ public class TestHStoreFile extends HBaseTestCase {
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
this.fs = cluster.getFileSystem();
this.dir = new Path(DIR, getName());
try {
this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
this.fs = cluster.getFileSystem();
this.dir = new Path(DIR, getName());
} catch (IOException e) {
StaticTestEnvironment.shutdownDfs(cluster);
}
super.setUp();
}
@ -55,13 +59,7 @@ public class TestHStoreFile extends HBaseTestCase {
@Override
public void tearDown() throws Exception {
super.tearDown();
if (this.cluster != null) {
try {
this.cluster.shutdown();
} catch (Exception e) {
LOG.warn("Closing down mini DFS", e);
}
}
StaticTestEnvironment.shutdownDfs(cluster);
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Temporary end-of-test thread dump debugging HADOOP-2040: " + getName());
}
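Several tearDown methods in this patch now delegate DFS cleanup to StaticTestEnvironment.shutdownDfs(cluster) instead of open-coding it. The helper itself is defined elsewhere in the patch; the sketch below is only a guess at its general shape (close the FileSystem, then stop the MiniDFSCluster, tolerating failures) and may not match the real implementation.

    // Hypothetical sketch of the helper; the actual code lives in StaticTestEnvironment.
    public static void shutdownDfs(MiniDFSCluster cluster) {
      if (cluster == null) {
        return;                                 // nothing was started
      }
      try {
        FileSystem fs = cluster.getFileSystem();
        if (fs != null) {
          fs.close();                           // close client handles first
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
      try {
        cluster.shutdown();                     // then stop the mini cluster
      } catch (Exception e) {
        e.printStackTrace();
      }
    }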

View File

@ -22,7 +22,6 @@ package org.apache.hadoop.hbase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@ -90,6 +89,7 @@ public class TestLogRolling extends HBaseTestCase {
super.setUp();
dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
} catch (Exception e) {
StaticTestEnvironment.shutdownDfs(dfs);
LOG.fatal("error during setUp: ", e);
throw e;
}
@ -100,21 +100,10 @@ public class TestLogRolling extends HBaseTestCase {
public void tearDown() throws Exception {
try {
super.tearDown();
if (cluster != null) { // shutdown mini HBase cluster
cluster.shutdown();
}
if (dfs != null) {
FileSystem fs = dfs.getFileSystem();
try {
dfs.shutdown();
} finally {
if (fs != null) {
fs.close();
}
}
}
StaticTestEnvironment.shutdownDfs(dfs);
} catch (Exception e) {
LOG.fatal("error in tearDown", e);
throw e;
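The setUp change above applies the same idea on the way up: if the MiniDFSCluster fails to start, shut down whatever did come up before rethrowing, so later tests do not inherit a half-started DFS. A restatement of that guard, assuming the shutdownDfs helper from StaticTestEnvironment:

    // Restates the pattern from the hunk above; not the verbatim test code.
    @Override
    public void setUp() throws Exception {
      try {
        super.setUp();
        dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
      } catch (Exception e) {
        StaticTestEnvironment.shutdownDfs(dfs);   // tear down any partial start
        LOG.fatal("error during setUp: ", e);
        throw e;                                  // still fail the test
      }
    }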

View File

@ -36,6 +36,10 @@ public class TestMasterAdmin extends HBaseClusterTestCase {
public TestMasterAdmin() {
super(true);
admin = null;
// Make the thread wake frequency a little slower so other threads
// can run
conf.setInt("hbase.server.thread.wakefrequency", 2000);
}
/** @throws Exception */

View File

@ -50,7 +50,8 @@ public class TestScanner extends HBaseTestCase {
private static final long START_CODE = Long.MAX_VALUE;
private HRegion region;
private HRegion r;
private HRegionIncommon region;
/** Compare the HRegionInfo we read from HBase to what we stored */
private void validateRegionInfo(byte [] regionBytes) throws IOException {
@ -79,7 +80,7 @@ public class TestScanner extends HBaseTestCase {
for(int i = 0; i < scanColumns.length; i++) {
try {
scanner = region.getScanner(scanColumns[i], FIRST_ROW,
scanner = r.getScanner(scanColumns[i], FIRST_ROW,
System.currentTimeMillis(), null);
while(scanner.next(key, results)) {
@ -145,7 +146,8 @@ public class TestScanner extends HBaseTestCase {
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
region = new HRegionIncommon(r);
// Write information to the meta table
@ -165,9 +167,10 @@ public class TestScanner extends HBaseTestCase {
// Close and re-open
region.close();
r.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
region = new HRegionIncommon(r);
// Verify we can get the data back now that it is on disk.
@ -196,7 +199,7 @@ public class TestScanner extends HBaseTestCase {
// flush cache
region.flushcache(false);
region.flushcache();
// Validate again
@ -205,9 +208,10 @@ public class TestScanner extends HBaseTestCase {
// Close and reopen
region.close();
r.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
region = new HRegionIncommon(r);
// Validate again
@ -232,7 +236,7 @@ public class TestScanner extends HBaseTestCase {
// flush cache
region.flushcache(false);
region.flushcache();
// Validate again
@ -241,9 +245,10 @@ public class TestScanner extends HBaseTestCase {
// Close and reopen
region.close();
r.close();
log.rollWriter();
region = new HRegion(dir, log, fs, conf, REGION_INFO, null);
r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
region = new HRegionIncommon(r);
// Validate again
@ -252,13 +257,11 @@ public class TestScanner extends HBaseTestCase {
// clean up
region.close();
r.close();
log.closeAndDelete();
} finally {
if(cluster != null) {
cluster.shutdown();
}
StaticTestEnvironment.shutdownDfs(cluster);
}
}
}
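TestScanner now keeps two handles: the raw HRegion (r) for lifecycle calls such as close(), and the HRegionIncommon wrapper (region) for reads, writes and cache flushes. A sketch of the close-and-reopen cycle the test repeats, using the constructors shown in the diff; illustrative only.

    // Close, roll the log, reopen, and re-wrap so the test helpers keep working.
    r.close();
    log.rollWriter();
    r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
    region = new HRegionIncommon(r);
    region.flushcache();   // note: no boolean argument in the new API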

View File

@ -71,6 +71,9 @@ public class TestScanner2 extends HBaseClusterTestCase {
}
}
/**
* @throws Exception
*/
public void testStopRow() throws Exception {
Text tableName = new Text(getName());
createTable(new HBaseAdmin(this.conf), tableName);
@ -86,6 +89,9 @@ public class TestScanner2 extends HBaseClusterTestCase {
}
}
/**
* @throws Exception
*/
public void testIterator() throws Exception {
HTable table = new HTable(this.conf, HConstants.ROOT_TABLE_NAME);
HScannerInterface scanner =
@ -139,11 +145,12 @@ public class TestScanner2 extends HBaseClusterTestCase {
int count = 0;
while (scanner.next(key, results)) {
for (Text c: results.keySet()) {
System.out.println(c);
assertTrue(c.toString().matches(regexColumnname));
count++;
}
}
assertTrue(count == 1);
assertEquals(1, count);
scanner.close();
}
@ -267,7 +274,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
Text tableName = new Text(getName());
admin.createTable(new HTableDescriptor(tableName.toString()));
List<HRegionInfo> regions = scan(metaTable);
assertEquals("Expected one region", regions.size(), 1);
assertEquals("Expected one region", 1, regions.size());
HRegionInfo region = regions.get(0);
assertTrue("Expected region named for test",
region.getRegionName().toString().startsWith(getName()));
@ -365,7 +372,7 @@ public class TestScanner2 extends HBaseClusterTestCase {
// Assert added.
byte [] bytes = t.get(region.getRegionName(), HConstants.COL_REGIONINFO);
HRegionInfo hri = Writables.getHRegionInfo(bytes);
assertEquals(hri.getRegionId(), region.getRegionId());
assertEquals(region.getRegionId(), hri.getRegionId());
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
t.getTableName());
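The assertion tweaks above put the expected value first, which is the order JUnit's failure message assumes. A tiny illustration of why the order matters; this example is not taken from the patch.

    // With the arguments swapped, a failure reads "expected:<3> but was:<1>",
    // as if three regions had been expected. Expected value goes first:
    assertEquals("Expected one region", 1, regions.size());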

View File

@ -39,6 +39,9 @@ public class TestSplit extends MultiRegionTable {
/** constructor */
public TestSplit() {
super();
// Always compact if there is more than one store file.
conf.setInt("hbase.hstore.compactionThreshold", 2);
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
@ -47,20 +50,15 @@ public class TestSplit extends MultiRegionTable {
// Increase the amount of time between client retries
conf.setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
Logger.getRootLogger().setLevel(Level.WARN);
Logger.getLogger(this.getClass().getPackage().getName()).
setLevel(Level.DEBUG);
}
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
}
/**
* Splits twice and verifies getting from each of the split regions.
* @throws Exception
@ -83,7 +81,7 @@ public class TestSplit extends MultiRegionTable {
private void basicSplit(final HRegion region) throws Exception {
addContent(region, COLFAMILY_NAME3);
region.internalFlushcache();
region.internalFlushcache(region.snapshotMemcaches());
Text midkey = new Text();
assertTrue(region.needsSplit(midkey));
HRegion [] regions = split(region);
@ -110,7 +108,12 @@ public class TestSplit extends MultiRegionTable {
}
addContent(regions[i], COLFAMILY_NAME2);
addContent(regions[i], COLFAMILY_NAME1);
regions[i].internalFlushcache();
long startTime = region.snapshotMemcaches();
if (startTime == -1) {
LOG.info("cache flush not needed");
} else {
regions[i].internalFlushcache(startTime);
}
}
// Assert that even if one store file is larger than a reference, the
@ -126,7 +129,7 @@ public class TestSplit extends MultiRegionTable {
// To make regions splitable force compaction.
for (int i = 0; i < regions.length; i++) {
assertTrue(regions[i].compactStores());
regions[i].compactStores();
}
TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
@ -156,6 +159,12 @@ public class TestSplit extends MultiRegionTable {
* @throws Exception
*/
public void testSplitRegionIsDeleted() throws Exception {
// Make sure the cache gets flushed so we trigger a compaction(s) and
// hence splits. This is done here rather than in the constructor because
// the first test runs without a cluster, and will block when the cache
// fills up.
conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024);
try {
// Start up a hbase cluster
MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1, true);
@ -228,7 +237,7 @@ public class TestSplit extends MultiRegionTable {
assertTrue(r.needsSplit(midKey));
// Assert can get mid key from passed region.
assertGet(r, COLFAMILY_NAME3, midKey);
HRegion [] regions = r.closeAndSplit(midKey, null);
HRegion [] regions = r.splitRegion(null);
assertEquals(regions.length, 2);
return regions;
}
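With the memcache now kept per column family, flushing in these tests is a two-step operation: snapshotMemcaches() returns a start time, or -1 when there is nothing to flush, and internalFlushcache(startTime) does the actual work. A small sketch of the guard, using the signatures shown in the hunk above; illustrative only.

    // Flush only when the snapshot step reports there is work to do.
    long startTime = region.snapshotMemcaches();
    if (startTime == -1) {
      LOG.info("cache flush not needed");
    } else {
      region.internalFlushcache(startTime);   // flush the snapshotted memcaches
    }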

View File

@ -38,11 +38,11 @@ public class TestTimestamp extends HBaseTestCase {
private static final String COLUMN_NAME = "contents:";
private static final Text COLUMN = new Text(COLUMN_NAME);
private static final Text[] COLUMNS = {COLUMN};
private static final Text ROW = new Text("row");
// When creating column descriptor, how many versions of a cell to allow.
private static final int VERSIONS = 3;
/**
* Test that delete works according to description in <a
@ -52,11 +52,8 @@ public class TestTimestamp extends HBaseTestCase {
public void testDelete() throws IOException {
final HRegion r = createRegion();
try {
doTestDelete(new HRegionIncommon(r), new FlushCache() {
public void flushcache() throws IOException {
r.flushcache(false);
}
});
final HRegionIncommon region = new HRegionIncommon(r);
doTestDelete(region, region);
} finally {
r.close();
r.getLog().closeAndDelete();
@ -70,11 +67,8 @@ public class TestTimestamp extends HBaseTestCase {
public void testTimestampScanning() throws IOException {
final HRegion r = createRegion();
try {
doTestTimestampScanning(new HRegionIncommon(r), new FlushCache() {
public void flushcache() throws IOException {
r.flushcache(false);
}
});
final HRegionIncommon region = new HRegionIncommon(r);
doTestTimestampScanning(region, region);
} finally {
r.close();
r.getLog().closeAndDelete();
@ -187,7 +181,7 @@ public class TestTimestamp extends HBaseTestCase {
// Now assert that if we ask for multiple versions, that they come out in
// order.
byte [][] bytesBytes = incommon.get(ROW, COLUMN, tss.length);
assertEquals(bytesBytes.length, tss.length);
assertEquals(tss.length, bytesBytes.length);
for (int i = 0; i < bytesBytes.length; i++) {
long ts = Writables.bytesToLong(bytesBytes[i]);
assertEquals(ts, tss[i]);
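Because HRegionIncommon now implements both the Incommon and FlushCache test interfaces, the anonymous FlushCache adapters removed above are no longer needed; the same wrapper is passed for both roles. A sketch, assuming those interface names; illustrative only.

    // One wrapper serves as both the read/write handle and the flush hook.
    final HRegionIncommon region = new HRegionIncommon(r);
    doTestDelete(region, region);              // (Incommon, FlushCache)
    doTestTimestampScanning(region, region);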

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseAdmin;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HScannerInterface;
@ -44,6 +43,7 @@ import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@ -58,7 +58,7 @@ import org.apache.lucene.search.TermQuery;
/**
* Test Map/Reduce job to build index over HBase table
*/
public class TestTableIndex extends HBaseTestCase {
public class TestTableIndex extends MultiRegionTable {
private static final Log LOG = LogFactory.getLog(TestTableIndex.class);
static final String TABLE_NAME = "moretest";
@ -76,13 +76,21 @@ public class TestTableIndex extends HBaseTestCase {
private Path dir;
private MiniHBaseCluster hCluster = null;
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
super.setUp();
// Make sure the cache gets flushed so we trigger a compaction(s) and
// hence splits.
conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024);
// Always compact if there is more than one store file.
conf.setInt("hbase.hstore.compactionThreshold", 2);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
conf.setLong("hbase.hregion.max.filesize", 128 * 1024);
desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
@ -103,22 +111,19 @@ public class TestTableIndex extends HBaseTestCase {
admin.createTable(desc);
// Populate a table into multiple regions
MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME,
INPUT_COLUMN);
makeMultiRegionTable(conf, hCluster, null, TABLE_NAME, INPUT_COLUMN);
// Verify table indeed has multiple regions
HTable table = new HTable(conf, new Text(TABLE_NAME));
Text[] startKeys = table.getStartKeys();
assertTrue(startKeys.length > 1);
} catch (Exception e) {
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
throw e;
}
}
/** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
super.tearDown();
@ -127,9 +132,7 @@ public class TestTableIndex extends HBaseTestCase {
hCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
}
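The setUp above shrinks the memcache flush size, lowers the compaction threshold and caps the region file size so the test table splits into multiple regions quickly. A condensed view of those knobs, with the values copied from the hunk; other tests in this patch use slightly different sizes.

    // Tuning used to force early flushes, compactions and splits in tests.
    conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024);  // flush at 1MB
    conf.setInt("hbase.hstore.compactionThreshold", 2);             // compact with 2+ store files
    conf.setLong("hbase.hregion.max.filesize", 128 * 1024);         // split at 128KB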
/**

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTable;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultiRegionTable;
import org.apache.hadoop.hbase.StaticTestEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@ -86,6 +87,21 @@ public class TestTableMapReduce extends MultiRegionTable {
public TestTableMapReduce() {
super();
// Make the thread wake frequency a little slower so other threads
// can run
conf.setInt("hbase.server.thread.wakefrequency", 2000);
// Make sure the cache gets flushed so we trigger a compaction(s) and
// hence splits.
conf.setInt("hbase.hregion.memcache.flush.size", 1024 * 1024);
// Always compact if there is more than one store file.
conf.setInt("hbase.hstore.compactionThreshold", 2);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
@ -97,9 +113,6 @@ public class TestTableMapReduce extends MultiRegionTable {
@Override
public void setUp() throws Exception {
super.setUp();
// This size is picked so the table is split into two
// after addContent in testMultiRegionTableMapReduce.
conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
try {
fs = dfsCluster.getFileSystem();
@ -109,10 +122,7 @@ public class TestTableMapReduce extends MultiRegionTable {
hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS));
} catch (Exception e) {
if (dfsCluster != null) {
dfsCluster.shutdown();
dfsCluster = null;
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
throw e;
}
}
@ -126,18 +136,7 @@ public class TestTableMapReduce extends MultiRegionTable {
if(hCluster != null) {
hCluster.shutdown();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (fs != null) {
try {
fs.close();
} catch (IOException e) {
LOG.info("During tear down got a " + e.getMessage());
}
}
StaticTestEnvironment.shutdownDfs(dfsCluster);
}
/**
@ -218,49 +217,54 @@ public class TestTableMapReduce extends MultiRegionTable {
// insert some data into the test table
HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME));
for(int i = 0; i < values.length; i++) {
long lockid = table.startUpdate(new Text("row_"
+ String.format("%1$05d", i)));
try {
for(int i = 0; i < values.length; i++) {
long lockid = table.startUpdate(new Text("row_"
+ String.format("%1$05d", i)));
try {
table.put(lockid, TEXT_INPUT_COLUMN, values[i]);
table.commit(lockid, System.currentTimeMillis());
lockid = -1;
} finally {
if (lockid != -1)
table.abort(lockid);
}
}
LOG.info("Print table contents before map/reduce");
scanTable(conf, SINGLE_REGION_TABLE_NAME);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
try {
table.put(lockid, TEXT_INPUT_COLUMN, values[i]);
table.commit(lockid, System.currentTimeMillis());
lockid = -1;
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);
TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
ProcessContentsMapper.class, jobConf);
TableReduce.initJob(SINGLE_REGION_TABLE_NAME,
IdentityTableReduce.class, jobConf);
JobClient.runJob(jobConf);
} finally {
if (lockid != -1)
table.abort(lockid);
mrCluster.shutdown();
}
}
LOG.info("Print table contents before map/reduce");
scanTable(conf, SINGLE_REGION_TABLE_NAME);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
LOG.info("Print table contents after map/reduce");
scanTable(conf, SINGLE_REGION_TABLE_NAME);
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(1);
jobConf.setNumReduceTasks(1);
// verify map-reduce results
verify(conf, SINGLE_REGION_TABLE_NAME);
TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
ProcessContentsMapper.class, jobConf);
TableReduce.initJob(SINGLE_REGION_TABLE_NAME,
IdentityTableReduce.class, jobConf);
JobClient.runJob(jobConf);
} finally {
mrCluster.shutdown();
table.close();
}
LOG.info("Print table contents after map/reduce");
scanTable(conf, SINGLE_REGION_TABLE_NAME);
// verify map-reduce results
verify(conf, SINGLE_REGION_TABLE_NAME);
}
/*
@ -277,37 +281,42 @@ public class TestTableMapReduce extends MultiRegionTable {
admin.createTable(desc);
// Populate a table into multiple regions
MultiRegionTable.makeMultiRegionTable(conf, hCluster, fs,
MULTI_REGION_TABLE_NAME, INPUT_COLUMN);
makeMultiRegionTable(conf, hCluster, fs, MULTI_REGION_TABLE_NAME,
INPUT_COLUMN);
// Verify table indeed has multiple regions
HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
Text[] startKeys = table.getStartKeys();
assertTrue(startKeys.length > 1);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(2);
jobConf.setNumReduceTasks(1);
Text[] startKeys = table.getStartKeys();
assertTrue(startKeys.length > 1);
TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
ProcessContentsMapper.class, jobConf);
@SuppressWarnings("deprecation")
MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
TableReduce.initJob(MULTI_REGION_TABLE_NAME,
IdentityTableReduce.class, jobConf);
try {
JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
jobConf.setJobName("process column contents");
jobConf.setNumMapTasks(2);
jobConf.setNumReduceTasks(1);
JobClient.runJob(jobConf);
TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
ProcessContentsMapper.class, jobConf);
TableReduce.initJob(MULTI_REGION_TABLE_NAME,
IdentityTableReduce.class, jobConf);
JobClient.runJob(jobConf);
} finally {
mrCluster.shutdown();
}
// verify map-reduce results
verify(conf, MULTI_REGION_TABLE_NAME);
} finally {
mrCluster.shutdown();
table.close();
}
// verify map-reduce results
verify(conf, MULTI_REGION_TABLE_NAME);
}
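Both map/reduce tests are now structured so the MiniMRCluster, and in the multi-region case the HTable, are shut down even when the job or an assertion fails. A skeletal version of that shape for the multi-region test, assuming the TableMap/TableReduce job setup shown in the diff; illustrative only.

    // Skeleton of the cleanup structure; details elided.
    HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
    try {
      MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
      try {
        JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
        jobConf.setJobName("process column contents");
        TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
            ProcessContentsMapper.class, jobConf);
        TableReduce.initJob(MULTI_REGION_TABLE_NAME,
            IdentityTableReduce.class, jobConf);
        JobClient.runJob(jobConf);              // run the MR job
      } finally {
        mrCluster.shutdown();                   // always stop the MR cluster
      }
      verify(conf, MULTI_REGION_TABLE_NAME);    // check results after the cluster is down
    } finally {
      table.close();                            // release the HTable in all cases
    }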
private void scanTable(HBaseConfiguration conf, String tableName)