HBASE-1528 Ensure scanners work across memcache snapshot

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@785069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-06-16 04:27:44 +00:00
parent 24708bb23f
commit 78af4ede60
11 changed files with 181 additions and 71 deletions

View File

@ -192,6 +192,7 @@ Release 0.20.0 - Unreleased
(Evgeny Ryabitskiy via Stack) (Evgeny Ryabitskiy via Stack)
HBASE-1529 familyMap not invalidated when a Result is (re)read as a HBASE-1529 familyMap not invalidated when a Result is (re)read as a
Writable Writable
HBASE-1528 Ensure scanners work across memcache snapshot
IMPROVEMENTS IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -949,8 +949,7 @@ public class KeyValue implements Writable, HeapSize {
byte [] result = new byte[fl + 1 + ql]; byte [] result = new byte[fl + 1 + ql];
System.arraycopy(this.bytes, fo, result, 0, fl); System.arraycopy(this.bytes, fo, result, 0, fl);
result[fl] = COLUMN_FAMILY_DELIMITER; result[fl] = COLUMN_FAMILY_DELIMITER;
System.arraycopy(this.bytes, fo + fl, result, System.arraycopy(this.bytes, fo + fl, result, fl + 1, ql);
fl + 1, ql);
return result; return result;
} }

View File

