HBASE-443 Move internal classes out of HStore
Moved out classes Memcache, StoreFileScanner, MapFileCompactionReader, and HStoreScanner, and the interface CompactionReader.
Updated TestHMemcache to use the correct type declaration.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@636815 13f79535-47bb-0310-9956-ffa450edef68

parent 9057e559a1
commit 8c815c72f4
CHANGES.txt
@@ -83,6 +83,7 @@ Hbase Change Log
    HBASE-479  Speed up TestLogRolling
    HBASE-480  Tool to manually merge two regions
    HBASE-477  Add support for an HBASE_CLASSPATH
+   HBASE-443  Move internal classes out of HStore
 
 Branch 0.1
CompactionReader.java
@@ -0,0 +1,53 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;

/** Interface for generic reader for compactions */
interface CompactionReader {

  /**
   * Closes the reader
   * @throws IOException
   */
  public void close() throws IOException;

  /**
   * Get the next key/value pair
   *
   * @param key
   * @param val
   * @return true if more data was returned
   * @throws IOException
   */
  public boolean next(WritableComparable key, Writable val)
  throws IOException;

  /**
   * Resets the reader
   * @throws IOException
   */
  public void reset() throws IOException;
}
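CompactionReader gives the compaction code a uniform next/reset/close surface over whatever format backs a store file. A minimal sketch of a caller draining one pass, assuming an already-constructed reader (the countEntries helper is hypothetical, not part of this commit; HStoreKey and ImmutableBytesWritable are the Writable types the HBase compaction path actually passes):

import java.io.IOException;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

class CompactionReaderExample {
  /** Hypothetical helper: visit every key/value pair once, then clean up. */
  static long countEntries(CompactionReader reader) throws IOException {
    long entries = 0;
    HStoreKey key = new HStoreKey();                        // reused per call
    ImmutableBytesWritable val = new ImmutableBytesWritable();
    try {
      // next() fills the passed-in Writables and returns false at EOF.
      while (reader.next(key, val)) {
        entries++;
      }
      reader.reset();  // rewind in case the caller wants a second pass
    } finally {
      reader.close();  // always release the underlying file handles
    }
    return entries;
  }
}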
File diff suppressed because it is too large
HStoreScanner.java
@@ -0,0 +1,293 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.TreeMap;
import java.util.SortedMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.HConstants;

/**
 * Scanner scans both the memcache and the HStore
 */
class HStoreScanner implements HInternalScannerInterface {
  static final Log LOG = LogFactory.getLog(HStoreScanner.class);

  private HInternalScannerInterface[] scanners;
  private TreeMap<Text, byte []>[] resultSets;
  private HStoreKey[] keys;
  private boolean wildcardMatch = false;
  private boolean multipleMatchers = false;
  private RowFilterInterface dataFilter;
  private HStore store;

  /** Create a Scanner with a handle on the memcache and HStore files. */
  @SuppressWarnings("unchecked")
  HStoreScanner(HStore store, Text[] targetCols, Text firstRow, long timestamp,
    RowFilterInterface filter)
  throws IOException {
    this.store = store;
    this.dataFilter = filter;
    if (null != dataFilter) {
      dataFilter.reset();
    }
    this.scanners = new HInternalScannerInterface[2];
    this.resultSets = new TreeMap[scanners.length];
    this.keys = new HStoreKey[scanners.length];

    try {
      scanners[0] = store.memcache.getScanner(timestamp, targetCols, firstRow);
      scanners[1] = new StoreFileScanner(store, timestamp, targetCols, firstRow);

      for (int i = 0; i < scanners.length; i++) {
        if (scanners[i].isWildcardScanner()) {
          this.wildcardMatch = true;
        }
        if (scanners[i].isMultipleMatchScanner()) {
          this.multipleMatchers = true;
        }
      }

    } catch (IOException e) {
      for (int i = 0; i < this.scanners.length; i++) {
        if (scanners[i] != null) {
          closeScanner(i);
        }
      }
      throw e;
    }

    // Advance to the first key in each scanner.
    // All results will match the required column-set and scanTime.

    for (int i = 0; i < scanners.length; i++) {
      keys[i] = new HStoreKey();
      resultSets[i] = new TreeMap<Text, byte []>();
      if (scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) {
        closeScanner(i);
      }
    }
    // As we have now successfully completed initialization, increment the
    // activeScanner count.
    store.activeScanners.incrementAndGet();
  }

  /** @return true if the scanner is a wild card scanner */
  public boolean isWildcardScanner() {
    return wildcardMatch;
  }

  /** @return true if the scanner is a multiple match scanner */
  public boolean isMultipleMatchScanner() {
    return multipleMatchers;
  }

  /** {@inheritDoc} */
  public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
  throws IOException {

    // Filtered flag is set by filters.  If a cell has been 'filtered out'
    // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
    boolean filtered = true;
    boolean moreToFollow = true;
    while (filtered && moreToFollow) {
      // Find the lowest-possible key.
      Text chosenRow = null;
      long chosenTimestamp = -1;
      for (int i = 0; i < this.keys.length; i++) {
        if (scanners[i] != null &&
            (chosenRow == null ||
            (keys[i].getRow().compareTo(chosenRow) < 0) ||
            ((keys[i].getRow().compareTo(chosenRow) == 0) &&
            (keys[i].getTimestamp() > chosenTimestamp)))) {
          chosenRow = new Text(keys[i].getRow());
          chosenTimestamp = keys[i].getTimestamp();
        }
      }

      // Filter whole row by row key?
      filtered = dataFilter != null ? dataFilter.filter(chosenRow) : false;

      // Store the key and results for each sub-scanner. Merge them as
      // appropriate.
      if (chosenTimestamp >= 0 && !filtered) {
        // Here we are setting the passed in key with current row+timestamp
        key.setRow(chosenRow);
        key.setVersion(chosenTimestamp);
        key.setColumn(HConstants.EMPTY_TEXT);
        // Keep list of deleted cell keys within this row.  We need this
        // because as we go through scanners, the delete record may be in an
        // early scanner and then the same record with a non-delete, non-null
        // value in a later. Without history of what we've seen, we'll return
        // deleted values. This List should not ever grow too large since we
        // are only keeping rows and columns that match those set on the
        // scanner and which have delete values.  If memory usage becomes a
        // problem, could redo as bloom filter.
        List<HStoreKey> deletes = new ArrayList<HStoreKey>();
        for (int i = 0; i < scanners.length && !filtered; i++) {
          while ((scanners[i] != null
              && !filtered
              && moreToFollow)
              && (keys[i].getRow().compareTo(chosenRow) == 0)) {
            // If we are doing a wild card match or there are multiple
            // matchers per column, we need to scan all the older versions of
            // this row to pick up the rest of the family members
            if (!wildcardMatch
                && !multipleMatchers
                && (keys[i].getTimestamp() != chosenTimestamp)) {
              break;
            }

            // Filter out null criteria columns that are not null
            if (dataFilter != null) {
              filtered = dataFilter.filterNotNull(resultSets[i]);
            }

            // NOTE: We used to do results.putAll(resultSets[i]);
            // but this had the effect of overwriting newer
            // values with older ones. So now we only insert
            // a result if the map does not contain the key.
            HStoreKey hsk = new HStoreKey(key.getRow(), HConstants.EMPTY_TEXT,
              key.getTimestamp());
            for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
              hsk.setColumn(e.getKey());
              if (HLogEdit.isDeleted(e.getValue())) {
                if (!deletes.contains(hsk)) {
                  // Key changes as we cycle the for loop so add a copy to
                  // the set of deletes.
                  deletes.add(new HStoreKey(hsk));
                }
              } else if (!deletes.contains(hsk) &&
                  !filtered &&
                  moreToFollow &&
                  !results.containsKey(e.getKey())) {
                if (dataFilter != null) {
                  // Filter whole row by column data?
                  filtered =
                    dataFilter.filter(chosenRow, e.getKey(), e.getValue());
                  if (filtered) {
                    results.clear();
                    break;
                  }
                }
                results.put(e.getKey(), e.getValue());
              }
            }
            resultSets[i].clear();
            if (!scanners[i].next(keys[i], resultSets[i])) {
              closeScanner(i);
            }
          }
        }
      }

      for (int i = 0; i < scanners.length; i++) {
        // If the current scanner is non-null AND has a lower-or-equal
        // row label, then its timestamp is bad. We need to advance it.
        while ((scanners[i] != null) &&
            (keys[i].getRow().compareTo(chosenRow) <= 0)) {
          resultSets[i].clear();
          if (!scanners[i].next(keys[i], resultSets[i])) {
            closeScanner(i);
          }
        }
      }

      moreToFollow = chosenTimestamp >= 0;

      if (dataFilter != null) {
        if (moreToFollow) {
          dataFilter.rowProcessed(filtered, chosenRow);
        }
        if (dataFilter.filterAllRemaining()) {
          moreToFollow = false;
        }
      }

      if (results.size() <= 0 && !filtered) {
        // There were no results found for this row.  Mark it as
        // 'filtered'-out otherwise we will not move on to the next row.
        filtered = true;
      }
    }

    // If we got no results, then there is no more to follow.
    if (results == null || results.size() <= 0) {
      moreToFollow = false;
    }

    // Make sure scanners closed if no more results
    if (!moreToFollow) {
      for (int i = 0; i < scanners.length; i++) {
        if (null != scanners[i]) {
          closeScanner(i);
        }
      }
    }

    return moreToFollow;
  }

  /** Shut down a single scanner */
  void closeScanner(int i) {
    try {
      try {
        scanners[i].close();
      } catch (IOException e) {
        LOG.warn(store.storeName + " failed closing scanner " + i, e);
      }
    } finally {
      scanners[i] = null;
      keys[i] = null;
      resultSets[i] = null;
    }
  }

  /** {@inheritDoc} */
  public void close() {
    try {
      for (int i = 0; i < scanners.length; i++) {
        if (scanners[i] != null) {
          closeScanner(i);
        }
      }
    } finally {
      store.updateActiveScanners();
    }
  }

  /** {@inheritDoc} */
  public Iterator<Map.Entry<HStoreKey, SortedMap<Text, byte[]>>> iterator() {
    throw new UnsupportedOperationException("Unimplemented serverside. " +
      "next(HStoreKey, SortedMap(...) is more efficient");
  }
}
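The heart of HStoreScanner.next() is its head-selection loop: among the live sub-scanners it always picks the lowest row, breaking ties toward the newest timestamp, which is how fresh memcache edits shadow older store-file cells for the same row. The same selection rule in isolation (plain strings and longs; every name here is illustrative, not HBase API):

class HeadSelectionDemo {
  /** Illustrative stand-in for a sub-scanner's current head key. */
  static class Head {
    final String row;
    final long timestamp;
    Head(String row, long timestamp) { this.row = row; this.timestamp = timestamp; }
    @Override public String toString() { return row + "@" + timestamp; }
  }

  /** Pick the lowest row; on equal rows, prefer the newest timestamp. */
  static Head chooseNext(Head[] heads) {
    Head chosen = null;
    for (Head h : heads) {
      if (h == null) {
        continue;                 // that sub-scanner is exhausted
      }
      if (chosen == null
          || h.row.compareTo(chosen.row) < 0
          || (h.row.equals(chosen.row) && h.timestamp > chosen.timestamp)) {
        chosen = h;
      }
    }
    return chosen;                // null when every sub-scanner is exhausted
  }

  public static void main(String[] args) {
    Head[] heads = {
      new Head("row2", 10L),
      new Head("row1", 5L),       // lowest row wins over row2
      new Head("row1", 9L),       // same row, newer timestamp wins
    };
    System.out.println(chooseNext(heads));  // prints row1@9
  }
}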
MapFileCompactionReader.java
@@ -0,0 +1,53 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;

/** A compaction reader for MapFile */
class MapFileCompactionReader implements CompactionReader {
  final MapFile.Reader reader;

  MapFileCompactionReader(final MapFile.Reader r) {
    this.reader = r;
  }

  /** {@inheritDoc} */
  public void close() throws IOException {
    this.reader.close();
  }

  /** {@inheritDoc} */
  public boolean next(WritableComparable key, Writable val)
  throws IOException {
    return this.reader.next(key, val);
  }

  /** {@inheritDoc} */
  public void reset() throws IOException {
    this.reader.reset();
  }
}
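Because MapFileCompactionReader only delegates, swapping the on-disk format later means writing another small adapter rather than touching the compaction loop. A hedged sketch of wiring one up over a local MapFile (the path is a placeholder and error handling is trimmed; assumes the Hadoop-era MapFile.Reader constructor taking a FileSystem, directory name, and Configuration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;

class MapFileCompactionReaderDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);
    // "/tmp/demo-mapfile" is a placeholder; point it at a real MapFile directory.
    MapFile.Reader r = new MapFile.Reader(fs, "/tmp/demo-mapfile", conf);
    CompactionReader reader = new MapFileCompactionReader(r);
    try {
      reader.reset();   // delegates to MapFile.Reader.reset(): rewind to start
    } finally {
      reader.close();   // delegates to MapFile.Reader.close()
    }
  }
}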
Memcache.java
@@ -0,0 +1,536 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Collections;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HConstants;


/**
 * The Memcache holds in-memory modifications to the HRegion. This is really a
 * wrapper around a TreeMap that helps us when staging the Memcache out to disk.
 */
class Memcache {

  // Note that since these structures are always accessed with a lock held,
  // no additional synchronization is required.

  @SuppressWarnings("hiding")
  private final SortedMap<HStoreKey, byte[]> memcache =
    Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());

  volatile SortedMap<HStoreKey, byte[]> snapshot;

  @SuppressWarnings("hiding")
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /**
   * Constructor
   */
  public Memcache() {
    snapshot =
      Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());
  }

  /**
   * Creates a snapshot of the current Memcache
   */
  void snapshot() {
    this.lock.writeLock().lock();
    try {
      synchronized (memcache) {
        if (memcache.size() != 0) {
          snapshot.putAll(memcache);
          memcache.clear();
        }
      }
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * @return memcache snapshot
   */
  SortedMap<HStoreKey, byte[]> getSnapshot() {
    this.lock.writeLock().lock();
    try {
      SortedMap<HStoreKey, byte[]> currentSnapshot = snapshot;
      snapshot =
        Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>());

      return currentSnapshot;

    } finally {
      this.lock.writeLock().unlock();
    }
  }

  /**
   * Store a value.
   * @param key
   * @param value
   */
  void add(final HStoreKey key, final byte[] value) {
    this.lock.readLock().lock();
    try {
      memcache.put(key, value);

    } finally {
      this.lock.readLock().unlock();
    }
  }

  /**
   * Look back through all the backlog TreeMaps to find the target.
   * @param key
   * @param numVersions
   * @return An array of byte arrays ordered by timestamp.
   */
  List<Cell> get(final HStoreKey key, final int numVersions) {
    this.lock.readLock().lock();
    try {
      List<Cell> results;
      synchronized (memcache) {
        results = internalGet(memcache, key, numVersions);
      }
      synchronized (snapshot) {
        results.addAll(results.size(),
          internalGet(snapshot, key, numVersions - results.size()));
      }
      return results;

    } finally {
      this.lock.readLock().unlock();
    }
  }

  /**
   * Return all the available columns for the given key. The key indicates a
   * row and timestamp, but not a column name.
   *
   * The returned object should map column names to byte arrays (byte[]).
   * @param key
   * @param results
   */
  void getFull(HStoreKey key, Set<Text> columns, SortedMap<Text, Cell> results) {
    this.lock.readLock().lock();
    try {
      synchronized (memcache) {
        internalGetFull(memcache, key, columns, results);
      }
      synchronized (snapshot) {
        internalGetFull(snapshot, key, columns, results);
      }

    } finally {
      this.lock.readLock().unlock();
    }
  }

  private void internalGetFull(SortedMap<HStoreKey, byte[]> map, HStoreKey key,
    Set<Text> columns, SortedMap<Text, Cell> results) {

    if (map.isEmpty() || key == null) {
      return;
    }

    SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
      HStoreKey itKey = es.getKey();
      Text itCol = itKey.getColumn();
      if (results.get(itCol) == null && key.matchesWithoutColumn(itKey)) {
        byte [] val = tailMap.get(itKey);

        if (!HLogEdit.isDeleted(val)) {
          if (columns == null || columns.contains(itKey.getColumn())) {
            results.put(itCol, new Cell(val, itKey.getTimestamp()));
          }
        }

      } else if (key.getRow().compareTo(itKey.getRow()) < 0) {
        break;
      }
    }
  }

  /**
   * @param row
   * @param timestamp
   * @return the key that matches <i>row</i> exactly, or the one that
   * immediately precedes it.
   */
  public Text getRowKeyAtOrBefore(final Text row, long timestamp) {
    this.lock.readLock().lock();

    Text key_memcache = null;
    Text key_snapshot = null;

    try {
      synchronized (memcache) {
        key_memcache = internalGetRowKeyAtOrBefore(memcache, row, timestamp);
      }
      synchronized (snapshot) {
        key_snapshot = internalGetRowKeyAtOrBefore(snapshot, row, timestamp);
      }

      if (key_memcache == null && key_snapshot == null) {
        // didn't find any candidates, return null
        return null;
      } else if (key_memcache == null && key_snapshot != null) {
        return key_snapshot;
      } else if (key_memcache != null && key_snapshot == null) {
        return key_memcache;
      } else if ( (key_memcache != null && key_memcache.equals(row))
          || (key_snapshot != null && key_snapshot.equals(row)) ) {
        // if either is a precise match, return the original row.
        return row;
      } else if (key_memcache != null) {
        // no precise matches, so return the one that is closer to the search
        // key (greatest)
        return key_memcache.compareTo(key_snapshot) > 0 ?
          key_memcache : key_snapshot;
      }
      return null;
    } finally {
      this.lock.readLock().unlock();
    }
  }

  private Text internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
    Text key, long timestamp) {
    // TODO: account for deleted cells

    HStoreKey search_key = new HStoreKey(key, timestamp);

    // get all the entries that come equal or after our search key
    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);

    // if the first item in the tail has a matching row, then we have an
    // exact match, and we should return that item
    if (!tailMap.isEmpty() && tailMap.firstKey().getRow().equals(key)) {
      // seek forward past any cells that don't fulfill the timestamp
      // argument
      Iterator<HStoreKey> key_iterator = tailMap.keySet().iterator();
      HStoreKey found_key = key_iterator.next();

      // keep seeking so long as we're in the same row, and the timestamp
      // isn't as small as we'd like, and there are more cells to check
      while (found_key.getRow().equals(key)
          && found_key.getTimestamp() > timestamp && key_iterator.hasNext()) {
        found_key = key_iterator.next();
      }

      // if this check fails, then we've iterated through all the keys that
      // match by row, but none match by timestamp, so we fall through to
      // the headMap case.
      if (found_key.getTimestamp() <= timestamp) {
        // we found a cell at or before the requested timestamp, so return
        // its row.
        /* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/
        return found_key.getRow();
      }
    }

    // the tail didn't contain the key we're searching for, so we should
    // use the last key in the headmap as the closest before
    SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
    return headMap.isEmpty()? null: headMap.lastKey().getRow();
  }

  /**
   * Examine a single map for the desired key.
   *
   * TODO - This is kinda slow. We need a data structure that allows for
   * proximity-searches, not just precise-matches.
   *
   * @param map
   * @param key
   * @param numVersions
   * @return Ordered list of items found in passed <code>map</code>. If no
   * matching values, returns an empty list (does not return null).
   */
  private ArrayList<Cell> internalGet(
    final SortedMap<HStoreKey, byte []> map, final HStoreKey key,
    final int numVersions) {

    ArrayList<Cell> result = new ArrayList<Cell>();
    // TODO: If get is of a particular version -- numVersions == 1 -- we
    // should be able to avoid all of the tailmap creations and iterations
    // below.
    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
      HStoreKey itKey = es.getKey();
      if (itKey.matchesRowCol(key)) {
        if (!HLogEdit.isDeleted(es.getValue())) {
          result.add(new Cell(tailMap.get(itKey), itKey.getTimestamp()));
        }
      }
      if (numVersions > 0 && result.size() >= numVersions) {
        break;
      }
    }
    return result;
  }

  /**
   * Get <code>versions</code> keys matching the origin key's
   * row/column/timestamp and those of an older vintage.
   * Default access so can be accessed out of {@link HRegionServer}.
   * @param origin Where to start searching.
   * @param versions How many versions to return. Pass
   * {@link HConstants#ALL_VERSIONS} to retrieve all.
   * @return Ordered list of <code>versions</code> keys going from newest back.
   * @throws IOException
   */
  List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
    this.lock.readLock().lock();
    try {
      List<HStoreKey> results;
      synchronized (memcache) {
        results = internalGetKeys(this.memcache, origin, versions);
      }
      synchronized (snapshot) {
        results.addAll(results.size(), internalGetKeys(snapshot, origin,
          versions == HConstants.ALL_VERSIONS ? versions :
            (versions - results.size())));
      }
      return results;

    } finally {
      this.lock.readLock().unlock();
    }
  }

  /*
   * @param origin Where to start searching.
   * @param versions How many versions to return. Pass
   * {@link HConstants#ALL_VERSIONS} to retrieve all.
   * @return List of all keys that are of the same row and column and of
   * equal or older timestamp. If no keys, returns an empty List. Does not
   * return null.
   */
  private List<HStoreKey> internalGetKeys(final SortedMap<HStoreKey, byte []> map,
    final HStoreKey origin, final int versions) {

    List<HStoreKey> result = new ArrayList<HStoreKey>();
    SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
    for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
      HStoreKey key = es.getKey();

      // if there's no column name, then compare rows and timestamps
      if (origin.getColumn().toString().equals("")) {
        // if the current and origin row don't match, then we can jump
        // out of the loop entirely.
        if (!key.getRow().equals(origin.getRow())) {
          break;
        }
        // if the rows match but the timestamp is newer, skip it so we can
        // get to the ones we actually want.
        if (key.getTimestamp() > origin.getTimestamp()) {
          continue;
        }
      }
      else { // compare rows and columns
        // if the key doesn't match the row and column, then we're done, since
        // all the cells are ordered.
        if (!key.matchesRowCol(origin)) {
          break;
        }
      }

      if (!HLogEdit.isDeleted(es.getValue())) {
        result.add(key);
        if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
          // We have enough results.  Return.
          break;
        }
      }
    }
    return result;
  }


  /**
   * @param key
   * @return True if an entry and its content is {@link HGlobals.deleteBytes}.
   * Use when checking values read from the store; on occasion the memcache
   * holds the fact that the cell has been deleted.
   */
  boolean isDeleted(final HStoreKey key) {
    return HLogEdit.isDeleted(this.memcache.get(key));
  }

  /**
   * @return a scanner over the keys in the Memcache
   */
  HInternalScannerInterface getScanner(long timestamp,
    Text targetCols[], Text firstRow) throws IOException {

    // Here we rely on ReentrantReadWriteLock's ability to acquire multiple
    // locks by the same thread and to be able to downgrade a write lock to
    // a read lock. We need to hold a lock throughout this method, but only
    // need the write lock while creating the memcache snapshot

    this.lock.writeLock().lock();   // hold write lock during memcache snapshot
    snapshot();                     // snapshot memcache
    this.lock.readLock().lock();    // acquire read lock
    this.lock.writeLock().unlock(); // downgrade to read lock
    try {
      // Prevent a cache flush while we are constructing the scanner

      return new MemcacheScanner(timestamp, targetCols, firstRow);

    } finally {
      this.lock.readLock().unlock();
    }
  }

  //////////////////////////////////////////////////////////////////////////////
  // MemcacheScanner implements the HScannerInterface.
  // It lets the caller scan the contents of the Memcache.
  //////////////////////////////////////////////////////////////////////////////

  class MemcacheScanner extends HAbstractScanner {
    SortedMap<HStoreKey, byte []> backingMap;
    Iterator<HStoreKey> keyIterator;

    @SuppressWarnings("unchecked")
    MemcacheScanner(final long timestamp, final Text targetCols[],
      final Text firstRow) throws IOException {

      super(timestamp, targetCols);
      try {
        this.backingMap = new TreeMap<HStoreKey, byte[]>();
        this.backingMap.putAll(snapshot);
        this.keys = new HStoreKey[1];
        this.vals = new byte[1][];

        // Generate list of iterators

        HStoreKey firstKey = new HStoreKey(firstRow);
        if (firstRow != null && firstRow.getLength() != 0) {
          keyIterator =
            backingMap.tailMap(firstKey).keySet().iterator();

        } else {
          keyIterator = backingMap.keySet().iterator();
        }

        while (getNext(0)) {
          if (!findFirstRow(0, firstRow)) {
            continue;
          }
          if (columnMatch(0)) {
            break;
          }
        }
      } catch (RuntimeException ex) {
        LOG.error("error initializing Memcache scanner: ", ex);
        close();
        IOException e = new IOException("error initializing Memcache scanner");
        e.initCause(ex);
        throw e;

      } catch (IOException ex) {
        LOG.error("error initializing Memcache scanner: ", ex);
        close();
        throw ex;
      }
    }

    /**
     * The user didn't want to start scanning at the first row. This method
     * seeks to the requested row.
     *
     * @param i which iterator to advance
     * @param firstRow seek to this row
     * @return true if this is the first row
     */
    @Override
    boolean findFirstRow(int i, Text firstRow) {
      return firstRow.getLength() == 0 ||
        keys[i].getRow().compareTo(firstRow) >= 0;
    }

    /**
     * Get the next value from the specified iterator.
     *
     * @param i Which iterator to fetch next value from
     * @return true if there is more data available
     */
    @Override
    boolean getNext(int i) {
      boolean result = false;
      while (true) {
        if (!keyIterator.hasNext()) {
          closeSubScanner(i);
          break;
        }
        // Check that the key's timestamp is <= this scanner's timestamp.
        HStoreKey hsk = keyIterator.next();
        if (hsk == null) {
          throw new NullPointerException("Unexpected null key");
        }
        if (hsk.getTimestamp() <= this.timestamp) {
          this.keys[i] = hsk;
          this.vals[i] = backingMap.get(keys[i]);
          result = true;
          break;
        }
      }
      return result;
    }

    /** Shut down an individual map iterator. */
    @Override
    void closeSubScanner(int i) {
      keyIterator = null;
      keys[i] = null;
      vals[i] = null;
      backingMap = null;
    }

    /** Shut down map iterators */
    public void close() {
      if (!scannerClosed) {
        if (keyIterator != null) {
          closeSubScanner(0);
        }
        scannerClosed = true;
      }
    }
  }
}
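The trickiest part of Memcache is getScanner()'s lock handling: it takes the write lock to snapshot, acquires the read lock before releasing the write lock, and builds the scanner under the downgraded read lock so no flush can slip in between. A minimal standalone sketch of that downgrade idiom (it mirrors the pattern documented for java.util.concurrent.locks.ReentrantReadWriteLock; the class and fields here are illustrative, not HBase code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockDowngradeDemo {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private int sharedState = 0;

  int mutateThenRead() {
    lock.writeLock().lock();        // exclusive: safe to mutate
    try {
      sharedState++;                // stand-in for snapshot.putAll(...)
      // Downgrade: grab the read lock while still holding the write lock.
      lock.readLock().lock();
    } finally {
      lock.writeLock().unlock();    // release write; read lock still held
    }
    try {
      return sharedState;           // other readers may run, writers cannot
    } finally {
      lock.readLock().unlock();
    }
  }
}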
StoreFileScanner.java
@@ -0,0 +1,166 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

/**
 * A scanner that iterates through the HStore files
 */
class StoreFileScanner extends HAbstractScanner {
  @SuppressWarnings("hiding")
  private MapFile.Reader[] readers;
  private HStore store;

  public StoreFileScanner(HStore store, long timestamp, Text[] targetCols, Text firstRow)
  throws IOException {
    super(timestamp, targetCols);
    this.store = store;
    try {
      this.readers = new MapFile.Reader[store.storefiles.size()];

      // Most recent map file should be first
      int i = readers.length - 1;
      for (HStoreFile curHSF: store.storefiles.values()) {
        readers[i--] = curHSF.getReader(store.fs, store.bloomFilter);
      }

      this.keys = new HStoreKey[readers.length];
      this.vals = new byte[readers.length][];

      // Advance the readers to the first pos.
      for (i = 0; i < readers.length; i++) {
        keys[i] = new HStoreKey();

        if (firstRow.getLength() != 0) {
          if (findFirstRow(i, firstRow)) {
            continue;
          }
        }

        while (getNext(i)) {
          if (columnMatch(i)) {
            break;
          }
        }
      }

    } catch (Exception ex) {
      close();
      IOException e = new IOException("StoreFileScanner failed construction");
      e.initCause(ex);
      throw e;
    }
  }

  /**
   * The user didn't want to start scanning at the first row. This method
   * seeks to the requested row.
   *
   * @param i - which iterator to advance
   * @param firstRow - seek to this row
   * @return - true if this is the first row or if the row was not found
   */
  @Override
  boolean findFirstRow(int i, Text firstRow) throws IOException {
    ImmutableBytesWritable ibw = new ImmutableBytesWritable();
    HStoreKey firstKey
      = (HStoreKey)readers[i].getClosest(new HStoreKey(firstRow), ibw);
    if (firstKey == null) {
      // Didn't find it. Close the scanner and return TRUE
      closeSubScanner(i);
      return true;
    }
    this.vals[i] = ibw.get();
    keys[i].setRow(firstKey.getRow());
    keys[i].setColumn(firstKey.getColumn());
    keys[i].setVersion(firstKey.getTimestamp());
    return columnMatch(i);
  }

  /**
   * Get the next value from the specified reader.
   *
   * @param i - which reader to fetch next value from
   * @return - true if there is more data available
   */
  @Override
  boolean getNext(int i) throws IOException {
    boolean result = false;
    ImmutableBytesWritable ibw = new ImmutableBytesWritable();
    while (true) {
      if (!readers[i].next(keys[i], ibw)) {
        closeSubScanner(i);
        break;
      }
      if (keys[i].getTimestamp() <= this.timestamp) {
        vals[i] = ibw.get();
        result = true;
        break;
      }
    }
    return result;
  }

  /** Close down the indicated reader. */
  @Override
  void closeSubScanner(int i) {
    try {
      if (readers[i] != null) {
        try {
          readers[i].close();
        } catch (IOException e) {
          LOG.error(store.storeName + " closing sub-scanner", e);
        }
      }

    } finally {
      readers[i] = null;
      keys[i] = null;
      vals[i] = null;
    }
  }

  /** Shut it down! */
  public void close() {
    if (!scannerClosed) {
      try {
        for (int i = 0; i < readers.length; i++) {
          if (readers[i] != null) {
            try {
              readers[i].close();
            } catch (IOException e) {
              LOG.error(store.storeName + " closing scanner", e);
            }
          }
        }

      } finally {
        scannerClosed = true;
      }
    }
  }
}
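StoreFileScanner.getNext() encodes the scan's time-travel rule: skip any cell whose timestamp is newer than the scanner's timestamp and surface the first cell at or before it. The same rule in isolation, over plain longs (a hypothetical helper, not HBase API):

import java.util.Arrays;
import java.util.Iterator;

class TimestampFilterDemo {
  /** Return the first timestamp <= asOf, or -1 if none remains. */
  static long nextVisible(Iterator<Long> timestamps, long asOf) {
    while (timestamps.hasNext()) {
      long ts = timestamps.next();
      if (ts <= asOf) {
        return ts;        // first cell visible at time 'asOf'
      }
      // else: cell was written after 'asOf'; skip it and keep scanning
    }
    return -1;            // sub-scanner exhausted
  }

  public static void main(String[] args) {
    // Timestamps arrive newest-first, as in a store file scan.
    Iterator<Long> it = Arrays.asList(90L, 75L, 50L, 10L).iterator();
    System.out.println(nextVisible(it, 60L));  // prints 50
  }
}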
TestHMemcache.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.io.Cell;
 /** memcache test case */
 public class TestHMemcache extends TestCase {
 
-  private HStore.Memcache hmemcache;
+  private Memcache hmemcache;
 
   private static final int ROW_COUNT = 3;
 
@@ -47,7 +47,7 @@ public class TestHMemcache extends TestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    this.hmemcache = new HStore.Memcache();
+    this.hmemcache = new Memcache();
   }
 
   private Text getRowName(final int index) {
@@ -63,7 +63,7 @@ public class TestHMemcache extends TestCase {
    * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
    * @param hmc Instance to add rows to.
    */
-  private void addRows(final HStore.Memcache hmc)
+  private void addRows(final Memcache hmc)
   throws UnsupportedEncodingException {
 
     for (int i = 0; i < ROW_COUNT; i++) {
@@ -76,7 +76,7 @@ public class TestHMemcache extends TestCase {
     }
   }
 
-  private void runSnapshot(final HStore.Memcache hmc) {
+  private void runSnapshot(final Memcache hmc) {
     // Save off old state.
     int oldHistorySize = hmc.snapshot.size();
     hmc.snapshot();