HADOOP-2350 Scanner api returns null row names, or skips row names if different column families do not have entries for some rows
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@602334 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c80b8c75e
commit
870cb3b5cf
|
@ -64,6 +64,8 @@ Trunk (unreleased changes)
|
|||
HADOOP-2338 Fix NullPointerException in master server.
|
||||
HADOOP-2380 REST servlet throws NPE when any value node has an empty string
|
||||
(Bryan Duxbury via Stack)
|
||||
HADOOP-2350 Scanner api returns null row names, or skips row names if
|
||||
different column families do not have entries for some rows
|
||||
|
||||
IMPROVEMENTS
|
||||
HADOOP-2401 Add convenience put method that takes writable
|
||||
|
|
|
@ -332,7 +332,7 @@ class HMerge implements HConstants {
|
|||
HRegion root =
|
||||
new HRegion(dir, hlog,fs, conf, HRegionInfo.rootRegionInfo, null, null);
|
||||
|
||||
HInternalScannerInterface rootScanner =
|
||||
HScannerInterface rootScanner =
|
||||
root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
|
||||
|
||||
try {
|
||||
|
|
|
@ -1088,7 +1088,7 @@ public class HRegion implements HConstants {
|
|||
* @return HScannerInterface
|
||||
* @throws IOException
|
||||
*/
|
||||
public HInternalScannerInterface getScanner(Text[] cols, Text firstRow,
|
||||
public HScannerInterface getScanner(Text[] cols, Text firstRow,
|
||||
long timestamp, RowFilterInterface filter) throws IOException {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
|
@ -1485,33 +1485,21 @@ public class HRegion implements HConstants {
|
|||
/**
|
||||
* HScanner is an iterator through a bunch of rows in an HRegion.
|
||||
*/
|
||||
private class HScanner implements HInternalScannerInterface {
|
||||
private class HScanner implements HScannerInterface {
|
||||
private HInternalScannerInterface[] scanners;
|
||||
private boolean wildcardMatch = false;
|
||||
private boolean multipleMatchers = false;
|
||||
private TreeMap<Text, byte []>[] resultSets;
|
||||
private HStoreKey[] keys;
|
||||
|
||||
/** Create an HScanner with a handle on many HStores. */
|
||||
@SuppressWarnings("unchecked")
|
||||
HScanner(Text[] cols, Text firstRow, long timestamp, HStore[] stores,
|
||||
RowFilterInterface filter) throws IOException {
|
||||
this.scanners = new HInternalScannerInterface[stores.length];
|
||||
|
||||
// Advance to the first key in each store.
|
||||
// All results will match the required column-set and scanTime.
|
||||
|
||||
this.scanners = new HInternalScannerInterface[stores.length];
|
||||
try {
|
||||
for (int i = 0; i < stores.length; i++) {
|
||||
HInternalScannerInterface scanner =
|
||||
scanners[i] =
|
||||
stores[i].getScanner(timestamp, cols, firstRow, filter);
|
||||
|
||||
if (scanner.isWildcardScanner()) {
|
||||
this.wildcardMatch = true;
|
||||
}
|
||||
if (scanner.isMultipleMatchScanner()) {
|
||||
this.multipleMatchers = true;
|
||||
}
|
||||
}
|
||||
scanners[i] = stores[i].getScanner(timestamp, cols, firstRow, filter);
|
||||
}
|
||||
|
||||
} catch(IOException e) {
|
||||
for (int i = 0; i < this.scanners.length; i++) {
|
||||
|
@ -1521,35 +1509,100 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
// Advance to the first key in each store.
|
||||
// All results will match the required column-set and scanTime.
|
||||
|
||||
this.resultSets = new TreeMap[scanners.length];
|
||||
this.keys = new HStoreKey[scanners.length];
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
keys[i] = new HStoreKey();
|
||||
resultSets[i] = new TreeMap<Text, byte []>();
|
||||
if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
|
||||
// As we have now successfully completed initialization, increment the
|
||||
// activeScanner count.
|
||||
activeScannerCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/** @return true if the scanner is a wild card scanner */
|
||||
public boolean isWildcardScanner() {
|
||||
return wildcardMatch;
|
||||
}
|
||||
|
||||
/** @return true if the scanner is a multiple match scanner */
|
||||
public boolean isMultipleMatchScanner() {
|
||||
return multipleMatchers;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
|
||||
throws IOException {
|
||||
boolean haveResults = false;
|
||||
boolean moreToFollow = false;
|
||||
|
||||
// Find the lowest-possible key.
|
||||
|
||||
Text chosenRow = null;
|
||||
long chosenTimestamp = -1;
|
||||
for (int i = 0; i < this.keys.length; i++) {
|
||||
if (scanners[i] != null &&
|
||||
(chosenRow == null ||
|
||||
(keys[i].getRow().compareTo(chosenRow) < 0) ||
|
||||
((keys[i].getRow().compareTo(chosenRow) == 0) &&
|
||||
(keys[i].getTimestamp() > chosenTimestamp)))) {
|
||||
chosenRow = new Text(keys[i].getRow());
|
||||
chosenTimestamp = keys[i].getTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
// Store the key and results for each sub-scanner. Merge them as
|
||||
// appropriate.
|
||||
if (chosenTimestamp >= 0) {
|
||||
// Here we are setting the passed in key with current row+timestamp
|
||||
key.setRow(chosenRow);
|
||||
key.setVersion(chosenTimestamp);
|
||||
key.setColumn(HConstants.EMPTY_TEXT);
|
||||
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
if (scanners[i] != null && keys[i].getRow().compareTo(chosenRow) == 0) {
|
||||
// NOTE: We used to do results.putAll(resultSets[i]);
|
||||
// but this had the effect of overwriting newer
|
||||
// values with older ones. So now we only insert
|
||||
// a result if the map does not contain the key.
|
||||
for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
|
||||
if (!results.containsKey(e.getKey())) {
|
||||
results.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
resultSets[i].clear();
|
||||
if (!scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
if (scanners[i] != null) {
|
||||
if (scanners[i].next(key, results)) {
|
||||
haveResults = true;
|
||||
} else {
|
||||
// If the current scanner is non-null AND has a lower-or-equal
|
||||
// row label, then its timestamp is bad. We need to advance it.
|
||||
while ((scanners[i] != null) &&
|
||||
(keys[i].getRow().compareTo(chosenRow) <= 0)) {
|
||||
|
||||
resultSets[i].clear();
|
||||
if (!scanners[i].next(keys[i], resultSets[i])) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return haveResults;
|
||||
|
||||
moreToFollow = chosenTimestamp >= 0;
|
||||
if (results == null || results.size() <= 0) {
|
||||
// If we got no results, then there is no more to follow.
|
||||
moreToFollow = false;
|
||||
}
|
||||
|
||||
// Make sure scanners closed if no more results
|
||||
if (!moreToFollow) {
|
||||
for (int i = 0; i < scanners.length; i++) {
|
||||
if (null != scanners[i]) {
|
||||
closeScanner(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
return moreToFollow;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1563,6 +1616,8 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
} finally {
|
||||
scanners[i] = null;
|
||||
resultSets[i] = null;
|
||||
keys[i] = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1373,7 +1373,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
requestCount.incrementAndGet();
|
||||
try {
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
HInternalScannerInterface s = scanners.get(scannerName);
|
||||
HScannerInterface s = scanners.get(scannerName);
|
||||
if (s == null) {
|
||||
throw new UnknownScannerException("Name: " + scannerName);
|
||||
}
|
||||
|
@ -1433,7 +1433,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
try {
|
||||
HRegion r = getRegion(regionName);
|
||||
long scannerId = -1L;
|
||||
HInternalScannerInterface s =
|
||||
HScannerInterface s =
|
||||
r.getScanner(cols, firstRow, timestamp, filter);
|
||||
scannerId = rand.nextLong();
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
|
@ -1457,7 +1457,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
requestCount.incrementAndGet();
|
||||
try {
|
||||
String scannerName = String.valueOf(scannerId);
|
||||
HInternalScannerInterface s = null;
|
||||
HScannerInterface s = null;
|
||||
synchronized(scanners) {
|
||||
s = scanners.remove(scannerName);
|
||||
}
|
||||
|
@ -1472,9 +1472,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
Map<String, HInternalScannerInterface> scanners =
|
||||
Collections.synchronizedMap(new HashMap<String,
|
||||
HInternalScannerInterface>());
|
||||
Map<String, HScannerInterface> scanners =
|
||||
Collections.synchronizedMap(new HashMap<String, HScannerInterface>());
|
||||
|
||||
/**
|
||||
* Instantiated as a scanner lease.
|
||||
|
@ -1490,7 +1489,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
/** {@inheritDoc} */
|
||||
public void leaseExpired() {
|
||||
LOG.info("Scanner " + this.scannerName + " lease expired");
|
||||
HInternalScannerInterface s = null;
|
||||
HScannerInterface s = null;
|
||||
synchronized(scanners) {
|
||||
s = scanners.remove(this.scannerName);
|
||||
}
|
||||
|
|
|
@ -283,7 +283,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
|
|||
|
||||
startTime = System.currentTimeMillis();
|
||||
|
||||
HInternalScannerInterface s =
|
||||
HScannerInterface s =
|
||||
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
|
||||
int numFetched = 0;
|
||||
try {
|
||||
|
@ -630,7 +630,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
|
|||
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
HInternalScannerInterface s =
|
||||
HScannerInterface s =
|
||||
r.getScanner(cols, new Text(), System.currentTimeMillis(), null);
|
||||
|
||||
try {
|
||||
|
|
|
@ -69,7 +69,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
private void scan(boolean validateStartcode, String serverName)
|
||||
throws IOException {
|
||||
|
||||
HInternalScannerInterface scanner = null;
|
||||
HScannerInterface scanner = null;
|
||||
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
|
||||
HStoreKey key = new HStoreKey();
|
||||
|
||||
|
@ -108,7 +108,7 @@ public class TestScanner extends HBaseTestCase {
|
|||
}
|
||||
|
||||
} finally {
|
||||
HInternalScannerInterface s = scanner;
|
||||
HScannerInterface s = scanner;
|
||||
scanner = null;
|
||||
if(s != null) {
|
||||
s.close();
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.SortedMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/** test the scanner API at all levels */
|
||||
public class TestScannerAPI extends HBaseClusterTestCase {
|
||||
private final Text[] columns = new Text[] {
|
||||
new Text("a:"),
|
||||
new Text("b:")
|
||||
};
|
||||
private final Text startRow = new Text("0");
|
||||
|
||||
private final TreeMap<Text, SortedMap<Text, byte[]>> values =
|
||||
new TreeMap<Text, SortedMap<Text, byte[]>>();
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
*/
|
||||
public TestScannerAPI() throws Exception {
|
||||
super();
|
||||
try {
|
||||
TreeMap<Text, byte[]> columns = new TreeMap<Text, byte[]>();
|
||||
columns.put(new Text("a:1"), "1".getBytes(HConstants.UTF8_ENCODING));
|
||||
values.put(new Text("1"), columns);
|
||||
columns = new TreeMap<Text, byte[]>();
|
||||
columns.put(new Text("a:2"), "2".getBytes(HConstants.UTF8_ENCODING));
|
||||
columns.put(new Text("b:2"), "2".getBytes(HConstants.UTF8_ENCODING));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testApi() throws IOException {
|
||||
final String tableName = getName();
|
||||
|
||||
// Create table
|
||||
|
||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
||||
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
|
||||
for (int i = 0; i < columns.length; i++) {
|
||||
tableDesc.addFamily(new HColumnDescriptor(columns[i].toString()));
|
||||
}
|
||||
admin.createTable(tableDesc);
|
||||
|
||||
// Insert values
|
||||
|
||||
HTable table = new HTable(conf, new Text(getName()));
|
||||
|
||||
for (Map.Entry<Text, SortedMap<Text, byte[]>> row: values.entrySet()) {
|
||||
long lockid = table.startUpdate(row.getKey());
|
||||
for (Map.Entry<Text, byte[]> val: row.getValue().entrySet()) {
|
||||
table.put(lockid, val.getKey(), val.getValue());
|
||||
}
|
||||
table.commit(lockid);
|
||||
}
|
||||
|
||||
HRegion region = null;
|
||||
try {
|
||||
SortedMap<Text, HRegion> regions =
|
||||
cluster.getRegionThreads().get(0).getRegionServer().getOnlineRegions();
|
||||
for (Map.Entry<Text, HRegion> e: regions.entrySet()) {
|
||||
if (!e.getValue().getRegionInfo().isMetaRegion()) {
|
||||
region = e.getValue();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
IOException iox = new IOException("error finding region");
|
||||
iox.initCause(e);
|
||||
throw iox;
|
||||
}
|
||||
@SuppressWarnings("null")
|
||||
HScannerInterface scanner =
|
||||
region.getScanner(columns, startRow, System.currentTimeMillis(), null);
|
||||
try {
|
||||
verify(scanner);
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
|
||||
scanner = table.obtainScanner(columns, startRow);
|
||||
try {
|
||||
verify(scanner);
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
scanner = table.obtainScanner(columns, startRow);
|
||||
try {
|
||||
for (Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator =
|
||||
scanner.iterator();
|
||||
iterator.hasNext();
|
||||
) {
|
||||
Map.Entry<HStoreKey, SortedMap<Text, byte[]>> row = iterator.next();
|
||||
HStoreKey key = row.getKey();
|
||||
assertTrue("row key", values.containsKey(key.getRow()));
|
||||
|
||||
SortedMap<Text, byte[]> results = row.getValue();
|
||||
SortedMap<Text, byte[]> columnValues = values.get(key.getRow());
|
||||
assertEquals(columnValues.size(), results.size());
|
||||
for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
|
||||
Text column = e.getKey();
|
||||
assertTrue("column", results.containsKey(column));
|
||||
assertTrue("value", Arrays.equals(columnValues.get(column),
|
||||
results.get(column)));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void verify(HScannerInterface scanner) throws IOException {
|
||||
HStoreKey key = new HStoreKey();
|
||||
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
while (scanner.next(key, results)) {
|
||||
Text row = key.getRow();
|
||||
assertTrue("row key", values.containsKey(row));
|
||||
|
||||
SortedMap<Text, byte[]> columnValues = values.get(row);
|
||||
assertEquals(columnValues.size(), results.size());
|
||||
for (Map.Entry<Text, byte[]> e: columnValues.entrySet()) {
|
||||
Text column = e.getKey();
|
||||
assertTrue("column", results.containsKey(column));
|
||||
assertTrue("value", Arrays.equals(columnValues.get(column),
|
||||
results.get(column)));
|
||||
}
|
||||
results.clear();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -228,7 +228,7 @@ public class TestSplit extends MultiRegionTable {
|
|||
final Text firstValue)
|
||||
throws IOException {
|
||||
Text [] cols = new Text[] {new Text(column)};
|
||||
HInternalScannerInterface s = r.getScanner(cols,
|
||||
HScannerInterface s = r.getScanner(cols,
|
||||
HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
|
||||
try {
|
||||
HStoreKey curKey = new HStoreKey();
|
||||
|
|
Loading…
Reference in New Issue