@ -1857,7 +1857,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
checkOpen(); checkOpen();
List<Result> results = new ArrayList<Result>(); List<Result> results = new ArrayList<Result>();
try { try {
long start = System.currentTimeMillis();
String scannerName = String.valueOf(scannerId); String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName); InternalScanner s = scanners.get(scannerName);
if (s == null) { if (s == null) {
@ -1882,9 +1881,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
} }
} }
public void close(final long scannerId) throws IOException { public void close(final long scannerId) throws IOException {
try { try {
checkOpen(); checkOpen();

View File

@ -539,17 +539,14 @@ class Memcache {
} }
/** /**
* @return scanner on memcache and snapshot in this order (if snapshot is * @return scanner on memcache and snapshot in this order.
* empty, returns only memcache scanner).
*/ */
KeyValueScanner [] getScanners() { KeyValueScanner [] getScanners() {
this.lock.readLock().lock(); this.lock.readLock().lock();
try { try {
boolean noss = this.snapshot == null || this.snapshot.isEmpty(); KeyValueScanner [] scanners = new KeyValueScanner[2];
KeyValueScanner [] scanners =
new KeyValueScanner[noss? 1: 2];
scanners[0] = new MemcacheScanner(this.memcache); scanners[0] = new MemcacheScanner(this.memcache);
if (!noss) scanners[1] = new MemcacheScanner(this.snapshot); scanners[1] = new MemcacheScanner(this.snapshot);
return scanners; return scanners;
} finally { } finally {
this.lock.readLock().unlock(); this.lock.readLock().unlock();

View File

@ -70,10 +70,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
} }
// Constructor for testing. // Constructor for testing.
StoreScanner(Scan scan, byte [] colFamily, StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
long ttl, KeyValue.KVComparator comparator, final KeyValue.KVComparator comparator,
final NavigableSet<byte[]> columns, final NavigableSet<byte[]> columns,
KeyValueScanner [] scanners) { final KeyValueScanner [] scanners) {
this.store = null; this.store = null;
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions()); comparator.getRawComparator(), scan.getMaxVersions());
@ -82,9 +82,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
for(KeyValueScanner scanner : scanners) { for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey()); scanner.seek(matcher.getStartKey());
} }
heap = new KeyValueHeap(scanners, comparator);
heap = new KeyValueHeap(
scanners, comparator);
} }
/* /*
@ -134,8 +132,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
matcher.setRow(peeked.getRow()); matcher.setRow(peeked.getRow());
KeyValue kv; KeyValue kv;
while((kv = this.heap.peek()) != null) { while((kv = this.heap.peek()) != null) {
QueryMatcher.MatchCode mc = matcher.match(kv); QueryMatcher.MatchCode qcode = matcher.match(kv);
switch(mc) { switch(qcode) {
case INCLUDE: case INCLUDE:
KeyValue next = this.heap.next(); KeyValue next = this.heap.next();
result.add(next); result.add(next);
@ -175,6 +173,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
case SKIP: case SKIP:
this.heap.next(); this.heap.next();
break; break;
default:
throw new RuntimeException("UNEXPECTED");
} }
} }
if(result.size() > 0) { if(result.size() > 0) {
@ -205,7 +206,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
if (this.closing.get()) return; if (this.closing.get()) return;
KeyValue topKey = this.peek(); KeyValue topKey = this.peek();
if (topKey == null) return; if (topKey == null) return;
List<KeyValueScanner> scanners = getScanners(); List<KeyValueScanner> scanners = getScanners();
// Seek all scanners to the initial key // Seek all scanners to the initial key

View File

@ -225,6 +225,7 @@ public abstract class HBaseTestCase extends TestCase {
startKeyBytes = START_KEY_BYTES; startKeyBytes = START_KEY_BYTES;
} }
return addContent(new HRegionIncommon(r), Bytes.toString(column), return addContent(new HRegionIncommon(r), Bytes.toString(column),
null,
startKeyBytes, endKey, -1); startKeyBytes, endKey, -1);
} }
@ -237,11 +238,16 @@ public abstract class HBaseTestCase extends TestCase {
* @throws IOException * @throws IOException
* @return count of what we added. * @return count of what we added.
*/ */
protected static long addContent(final Incommon updater, final String column) protected static long addContent(final Incommon updater,
throws IOException { final String column) throws IOException {
return addContent(updater, column, START_KEY_BYTES, null); return addContent(updater, column, START_KEY_BYTES, null);
} }
protected static long addContent(final Incommon updater, final String family,
final String column) throws IOException {
return addContent(updater, family, column, START_KEY_BYTES, null);
}
/** /**
* Add content to region <code>r</code> on the passed column * Add content to region <code>r</code> on the passed column
* <code>column</code>. * <code>column</code>.
@ -256,7 +262,13 @@ public abstract class HBaseTestCase extends TestCase {
protected static long addContent(final Incommon updater, final String column, protected static long addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final byte [] endKey) final byte [] startKeyBytes, final byte [] endKey)
throws IOException { throws IOException {
return addContent(updater, column, startKeyBytes, endKey, -1); return addContent(updater, column, null, startKeyBytes, endKey, -1);
}
protected static long addContent(final Incommon updater, final String family,
final String column, final byte [] startKeyBytes,
final byte [] endKey) throws IOException {
return addContent(updater, family, column, startKeyBytes, endKey, -1);
} }
/** /**
@ -271,7 +283,8 @@ public abstract class HBaseTestCase extends TestCase {
* @return count of what we added. * @return count of what we added.
* @throws IOException * @throws IOException
*/ */
protected static long addContent(final Incommon updater, final String column, protected static long addContent(final Incommon updater, final String columnFamily,
final String column,
final byte [] startKeyBytes, final byte [] endKey, final long ts) final byte [] startKeyBytes, final byte [] endKey, final long ts)
throws IOException { throws IOException {
long count = 0; long count = 0;

View File

@ -56,7 +56,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
Bytes.toBytes("yyy") Bytes.toBytes("yyy")
}; };
protected final byte [] columnName; protected final byte [] columnFamily;
protected HTableDescriptor desc; protected HTableDescriptor desc;
/** /**
@ -64,7 +64,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
*/ */
public MultiRegionTable(final String columnName) { public MultiRegionTable(final String columnName) {
super(); super();
this.columnName = Bytes.toBytes(columnName); this.columnFamily = Bytes.toBytes(columnName);
// These are needed for the new and improved Map/Reduce framework // These are needed for the new and improved Map/Reduce framework
System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir")); System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir")); conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
@ -101,7 +101,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
private HRegion createARegion(byte [] startKey, byte [] endKey) throws IOException { private HRegion createARegion(byte [] startKey, byte [] endKey) throws IOException {
HRegion region = createNewHRegion(desc, startKey, endKey); HRegion region = createNewHRegion(desc, startKey, endKey);
addContent(region, this.columnName); addContent(region, this.columnFamily);
closeRegionAndDeleteLog(region); closeRegionAndDeleteLog(region);
return region; return region;
} }

