HBASE-613 Timestamp-anchored scanning fails to find all records

HBASE-681 NPE in Memcache

HAbstractScanner
- remove HAbstactScanner.iterator() - iterator is not a method on InternalScanner

HRegion
- make getScanner more efficient by iterating only once to find the stores we need to scan
- only pass columns relevant to a store to a HStoreScanner
- remove HScanner.iterator() - iterator is not a method on InternalScanner

Memcache, MemcacheScanner
- Fix NPE in Memcache
- never return HConstants.LATEST_TIMESTAMP as the timestamp value for a row. Instead use the largest timestamp from the cells being returned. This allows a scanner to determine a timestamp that can be used to fetch the same data again should new versions be inserted later.

StoreFileScanner
- getNextViableRow would find a row that matched the row key, but did not consider the requested timestamp. Now if the row it finds has a timestamp greater than the one desired it advances to determine if a row with a timestamp less than or equal to the requested one exists since timestamps are sorted descending.
- removed an unnecessary else

testScanMultipleVersions
- Test program that fails on current trunk but passes when this patch is applied.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@670124 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2008-06-21 02:52:35 +00:00
parent d5b1dfe30c
commit 80fa33698f
11 changed files with 275 additions and 90 deletions

View File

@ -62,6 +62,9 @@ Hbase Change Log
(Rong-En Fan via Stack)
HBASE-699 Fix TestMigrate up on Hudson
HBASE-615 Region balancer oscillates during cluster startup
HBASE-613 Timestamp-anchored scanning fails to find all records
HBASE-681 NPE in Memcache
IMPROVEMENTS
HBASE-559 MR example job to count table rows

View File

