HADOOP-2467 scanner truncates resultset when > 1 column families
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@605811 13f79535-47bb-0310-9956-ffa450edef68
parent db809b2145
commit 3b36d543de
@@ -89,6 +89,7 @@ Trunk (unreleased changes)
    HADOOP-2465 When split parent regions are cleaned up, not all the columns are
                deleted
    HADOOP-2468 TestRegionServerExit failed in Hadoop-Nightly #338
+   HADOOP-2467 scanner truncates resultset when > 1 column families
 
 IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable
@@ -592,7 +592,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "regionname: " + this.regionName.toString() + ", startKey: <" +
+      return "{regionname: " + this.regionName.toString() + ", startKey: <" +
         this.startKey.toString() + ">, server: " + this.server.toString() + "}";
     }
@@ -1113,6 +1113,7 @@ public class HRegion implements HConstants {
         continue;
       }
       storelist.add(stores.get(family));
     }
     return new HScanner(cols, firstRow, timestamp,
       storelist.toArray(new HStore [storelist.size()]), filter);
@@ -1296,7 +1297,6 @@ public class HRegion implements HConstants {
     try {
       // find the HStore for the column family
-      LOG.info(family);
       HStore store = stores.get(HStoreKey.extractFamily(family));
       // find all the keys that match our criteria
       List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp), ALL_VERSIONS);
@@ -1422,8 +1422,8 @@ public class HRegion implements HConstants {
    * @throws IOException
    */
   private void checkColumn(Text columnName) throws IOException {
-    Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
-    if(! regionInfo.getTableDesc().hasFamily(family)) {
+    Text family = HStoreKey.extractFamily(columnName, true);
+    if (!regionInfo.getTableDesc().hasFamily(family)) {
       throw new IOException("Requested column family " + family
         + " does not exist in HRegion " + regionInfo.getRegionName()
         + " for table " + regionInfo.getTableDesc().getName());
@@ -1529,14 +1529,21 @@ public class HRegion implements HConstants {
     /** Create an HScanner with a handle on many HStores. */
     @SuppressWarnings("unchecked")
     HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
-        RowFilterInterface filter) throws IOException {
+        RowFilterInterface filter)
+    throws IOException {
       this.scanners = new HInternalScannerInterface[stores.length];
       try {
         for (int i = 0; i < stores.length; i++) {
-          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
+          // TODO: The cols passed in here can include columns from other
+          // stores; add filter so only pertinent columns are passed.
+          //
+          // Also, if more than one store involved, need to replicate filters.
+          // At least WhileMatchRowFilter will mess up the scan if only
+          // one shared across many rows. See HADOOP-2467.
+          scanners[i] = stores[i].getScanner(timestamp, cols, firstRow,
+            (i > 0 && filter != null)?
+              (RowFilterInterface)Writables.clone(filter, conf): filter);
         }
       } catch(IOException e) {
         for (int i = 0; i < this.scanners.length; i++) {
           if(scanners[i] != null) {
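This per-store filter cloning is the heart of the fix: WhileMatchRowFilter carries mutable scan state, so one instance shared across the per-store scanners lets whichever store hits the stop row first silence every other store. A standalone sketch of the failure mode and the clone-per-scanner remedy (the filter class here is a simplified stand-in, not the HBase class, and it clones via plain Java serialization rather than Writables.clone):

    import java.io.*;

    public class FilterCloneSketch {
      // Simplified stand-in for WhileMatchRowFilter(StopRowFilter): once any
      // row at or past the stop row is seen, it filters everything after.
      static class StopAllFilter implements Serializable {
        final String stopRow;
        boolean filterAllRemaining = false; // mutable per-scan state

        StopAllFilter(String stopRow) { this.stopRow = stopRow; }

        boolean filterRow(String row) {
          if (row.compareTo(stopRow) >= 0) {
            filterAllRemaining = true; // flips once and stays flipped
          }
          return filterAllRemaining;
        }
      }

      // Copy via serialization, the same idea as the new Writables.clone.
      static StopAllFilter clone(StopAllFilter orig) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        new ObjectOutputStream(buf).writeObject(orig);
        return (StopAllFilter) new ObjectInputStream(
            new ByteArrayInputStream(buf.toByteArray())).readObject();
      }

      public static void main(String[] args) throws Exception {
        // One shared filter across two "store scanners": store 0 holds a row
        // ('bbb') past the stop row 'aad'; store 1 holds only early rows.
        StopAllFilter shared = new StopAllFilter("aad");
        shared.filterRow("bbb"); // store 0 trips the filter...
        System.out.println(shared.filterRow("aaa")); // true: store 1 truncated

        // With a fresh clone per scanner, store 1 scans normally.
        StopAllFilter perStore = clone(new StopAllFilter("aad"));
        System.out.println(perStore.filterRow("aaa")); // false: row returned
      }
    }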
@@ -1546,9 +1553,8 @@ public class HRegion implements HConstants {
         throw e;
       }
-
-      // Advance to the first key in each store.
-      // All results will match the required column-set and scanTime.
+      // Advance to the first key in each store.
+      // All results will match the required column-set and scanTime.
       this.resultSets = new TreeMap[scanners.length];
       this.keys = new HStoreKey[scanners.length];
       for (int i = 0; i < scanners.length; i++) {
@@ -1616,7 +1622,6 @@ public class HRegion implements HConstants {
         // row label, then its timestamp is bad. We need to advance it.
         while ((scanners[i] != null) &&
             (keys[i].getRow().compareTo(chosenRow) <= 0)) {
-
           resultSets[i].clear();
           if (!scanners[i].next(keys[i], resultSets[i])) {
             closeScanner(i);
@@ -1848,7 +1848,6 @@ class HStore implements HConstants {
     this.readers = new MapFile.Reader[storefiles.size()];
 
     // Most recent map file should be first
-
     int i = readers.length - 1;
     for(HStoreFile curHSF: storefiles.values()) {
       readers[i--] = curHSF.getReader(fs, bloomFilter);
@@ -815,6 +815,15 @@ public class HStoreFile implements HConstants, WritableComparable {
         throws IOException {
       super(fs, dirName, conf);
       this.bloomFilter = filter;
+      // Force reading of the mapfile index by calling midKey.
+      // Reading the index will bring the index into memory over
+      // here on the client and then close the index file, freeing
+      // up socket connection and resources in the datanode.
+      // Usually, the first access on a MapFile.Reader will load the
+      // index; force the issue in HStoreFile MapFiles because an
+      // access may not happen for some time; meantime we're
+      // using up datanode resources. See HADOOP-2341.
+      midKey();
     }
 
     /** {@inheritDoc} */
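A hedged sketch of the same eager-index trick on a plain Hadoop MapFile (the path is illustrative; midKey() is simply an index-backed call that forces the reader to load and release the index at open time rather than on first lookup):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.MapFile;

    public class EagerIndexLoad {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical MapFile directory; substitute a real one.
        MapFile.Reader reader = new MapFile.Reader(fs, "/tmp/example-mapfile", conf);
        // Any index-backed call pulls the whole index into client memory and
        // closes the index file, releasing the datanode connection early.
        System.out.println("mid key: " + reader.midKey());
        reader.close();
      }
    }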
@@ -27,6 +27,8 @@ import java.io.*;
  * A Key for a stored row
  */
 public class HStoreKey implements WritableComparable {
+  public static final char COLUMN_FAMILY_DELIMITER = ':';
+
   // TODO: Move these utility methods elsewhere (To a Column class?).
   /**
    * Extracts the column family name from a column
@@ -83,7 +85,13 @@ public class HStoreKey implements WritableComparable {
 
   private static int getColonOffset(final Text col)
   throws InvalidColumnNameException {
-    int offset = col.find(":");
+    int offset = -1;
+    for (int i = 0; i < col.getLength(); i++) {
+      if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
+        offset = i;
+        break;
+      }
+    }
     if(offset < 0) {
       throw new InvalidColumnNameException(col + " is missing the colon " +
         "family/qualifier separator");
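For reference, the scan above just locates the first delimiter in a column name such as info:regioninfo; a plain-Java equivalent (a standalone sketch, not the Hadoop Text API):

    public class ColonOffsetSketch {
      static final char COLUMN_FAMILY_DELIMITER = ':';

      public static void main(String[] args) {
        String col = "info:regioninfo";
        int offset = -1;
        for (int i = 0; i < col.length(); i++) {
          if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
            offset = i;
            break;
          }
        }
        // offset == 4; the family without the colon is "info", and with the
        // delimiter retained (what extractFamily(col, true) returns) "info:".
        System.out.println(col.substring(0, offset + 1)); // info:
      }
    }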
@@ -294,23 +302,24 @@ public class HStoreKey implements WritableComparable {
 
   // Comparable
 
   /** {@inheritDoc} */
   public int compareTo(Object o) {
     HStoreKey other = (HStoreKey) o;
     int result = this.row.compareTo(other.row);
-    if(result == 0) {
-      result = this.column.compareTo(other.column);
-      if(result == 0) {
-        // The below older timestamps sorting ahead of newer timestamps looks
-        // wrong but it is intentional. This way, newer timestamps are first
-        // found when we iterate over a memcache and newer versions are the
-        // first we trip over when reading from a store file.
-        if(this.timestamp < other.timestamp) {
-          result = 1;
-        } else if(this.timestamp > other.timestamp) {
-          result = -1;
-        }
-      }
+    if (result != 0) {
+      return result;
+    }
+    result = this.column.compareTo(other.column);
+    if (result != 0) {
+      return result;
+    }
+    // The below older timestamps sorting ahead of newer timestamps looks
+    // wrong but it is intentional. This way, newer timestamps are first
+    // found when we iterate over a memcache and newer versions are the
+    // first we trip over when reading from a store file.
+    if (this.timestamp < other.timestamp) {
+      result = 1;
+    } else if (this.timestamp > other.timestamp) {
+      result = -1;
+    }
     return result;
   }
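A quick standalone illustration of the inverted timestamp ordering the comparator preserves: newer timestamps compare as smaller, so the newest version of a cell is the first one a scan trips over (simplified to bare timestamps; not the HStoreKey class itself):

    import java.util.ArrayList;
    import java.util.List;

    public class NewestFirst {
      public static void main(String[] args) {
        List<Long> stamps = new ArrayList<>();
        stamps.add(100L); stamps.add(300L); stamps.add(200L);
        // Same rule as compareTo above: larger (newer) timestamp sorts first.
        stamps.sort((a, b) -> Long.compare(b, a));
        System.out.println(stamps); // [300, 200, 100] -- newest version first
      }
    }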
@@ -426,7 +426,7 @@ public class HTable implements HConstants {
    */
   public HScannerInterface obtainScanner(Text[] columns, Text startRow)
   throws IOException {
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), null);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, null);
   }
 
   /**
@@ -466,7 +466,7 @@ public class HTable implements HConstants {
   public HScannerInterface obtainScanner(Text[] columns, Text startRow,
       RowFilterInterface filter)
   throws IOException {
-    return obtainScanner(columns, startRow, System.currentTimeMillis(), filter);
+    return obtainScanner(columns, startRow, HConstants.LATEST_TIMESTAMP, filter);
   }
 
   /**
@@ -490,7 +490,7 @@ public class HTable implements HConstants {
       final Text startRow, final Text stopRow)
   throws IOException {
     return obtainScanner(columns, startRow, stopRow,
-      System.currentTimeMillis());
+      HConstants.LATEST_TIMESTAMP);
   }
 
   /**
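Passing HConstants.LATEST_TIMESTAMP instead of a wall-clock value means "newest available version" regardless of clock skew; a currentTimeMillis() taken on a client whose clock lags the region server could silently exclude freshly written cells. A hedged usage fragment against the era's API (table and family names are illustrative):

    // Open a scanner over two families at the newest available versions;
    // the four-argument obtainScanner overload is the one patched above.
    HTable table = new HTable(conf, new Text("mytable"));
    Text[] families = { new Text("family1:"), new Text("family2:") };
    HScannerInterface scanner = table.obtainScanner(
        families, HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null);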
@@ -27,7 +27,10 @@ import java.io.UnsupportedEncodingException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,7 +93,25 @@ public class Writables {
       in.close();
     }
   }
 
+  /**
+   * Make a copy of a writable object using serialization to a buffer.
+   * Copied from WritableUtils only <code>conf</code> type is Configuration
+   * rather than JobConf (Doesn't need to be JobConf -- HADOOP-2469).
+   * @param orig The object to copy
+   * @return The copied object
+   */
+  public static Writable clone(Writable orig, Configuration conf) {
+    try {
+      Writable newInst =
+        (Writable)ReflectionUtils.newInstance(orig.getClass(), conf);
+      WritableUtils.cloneInto(newInst, orig);
+      return newInst;
+    } catch (IOException e) {
+      throw new RuntimeException("Error writing/reading clone buffer", e);
+    }
+  }
+
   /**
    * @param bytes
    * @return A HRegionInfo instance built out of passed <code>bytes</code>.
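The helper's mechanics on a standard Writable, as a runnable sketch: ReflectionUtils.newInstance plus WritableUtils.cloneInto is exactly the pair the new method wraps, shown here on a plain Hadoop Text rather than a row filter:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CloneDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Text orig = new Text("row-aaa");
        // Reflectively build an empty instance, then serialize-copy into it.
        Writable copy = (Writable) ReflectionUtils.newInstance(orig.getClass(), conf);
        WritableUtils.cloneInto(copy, orig);
        orig.set("changed");          // mutating the original...
        System.out.println(copy);     // ...leaves the copy at "row-aaa"
      }
    }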
@@ -77,8 +77,7 @@ public abstract class HBaseTestCase extends TestCase {
   private void init() {
     conf = new HBaseConfiguration();
     try {
-      START_KEY =
-        new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
+      START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_ENCODING) + PUNCTUATION;
     } catch (UnsupportedEncodingException e) {
       fail();
     }
@@ -125,10 +124,23 @@ public abstract class HBaseTestCase extends TestCase {
       null), fs, conf, info, null, null);
   }
 
   /**
    * Create a table of name <code>name</code> with {@link COLUMNS} for
    * families.
    * @param name Name to give table.
    * @return Column descriptor.
    */
   protected HTableDescriptor createTableDescriptor(final String name) {
+    return createTableDescriptor(name, MAXVERSIONS);
+  }
+
+  /**
+   * Create a table of name <code>name</code> with {@link COLUMNS} for
+   * families.
+   * @param name Name to give table.
+   * @param versions How many versions to allow per column.
+   * @return Column descriptor.
+   */
+  protected HTableDescriptor createTableDescriptor(final String name,
+      final int versions) {
     HTableDescriptor htd = new HTableDescriptor(name);
@@ -71,6 +71,54 @@ public class TestScanner2 extends HBaseClusterTestCase {
     }
   }
 
+  /**
+   * Test for HADOOP-2467 fix. If scanning more than one column family,
+   * filters such as the {@link WhileMatchRowFilter} could prematurely
+   * shut down scanning if one of the stores started returning filter = true.
+   * @throws MasterNotRunningException
+   * @throws IOException
+   */
+  public void testScanningMultipleFamiliesOfDifferentVintage()
+  throws MasterNotRunningException, IOException {
+    Text tableName = new Text(getName());
+    final Text [] families = createTable(new HBaseAdmin(this.conf), tableName);
+    HTable table = new HTable(this.conf, tableName);
+    HScannerInterface scanner = null;
+    try {
+      long time = System.currentTimeMillis();
+      LOG.info("Current time " + time);
+      for (int i = 0; i < families.length; i++) {
+        final byte [] lastKey = new byte [] {'a', 'a', (byte)('b' + i)};
+        Incommon inc = new HTableIncommon(table);
+        addContent(inc, families[i].toString(),
+          START_KEY_BYTES, new Text(lastKey), time + (1000 * i));
+        // Add in to the first store a record that is in excess of the stop
+        // row specified below setting up the scanner filter. Add 'bbb'.
+        // Use a stop filter of 'aad'. The store scanner going to 'bbb' was
+        // flipping the switch in StopRowFilter, stopping us returning all
+        // of the rest of the other store content.
+        if (i == 0) {
+          long id = inc.startBatchUpdate(new Text("bbb"));
+          inc.put(id, families[0], "bbb".getBytes());
+          inc.commit(id);
+        }
+      }
+      RowFilterInterface f =
+        new WhileMatchRowFilter(new StopRowFilter(new Text("aad")));
+      scanner = table.obtainScanner(families, HConstants.EMPTY_START_ROW,
+        HConstants.LATEST_TIMESTAMP, f);
+      int count = 0;
+      for (Map.Entry<HStoreKey, SortedMap<Text, byte []>> e: scanner) {
+        count++;
+      }
+      // Should get back 3 rows: aaa, aab, and aac.
+      assertEquals(count, 3);
+    } finally {
+      scanner.close();
+      table.close();
+    }
+  }
+
   /**
    * @throws Exception
    */