View File

@ -1,3 +1,23 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import java.io.IOException; import java.io.IOException;

View File

@ -27,32 +27,26 @@ import java.util.NavigableSet;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.TestCase;
/** memcache test case */ /** memcache test case */
public class TestMemcache extends TestCase { public class TestMemcache extends TestCase {
private final Log LOG = LogFactory.getLog(this.getClass());
private Memcache memcache; private Memcache memcache;
private static final int ROW_COUNT = 10; private static final int ROW_COUNT = 10;
private static final int QUALIFIER_COUNT = 10; private static final int QUALIFIER_COUNT = 10;
private static final byte [] FAMILY = Bytes.toBytes("column"); private static final byte [] FAMILY = Bytes.toBytes("column");
private static final int FIRST_ROW = 1;
private static final int NUM_VALS = 1000;
private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic"); private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
private static final String CONTENTSTR = "contentstr"; private static final String CONTENTSTR = "contentstr";
private static final String ANCHORNUM = "anchor:anchornum-";
private static final String ANCHORSTR = "anchorstr";
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
@ -60,6 +54,65 @@ public class TestMemcache extends TestCase {
this.memcache = new Memcache(); this.memcache = new Memcache();
} }
/**
* Test memcache snapshot happening while scanning.
* @throws IOException
*/
public void testScanAcrossSnapshot() throws IOException {
int rowCount = addRows(this.memcache);
KeyValueScanner [] memcachescanners = this.memcache.getScanners();
Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>();
StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memcache.comparator, null, memcachescanners);
int count = 0;
try {
while (s.next(result)) {
LOG.info(result);
count++;
result.clear();
}
} finally {
s.close();
}
assertEquals(rowCount, count);
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memcache.comparator, null, memcachescanners);
count = 0;
try {
while (s.next(result)) {
LOG.info(result);
// Assert the stuff is coming out in right order.
assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
count++;
if (count == 2) {
this.memcache.snapshot();
LOG.info("Snapshotted");
}
result.clear();
}
} finally {
s.close();
}
assertEquals(rowCount, count);
}
/**
* Test memcache snapshots
* @throws IOException
*/
public void testSnapshotting() throws IOException {
final int snapshotCount = 5;
// Add some rows, run a snapshot. Do it a few times.
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memcache);
runSnapshot(this.memcache);
Set<KeyValue> ss = this.memcache.getSnapshot();
assertEquals("History not being cleared", 0, ss.size());
}
}
public void testMultipleVersionsSimple() throws Exception { public void testMultipleVersionsSimple() throws Exception {
Memcache m = new Memcache(HConstants.FOREVER, KeyValue.COMPARATOR); Memcache m = new Memcache(HConstants.FOREVER, KeyValue.COMPARATOR);
byte [] row = Bytes.toBytes("testRow"); byte [] row = Bytes.toBytes("testRow");
@ -112,23 +165,6 @@ public class TestMemcache extends TestCase {
} }
} }
/**
* Test memcache snapshots
* @throws IOException
*/
public void testSnapshotting() throws IOException {
final int snapshotCount = 5;
// Add some rows, run a snapshot. Do it a few times.
for (int i = 0; i < snapshotCount; i++) {
addRows(this.memcache);
runSnapshot(this.memcache);
Set<KeyValue> ss = this.memcache.getSnapshot();
assertEquals("History not being cleared", 0, ss.size());
}
}
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// Get tests // Get tests
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -550,9 +586,10 @@ public class TestMemcache extends TestCase {
/** /**
* Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
* @param hmc Instance to add rows to. * @param hmc Instance to add rows to.
* @return How many rows we added.
* @throws IOException * @throws IOException
*/ */
private void addRows(final Memcache hmc) { private int addRows(final Memcache hmc) {
for (int i = 0; i < ROW_COUNT; i++) { for (int i = 0; i < ROW_COUNT; i++) {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
for (int ii = 0; ii < QUALIFIER_COUNT; ii++) { for (int ii = 0; ii < QUALIFIER_COUNT; ii++) {
@ -561,6 +598,7 @@ public class TestMemcache extends TestCase {
hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf)); hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
} }
} }
return ROW_COUNT;
} }
private void runSnapshot(final Memcache hmc) throws UnexpectedException { private void runSnapshot(final Memcache hmc) throws UnexpectedException {

View File

@ -361,20 +361,19 @@ public class TestScanner extends HBaseTestCase {
} }
/** /**
* Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner
* update readers code essentially. This is not highly concurrent, since its all 1 thread.
* HBase-910. * HBase-910.
* @throws Exception * @throws Exception
*/ */
public void testScanAndConcurrentFlush() throws Exception { public void testScanAndSyncFlush() throws Exception {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
HRegionIncommon hri = new HRegionIncommon(r); HRegionIncommon hri = new HRegionIncommon(r);
try { try {
String columnString = Bytes.toString(HConstants.CATALOG_FAMILY) + ':' + LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
Bytes.toString(HConstants.REGIONINFO_QUALIFIER); Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
LOG.info("Added: " + addContent(hri, columnString)); int count = count(hri, -1, false);
int count = count(hri, -1); assertEquals(count, count(hri, 100, false)); // do a sync flush.
assertEquals(count, count(hri, 100));
assertEquals(count, count(hri, 0));
assertEquals(count, count(hri, count - 1));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed", e); LOG.error("Failed", e);
throw e; throw e;
@ -385,25 +384,72 @@ public class TestScanner extends HBaseTestCase {
} }
} }
/**
* Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both
* the StoreScanner update readers and the transition from memcache -> snapshot -> store file.
*
* @throws Exception
*/
public void testScanAndRealConcurrentFlush() throws Exception {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
HRegionIncommon hri = new HRegionIncommon(r);
try {
LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
int count = count(hri, -1, false);
assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
} catch (Exception e) {
LOG.error("Failed", e);
throw e;
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(cluster);
}
}
/* /*
* @param hri Region * @param hri Region
* @param flushIndex At what row we start the flush. * @param flushIndex At what row we start the flush.
* @param concurrent if the flush should be concurrent or sync.
* @return Count of rows found. * @return Count of rows found.
* @throws IOException * @throws IOException
*/ */
private int count(final HRegionIncommon hri, final int flushIndex) private int count(final HRegionIncommon hri, final int flushIndex,
boolean concurrent)
throws IOException { throws IOException {
LOG.info("Taking out counting scan"); LOG.info("Taking out counting scan");
ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS, ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP); HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
List<KeyValue> values = new ArrayList<KeyValue>(); List<KeyValue> values = new ArrayList<KeyValue>();
int count = 0; int count = 0;
boolean justFlushed = false;
while (s.next(values)) { while (s.next(values)) {
if (justFlushed) {
LOG.info("after next() just after next flush");
justFlushed=false;
}
count++; count++;
if (flushIndex == count) { if (flushIndex == count) {
LOG.info("Starting flush at flush index " + flushIndex); LOG.info("Starting flush at flush index " + flushIndex);
Thread t = new Thread() {
public void run() {
try {
hri.flushcache(); hri.flushcache();
LOG.info("Finishing flush"); LOG.info("Finishing flush");
} catch (IOException e) {
LOG.info("Failed flush cache");
}
}
};
if (concurrent) {
t.start(); // concurrently flush.
} else {
t.run(); // sync flush
}
LOG.info("Continuing on after kicking off background flush");
justFlushed = true;
} }
} }
s.close(); s.close();