@ -21,11 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.Vector;
import java.util.Map.Entry;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -183,12 +181,9 @@ public abstract class HAbstractScanner implements InternalScanner {
return this.multipleMatchers;
}
/** {@inheritDoc} */
public abstract boolean next(HStoreKey key,
SortedMap<byte [], byte []> results)
throws IOException;
public Iterator<Entry<HStoreKey, SortedMap<byte [], byte[]>>> iterator() {
throw new UnsupportedOperationException("Unimplemented serverside. " +
"next(HStoreKey, StortedMap(...) is more efficient");
}
}

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -31,7 +31,6 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@ -1268,20 +1266,15 @@ public class HRegion implements HConstants {
if (this.closed.get()) {
throw new IOException("Region " + this + " closed");
}
TreeSet<byte []> families = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
HashSet<HStore> storeSet = new HashSet<HStore>();
for (int i = 0; i < cols.length; i++) {
families.add(HStoreKey.getFamily(cols[i]));
HStore s = stores.get(Bytes.mapKey(HStoreKey.getFamily(cols[i])));
if (s != null) {
storeSet.add(s);
}
List<HStore> storelist = new ArrayList<HStore>();
for (byte [] family: families) {
HStore s = stores.get(Bytes.mapKey(family));
if (s == null) {
continue;
}
storelist.add(s);
}
return new HScanner(cols, firstRow, timestamp,
storelist.toArray(new HStore [storelist.size()]), filter);
storeSet.toArray(new HStore [storeSet.size()]), filter);
} finally {
splitsAndClosesLock.readLock().unlock();
}
@ -1750,15 +1743,26 @@ public class HRegion implements HConstants {
this.scanners = new InternalScanner[stores.length];
try {
for (int i = 0; i < stores.length; i++) {
// TODO: The cols passed in here can include columns from other
// stores; add filter so only pertinent columns are passed.
//
// Also, if more than one store involved, need to replicate filters.
// Only pass relevant columns to each store
List<byte[]> columns = new ArrayList<byte[]>();
for (int j = 0; j < cols.length; j++) {
if (Bytes.equals(HStoreKey.getFamily(cols[j]),
stores[i].getFamily().getName())) {
columns.add(cols[j]);
}
}
RowFilterInterface f = filter;
if (f != null) {
// Need to replicate filters.
// At least WhileMatchRowFilter will mess up the scan if only
// one shared across many rows. See HADOOP-2467.
scanners[i] = stores[i].getScanner(timestamp, cols, firstRow,
filter != null ?
(RowFilterInterface)WritableUtils.clone(filter, conf) : filter);
f = (RowFilterInterface)WritableUtils.clone(filter, conf);
}
scanners[i] = stores[i].getScanner(timestamp,
columns.toArray(new byte[columns.size()][]), firstRow, f);
}
} catch (IOException e) {
for (int i = 0; i < this.scanners.length; i++) {
@ -1928,14 +1932,6 @@ public class HRegion implements HConstants {
}
}
/**
* @return an iterator for the scanner
*/
public Iterator<Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
throw new UnsupportedOperationException("Unimplemented serverside. " +
"next(HStoreKey, StortedMap(...) is more efficient");
}
/** {@inheritDoc} */
public boolean isWildcardScanner() {
throw new UnsupportedOperationException("Unimplemented on HScanner");

View File

@ -584,7 +584,7 @@ class Memcache {
HStoreKey key = es.getKey();
// if there's no column name, then compare rows and timestamps
if (origin.getColumn().length == 0) {
if (origin.getColumn() != null && origin.getColumn().length == 0) {
// if the current and origin row don't match, then we can jump
// out of the loop entirely.
if (!Bytes.equals(key.getRow(), origin.getRow())) {
@ -697,6 +697,7 @@ class Memcache {
if (results.size() > 0) {
results.clear();
}
long latestTimestamp = -1;
while (results.size() <= 0 && this.currentRow != null) {
if (deletes.size() > 0) {
deletes.clear();
@ -723,11 +724,23 @@ class Memcache {
continue;
}
}
// We should never return HConstants.LATEST_TIMESTAMP as the time for
// the row. As a compromise, we return the largest timestamp for the
// entries that we find that match.
if (c.getTimestamp() != HConstants.LATEST_TIMESTAMP &&
c.getTimestamp() > latestTimestamp) {
latestTimestamp = c.getTimestamp();
}
results.put(column, c.getValue());
}
this.currentRow = getNextRow(this.currentRow);
}
// Set the timestamp to the largest one for the row if we would otherwise
// return HConstants.LATEST_TIMESTAMP
if (key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
key.setVersion(latestTimestamp);
}
return results.size() > 0;
}

View File

@ -49,6 +49,13 @@ implements ChangedReadersObserver {
// Used around replacement of Readers if they change while we're scanning.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* @param store
* @param timestamp
* @param targetCols
* @param firstRow
* @throws IOException
*/
public StoreFileScanner(final HStore store, final long timestamp,
final byte [][] targetCols, final byte [] firstRow)
throws IOException {
@ -221,9 +228,21 @@ implements ChangedReadersObserver {
long now = System.currentTimeMillis();
long ttl = store.ttl;
for(int i = 0; i < keys.length; i++) {
// The first key that we find that matches may have a timestamp greater
// than the one we're looking for. We have to advance to see if there
// is an older version present, since timestamps are sorted descending
while (keys[i] != null &&
keys[i].getTimestamp() > this.timestamp &&
columnMatch(i) &&
getNext(i)) {
if (columnMatch(i)) {
break;
}
}
if((keys[i] != null)
&& (columnMatch(i))
&& (keys[i].getTimestamp() <= this.timestamp)
// If we get here and keys[i] is not null, we already know that the
// column matches and the timestamp of the row is less than or equal
// to this.timestamp, so we do not need to test that here
&& ((viableRow == null)
|| (Bytes.compareTo(keys[i].getRow(), viableRow) < 0)
|| ((Bytes.compareTo(keys[i].getRow(), viableRow) == 0)
@ -293,13 +312,12 @@ implements ChangedReadersObserver {
vals[i] = ibw.get();
result = true;
break;
} else {
}
if (LOG.isDebugEnabled()) {
LOG.debug("getNext: " + keys[i] + ": expired, skipped");
}
}
}
}
return result;
}
@ -343,6 +361,8 @@ implements ChangedReadersObserver {
}
// Implementation of ChangedReadersObserver
/** {@inheritDoc} */
public void updateReaders() throws IOException {
this.lock.writeLock().lock();
try {

View File

@ -100,20 +100,13 @@ public abstract class AbstractMergeTestBase extends HBaseClusterTestCase {
// Now create the root and meta regions and insert the data regions
// created above into the meta
HRegion root = HRegion.createHRegion(HRegionInfo.ROOT_REGIONINFO,
testDir, this.conf);
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
testDir, this.conf);
HRegion.addRegionToMETA(root, meta);
createRootAndMetaRegions();
for(int i = 0; i < regions.length; i++) {
HRegion.addRegionToMETA(meta, regions[i]);
}
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
closeRootAndMeta();
}
private HRegion createAregion(byte [] startKey, byte [] endKey, int firstRow,

View File

@ -60,6 +60,8 @@ public abstract class HBaseTestCase extends TestCase {
private boolean localfs = false;
protected Path testDir = null;
protected FileSystem fs = null;
protected HRegion root = null;
protected HRegion meta = null;
protected static final char FIRST_CHAR = 'a';
protected static final char LAST_CHAR = 'z';
protected static final String PUNCTUATION = "~`@#$%^&*()-_+=:;',.<>/?[]{}|";
@ -626,4 +628,22 @@ public abstract class HBaseTestCase extends TestCase {
}
}
}
protected void createRootAndMetaRegions() throws IOException {
root = HRegion.createHRegion(HRegionInfo.ROOT_REGIONINFO, testDir, conf);
meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO, testDir,
conf);
HRegion.addRegionToMETA(root, meta);
}
protected void closeRootAndMeta() throws IOException {
if (meta != null) {
meta.close();
meta.getLog().closeAndDelete();
}
if (root != null) {
root.close();
root.getLog().closeAndDelete();
}
}
}

View File

@ -85,16 +85,14 @@ public class MultiRegionTable extends HBaseClusterTestCase {
// Now create the root and meta regions and insert the data regions
// created above into the meta
HRegion root = HRegion.createHRegion(HRegionInfo.ROOT_REGIONINFO,
testDir, this.conf);
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
testDir, this.conf);
HRegion.addRegionToMETA(root, meta);
createRootAndMetaRegions();
for(int i = 0; i < regions.length; i++) {
HRegion.addRegionToMETA(meta, regions[i]);
}
closeRegionAndDeleteLog(root);
closeRegionAndDeleteLog(meta);
closeRootAndMeta();
} catch (Exception e) {
shutdownDfs(dfsCluster);
throw e;

View File

@ -84,20 +84,13 @@ public class TestRegionRebalancing extends HBaseClusterTestCase {
// Now create the root and meta regions and insert the data regions
// created above into the meta
HRegion root = HRegion.createHRegion(HRegionInfo.ROOT_REGIONINFO,
testDir, conf);
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
testDir, conf);
HRegion.addRegionToMETA(root, meta);
createRootAndMetaRegions();
for (HRegion region : regions) {
HRegion.addRegionToMETA(meta, region);
}
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
closeRootAndMeta();
}
/**

View File

@ -0,0 +1,167 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Regression test for HBASE-613
*/
public class TestScanMultipleVersions extends HBaseClusterTestCase {
private final byte[] TABLE_NAME = Bytes.toBytes("TestScanMultipleVersions");
private final HRegionInfo[] INFOS = new HRegionInfo[2];
private final HRegion[] REGIONS = new HRegion[2];
private final byte[][] ROWS = new byte[][] {
Bytes.toBytes("row_0200"),
Bytes.toBytes("row_0800")
};
private final long[] TIMESTAMPS = new long[] {
100L,
1000L
};
private HTableDescriptor desc = null;
@Override
protected void preHBaseClusterSetup() throws Exception {
testDir = new Path(conf.get(HConstants.HBASE_DIR));
// Create table description
this.desc = new HTableDescriptor(TABLE_NAME);
this.desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY));
// Region 0 will contain the key range [,row_0500)
INFOS[0] = new HRegionInfo(this.desc, HConstants.EMPTY_START_ROW,
Bytes.toBytes("row_0500"));
// Region 1 will contain the key range [row_0500,)
INFOS[1] = new HRegionInfo(this.desc, Bytes.toBytes("row_0500"),
HConstants.EMPTY_END_ROW);
// Create root and meta regions
createRootAndMetaRegions();
// Create the regions
for (int i = 0; i < REGIONS.length; i++) {
REGIONS[i] =
HRegion.createHRegion(this.INFOS[i], this.testDir, this.conf);
// Insert data
for (int j = 0; j < TIMESTAMPS.length; j++) {
BatchUpdate b = new BatchUpdate(ROWS[i], TIMESTAMPS[j]);
b.put(HConstants.COLUMN_FAMILY, Bytes.toBytes(TIMESTAMPS[j]));
REGIONS[i].batchUpdate(b);
}
// Insert the region we created into the meta
HRegion.addRegionToMETA(meta, REGIONS[i]);
// Close region
REGIONS[i].close();
REGIONS[i].getLog().closeAndDelete();
}
// Close root and meta regions
closeRootAndMeta();
}
/**
* @throws Exception
*/
public void testScanMultipleVersions() throws Exception {
// At this point we have created multiple regions and both HDFS and HBase
// are running. There are 5 cases we have to test. Each is described below.
HTable t = new HTable(conf, TABLE_NAME);
// Case 1: scan with LATEST_TIMESTAMP. Should get two rows
int count = 0;
Scanner s = t.getScanner(HConstants.COLUMN_FAMILY_ARRAY);
try {
while (s.next() != null) {
count += 1;
}
assertEquals("Number of rows should be 2", 2, count);
} finally {
s.close();
}
// Case 2: Scan with a timestamp greater than most recent timestamp
// (in this case > 1000 and < LATEST_TIMESTAMP. Should get 2 rows.
count = 0;
s = t.getScanner(HConstants.COLUMN_FAMILY_ARRAY, HConstants.EMPTY_START_ROW,
10000L);
try {
while (s.next() != null) {
count += 1;
}
assertEquals("Number of rows should be 2", 2, count);
} finally {
s.close();
}
// Case 3: scan with timestamp equal to most recent timestamp
// (in this case == 1000. Should get 2 rows.
count = 0;
s = t.getScanner(HConstants.COLUMN_FAMILY_ARRAY, HConstants.EMPTY_START_ROW,
1000L);
try {
while (s.next() != null) {
count += 1;
}
assertEquals("Number of rows should be 2", 2, count);
} finally {
s.close();
}
// Case 4: scan with timestamp greater than first timestamp but less than
// second timestamp (100 < timestamp < 1000). Should get 2 rows.
count = 0;
s = t.getScanner(HConstants.COLUMN_FAMILY_ARRAY, HConstants.EMPTY_START_ROW,
500L);
try {
while (s.next() != null) {
count += 1;
}
assertEquals("Number of rows should be 2", 2, count);
} finally {
s.close();
}
// Case 5: scan with timestamp equal to first timestamp (100)
// Should get 2 rows.
count = 0;
s = t.getScanner(HConstants.COLUMN_FAMILY_ARRAY, HConstants.EMPTY_START_ROW,
100L);
try {
while (s.next() != null) {
count += 1;
}
assertEquals("Number of rows should be 2", 2, count);
} finally {
s.close();
}
}
}

View File

@ -47,7 +47,6 @@ public class TestMergeTool extends HBaseTestCase {
private final HRegion[] regions = new HRegion[5];
private HTableDescriptor desc;
private byte [][][] rows;
private Path rootdir = null;
private MiniDFSCluster dfsCluster = null;
private FileSystem fs;
@ -102,9 +101,6 @@ public class TestMergeTool extends HBaseTestCase {
// Start up dfs
this.dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
this.fs = this.dfsCluster.getFileSystem();
// Set the hbase.rootdir to be the home directory in mini dfs.
this.rootdir = new Path(this.fs.getHomeDirectory(), "hbase");
this.conf.set(HConstants.HBASE_DIR, this.rootdir.toString());
// Note: we must call super.setUp after starting the mini cluster or
// we will end up with a local file system
@ -117,7 +113,7 @@ public class TestMergeTool extends HBaseTestCase {
*/
for (int i = 0; i < sourceRegions.length; i++) {
regions[i] =
HRegion.createHRegion(this.sourceRegions[i], this.rootdir, this.conf);
HRegion.createHRegion(this.sourceRegions[i], this.testDir, this.conf);
/*
* Insert data
*/
@ -128,23 +124,14 @@ public class TestMergeTool extends HBaseTestCase {
regions[i].batchUpdate(b);
}
}
// Create root region
HRegion root = HRegion.createHRegion(HRegionInfo.ROOT_REGIONINFO,
this.rootdir, this.conf);
// Create meta region
HRegion meta = HRegion.createHRegion(HRegionInfo.FIRST_META_REGIONINFO,
this.rootdir, this.conf);
// Insert meta into root region
HRegion.addRegionToMETA(root, meta);
// Create root and meta regions
createRootAndMetaRegions();
// Insert the regions we created into the meta
for(int i = 0; i < regions.length; i++) {
HRegion.addRegionToMETA(meta, regions[i]);
}
// Close root and meta regions
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
closeRootAndMeta();
} catch (Exception e) {
shutdownDfs(dfsCluster);
@ -182,7 +169,7 @@ public class TestMergeTool extends HBaseTestCase {
// Now verify that we can read all the rows from regions 0, 1
// in the new merged region.
HRegion merged =
HRegion.openHRegion(mergedInfo, this.rootdir, log, this.conf);
HRegion.openHRegion(mergedInfo, this.testDir, log, this.conf);
verifyMerge(merged, upperbound);
merged.close();
LOG.info("Verified " + msg);