diff --git a/CHANGES.txt b/CHANGES.txt index c85ba78019f..be4364d20ce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -192,6 +192,7 @@ Release 0.20.0 - Unreleased (Evgeny Ryabitskiy via Stack) HBASE-1529 familyMap not invalidated when a Result is (re)read as a Writable + HBASE-1528 Ensure scanners work across memcache snapshot IMPROVEMENTS HBASE-1089 Add count of regions on filesystem to master UI; add percentage diff --git a/src/java/org/apache/hadoop/hbase/KeyValue.java b/src/java/org/apache/hadoop/hbase/KeyValue.java index 9e10d1b372b..74ded099a75 100644 --- a/src/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/java/org/apache/hadoop/hbase/KeyValue.java @@ -949,8 +949,7 @@ public class KeyValue implements Writable, HeapSize { byte [] result = new byte[fl + 1 + ql]; System.arraycopy(this.bytes, fo, result, 0, fl); result[fl] = COLUMN_FAMILY_DELIMITER; - System.arraycopy(this.bytes, fo + fl, result, - fl + 1, ql); + System.arraycopy(this.bytes, fo + fl, result, fl + 1, ql); return result; } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index d13adf61988..e861d383004 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -159,7 +159,7 @@ public class HRegion implements HConstants { // , Writable{ volatile boolean writesEnabled = true; // Set if region is read-only volatile boolean readOnly = false; - + /** * Set flags that make this region read-only. */ diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4c9d0768167..dd682af856e 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1857,7 +1857,6 @@ public class HRegionServer implements HConstants, HRegionInterface, checkOpen(); List results = new ArrayList(); try { - long start = System.currentTimeMillis(); String scannerName = String.valueOf(scannerId); InternalScanner s = scanners.get(scannerName); if (s == null) { @@ -1881,10 +1880,7 @@ public class HRegionServer implements HConstants, HRegionInterface, throw convertThrowableToIOE(cleanup(t)); } } - - - - + public void close(final long scannerId) throws IOException { try { checkOpen(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java index 98456079162..f16ae337e0a 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java @@ -539,17 +539,14 @@ class Memcache { } /** - * @return scanner on memcache and snapshot in this order (if snapshot is - * empty, returns only memcache scanner). + * @return scanner on memcache and snapshot in this order. */ KeyValueScanner [] getScanners() { this.lock.readLock().lock(); try { - boolean noss = this.snapshot == null || this.snapshot.isEmpty(); - KeyValueScanner [] scanners = - new KeyValueScanner[noss? 1: 2]; + KeyValueScanner [] scanners = new KeyValueScanner[2]; scanners[0] = new MemcacheScanner(this.memcache); - if (!noss) scanners[1] = new MemcacheScanner(this.snapshot); + scanners[1] = new MemcacheScanner(this.snapshot); return scanners; } finally { this.lock.readLock().unlock(); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 6ac255773c0..857b663a06f 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -70,10 +70,10 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb } // Constructor for testing. - StoreScanner(Scan scan, byte [] colFamily, - long ttl, KeyValue.KVComparator comparator, + StoreScanner(final Scan scan, final byte [] colFamily, final long ttl, + final KeyValue.KVComparator comparator, final NavigableSet columns, - KeyValueScanner [] scanners) { + final KeyValueScanner [] scanners) { this.store = null; this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl, comparator.getRawComparator(), scan.getMaxVersions()); @@ -82,9 +82,7 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb for(KeyValueScanner scanner : scanners) { scanner.seek(matcher.getStartKey()); } - - heap = new KeyValueHeap( - scanners, comparator); + heap = new KeyValueHeap(scanners, comparator); } /* @@ -134,8 +132,8 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb matcher.setRow(peeked.getRow()); KeyValue kv; while((kv = this.heap.peek()) != null) { - QueryMatcher.MatchCode mc = matcher.match(kv); - switch(mc) { + QueryMatcher.MatchCode qcode = matcher.match(kv); + switch(qcode) { case INCLUDE: KeyValue next = this.heap.next(); result.add(next); @@ -175,6 +173,9 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb case SKIP: this.heap.next(); break; + + default: + throw new RuntimeException("UNEXPECTED"); } } if(result.size() > 0) { @@ -205,7 +206,6 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb if (this.closing.get()) return; KeyValue topKey = this.peek(); if (topKey == null) return; - List scanners = getScanners(); // Seek all scanners to the initial key diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 5baf7013078..f07d80b5223 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -225,6 +225,7 @@ public abstract class HBaseTestCase extends TestCase { startKeyBytes = START_KEY_BYTES; } return addContent(new HRegionIncommon(r), Bytes.toString(column), + null, startKeyBytes, endKey, -1); } @@ -237,11 +238,16 @@ public abstract class HBaseTestCase extends TestCase { * @throws IOException * @return count of what we added. */ - protected static long addContent(final Incommon updater, final String column) - throws IOException { + protected static long addContent(final Incommon updater, + final String column) throws IOException { return addContent(updater, column, START_KEY_BYTES, null); } + protected static long addContent(final Incommon updater, final String family, + final String column) throws IOException { + return addContent(updater, family, column, START_KEY_BYTES, null); + } + /** * Add content to region r on the passed column * column. @@ -256,7 +262,13 @@ public abstract class HBaseTestCase extends TestCase { protected static long addContent(final Incommon updater, final String column, final byte [] startKeyBytes, final byte [] endKey) throws IOException { - return addContent(updater, column, startKeyBytes, endKey, -1); + return addContent(updater, column, null, startKeyBytes, endKey, -1); + } + + protected static long addContent(final Incommon updater, final String family, + final String column, final byte [] startKeyBytes, + final byte [] endKey) throws IOException { + return addContent(updater, family, column, startKeyBytes, endKey, -1); } /** @@ -271,7 +283,8 @@ public abstract class HBaseTestCase extends TestCase { * @return count of what we added. * @throws IOException */ - protected static long addContent(final Incommon updater, final String column, + protected static long addContent(final Incommon updater, final String columnFamily, + final String column, final byte [] startKeyBytes, final byte [] endKey, final long ts) throws IOException { long count = 0; diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java index 17d48dca011..804e535d9e2 100644 --- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -56,7 +56,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { Bytes.toBytes("yyy") }; - protected final byte [] columnName; + protected final byte [] columnFamily; protected HTableDescriptor desc; /** @@ -64,7 +64,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { */ public MultiRegionTable(final String columnName) { super(); - this.columnName = Bytes.toBytes(columnName); + this.columnFamily = Bytes.toBytes(columnName); // These are needed for the new and improved Map/Reduce framework System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir")); conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir")); @@ -101,7 +101,7 @@ public class MultiRegionTable extends HBaseClusterTestCase { private HRegion createARegion(byte [] startKey, byte [] endKey) throws IOException { HRegion region = createNewHRegion(desc, startKey, endKey); - addContent(region, this.columnName); + addContent(region, this.columnFamily); closeRegionAndDeleteLog(region); return region; } diff --git a/src/test/org/apache/hadoop/hbase/client/TestClient.java b/src/test/org/apache/hadoop/hbase/client/TestClient.java index 388b4508ba9..871af1a0d92 100644 --- a/src/test/org/apache/hadoop/hbase/client/TestClient.java +++ b/src/test/org/apache/hadoop/hbase/client/TestClient.java @@ -1,3 +1,23 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hbase.client; import java.io.IOException; diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java b/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java index 04ed019b571..33f97d842dc 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestMemcache.java @@ -27,32 +27,26 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.util.Bytes; -import junit.framework.TestCase; - /** memcache test case */ public class TestMemcache extends TestCase { - + private final Log LOG = LogFactory.getLog(this.getClass()); private Memcache memcache; - private static final int ROW_COUNT = 10; - private static final int QUALIFIER_COUNT = 10; - private static final byte [] FAMILY = Bytes.toBytes("column"); - - private static final int FIRST_ROW = 1; - private static final int NUM_VALS = 1000; private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic"); private static final String CONTENTSTR = "contentstr"; - private static final String ANCHORNUM = "anchor:anchornum-"; - private static final String ANCHORSTR = "anchorstr"; @Override public void setUp() throws Exception { @@ -60,6 +54,65 @@ public class TestMemcache extends TestCase { this.memcache = new Memcache(); } + /** + * Test memcache snapshot happening while scanning. + * @throws IOException + */ + public void testScanAcrossSnapshot() throws IOException { + int rowCount = addRows(this.memcache); + KeyValueScanner [] memcachescanners = this.memcache.getScanners(); + Scan scan = new Scan(); + List result = new ArrayList(); + StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, + this.memcache.comparator, null, memcachescanners); + int count = 0; + try { + while (s.next(result)) { + LOG.info(result); + count++; + result.clear(); + } + } finally { + s.close(); + } + assertEquals(rowCount, count); + // Now assert can count same number even if a snapshot mid-scan. + s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP, + this.memcache.comparator, null, memcachescanners); + count = 0; + try { + while (s.next(result)) { + LOG.info(result); + // Assert the stuff is coming out in right order. + assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0); + count++; + if (count == 2) { + this.memcache.snapshot(); + LOG.info("Snapshotted"); + } + result.clear(); + } + } finally { + s.close(); + } + assertEquals(rowCount, count); + } + + /** + * Test memcache snapshots + * @throws IOException + */ + public void testSnapshotting() throws IOException { + final int snapshotCount = 5; + // Add some rows, run a snapshot. Do it a few times. + for (int i = 0; i < snapshotCount; i++) { + addRows(this.memcache); + runSnapshot(this.memcache); + Set ss = this.memcache.getSnapshot(); + assertEquals("History not being cleared", 0, ss.size()); + } + } + public void testMultipleVersionsSimple() throws Exception { Memcache m = new Memcache(HConstants.FOREVER, KeyValue.COMPARATOR); byte [] row = Bytes.toBytes("testRow"); @@ -112,23 +165,6 @@ public class TestMemcache extends TestCase { } } - - /** - * Test memcache snapshots - * @throws IOException - */ - public void testSnapshotting() throws IOException { - final int snapshotCount = 5; - // Add some rows, run a snapshot. Do it a few times. - for (int i = 0; i < snapshotCount; i++) { - addRows(this.memcache); - runSnapshot(this.memcache); - Set ss = this.memcache.getSnapshot(); - assertEquals("History not being cleared", 0, ss.size()); - } - } - - ////////////////////////////////////////////////////////////////////////////// // Get tests ////////////////////////////////////////////////////////////////////////////// @@ -550,9 +586,10 @@ public class TestMemcache extends TestCase { /** * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT} * @param hmc Instance to add rows to. + * @return How many rows we added. * @throws IOException */ - private void addRows(final Memcache hmc) { + private int addRows(final Memcache hmc) { for (int i = 0; i < ROW_COUNT; i++) { long timestamp = System.currentTimeMillis(); for (int ii = 0; ii < QUALIFIER_COUNT; ii++) { @@ -561,6 +598,7 @@ public class TestMemcache extends TestCase { hmc.add(new KeyValue(row, FAMILY, qf, timestamp, qf)); } } + return ROW_COUNT; } private void runSnapshot(final Memcache hmc) throws UnexpectedException { diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java index baf1772b31c..d2404226961 100644 --- a/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/regionserver/TestScanner.java @@ -361,20 +361,19 @@ public class TestScanner extends HBaseTestCase { } /** + * Tests to do a sync flush during the middle of a scan. This is testing the StoreScanner + * update readers code essentially. This is not highly concurrent, since its all 1 thread. * HBase-910. * @throws Exception */ - public void testScanAndConcurrentFlush() throws Exception { + public void testScanAndSyncFlush() throws Exception { this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); HRegionIncommon hri = new HRegionIncommon(r); try { - String columnString = Bytes.toString(HConstants.CATALOG_FAMILY) + ':' + - Bytes.toString(HConstants.REGIONINFO_QUALIFIER); - LOG.info("Added: " + addContent(hri, columnString)); - int count = count(hri, -1); - assertEquals(count, count(hri, 100)); - assertEquals(count, count(hri, 0)); - assertEquals(count, count(hri, count - 1)); + LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), + Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); + int count = count(hri, -1, false); + assertEquals(count, count(hri, 100, false)); // do a sync flush. } catch (Exception e) { LOG.error("Failed", e); throw e; @@ -384,26 +383,73 @@ public class TestScanner extends HBaseTestCase { shutdownDfs(cluster); } } + + /** + * Tests to do a concurrent flush (using a 2nd thread) while scanning. This tests both + * the StoreScanner update readers and the transition from memcache -> snapshot -> store file. + * + * @throws Exception + */ + public void testScanAndRealConcurrentFlush() throws Exception { + this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null); + HRegionIncommon hri = new HRegionIncommon(r); + try { + LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY), + Bytes.toString(HConstants.REGIONINFO_QUALIFIER))); + int count = count(hri, -1, false); + assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush + } catch (Exception e) { + LOG.error("Failed", e); + throw e; + } finally { + this.r.close(); + this.r.getLog().closeAndDelete(); + shutdownDfs(cluster); + } + } + /* * @param hri Region * @param flushIndex At what row we start the flush. + * @param concurrent if the flush should be concurrent or sync. * @return Count of rows found. * @throws IOException */ - private int count(final HRegionIncommon hri, final int flushIndex) + private int count(final HRegionIncommon hri, final int flushIndex, + boolean concurrent) throws IOException { LOG.info("Taking out counting scan"); ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS, HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP); List values = new ArrayList(); int count = 0; + boolean justFlushed = false; while (s.next(values)) { + if (justFlushed) { + LOG.info("after next() just after next flush"); + justFlushed=false; + } count++; if (flushIndex == count) { LOG.info("Starting flush at flush index " + flushIndex); - hri.flushcache(); - LOG.info("Finishing flush"); + Thread t = new Thread() { + public void run() { + try { + hri.flushcache(); + LOG.info("Finishing flush"); + } catch (IOException e) { + LOG.info("Failed flush cache"); + } + } + }; + if (concurrent) { + t.start(); // concurrently flush. + } else { + t.run(); // sync flush + } + LOG.info("Continuing on after kicking off background flush"); + justFlushed = true; } } s.close();