HBASE-6569 Extract HStore interface from Store (Jesse Yates)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1373153 13f79535-47bb-0310-9956-ffa450edef68
commit 3156455462
parent 214c751fb9
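The patch is a straightforward extract-interface refactoring: the public surface of the concrete Store class, including the compaction priority constants, moves onto a new HStore interface; Store becomes an implementation of that interface; and callers across the region server (CompactSplitThread, HRegion, HRegionServer, the split policies, MemStoreFlusher) are repointed from Store to HStore. A minimal, self-contained sketch of the pattern follows; the names ColumnStore, DefaultColumnStore and ExtractInterfaceSketch are simplified stand-ins for illustration only, not the actual HBase types.

    // Illustrative sketch only: simplified stand-ins for HStore/Store, not HBase code.
    import java.util.Arrays;
    import java.util.List;

    interface ColumnStore {                 // plays the role of HStore
      int PRIORITY_USER = 1;                // shared constants live on the interface
      int NO_PRIORITY = Integer.MIN_VALUE;
      boolean needsCompaction();
    }

    class DefaultColumnStore implements ColumnStore {   // plays the role of Store
      private final int storefileCount;
      DefaultColumnStore(int storefileCount) { this.storefileCount = storefileCount; }
      @Override public boolean needsCompaction() { return storefileCount > 3; }
    }

    public class ExtractInterfaceSketch {
      public static void main(String[] args) {
        // Callers depend only on the interface, as the hunks below do with HStore.
        List<ColumnStore> stores = Arrays.asList(new DefaultColumnStore(2), new DefaultColumnStore(5));
        for (ColumnStore s : stores) {
          System.out.println("needsCompaction=" + s.needsCompaction()
              + ", user priority=" + ColumnStore.PRIORITY_USER);
        }
      }
    }

Keeping the constants and method signatures on the interface is what lets call sites such as requestCompaction(r, s, why, HStore.NO_PRIORITY) compile against the existing class or any future store implementation.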
@@ -158,7 +158,7 @@ public class CompactSplitThread implements CompactionRequestor {
   public synchronized boolean requestSplit(final HRegion r) {
     // don't split regions that are blocking
-    if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
+    if (shouldSplitRegion() && r.getCompactPriority() >= HStore.PRIORITY_USER) {
       byte[] midKey = r.checkSplit();
       if (midKey != null) {
         requestSplit(r, midKey);

@@ -186,19 +186,19 @@ public class CompactSplitThread implements CompactionRequestor {
   public synchronized void requestCompaction(final HRegion r,
       final String why) throws IOException {
-    for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, Store.NO_PRIORITY);
+    for (HStore s : r.getStores().values()) {
+      requestCompaction(r, s, why, HStore.NO_PRIORITY);
     }
   }

-  public synchronized void requestCompaction(final HRegion r, final Store s,
+  public synchronized void requestCompaction(final HRegion r, final HStore s,
       final String why) throws IOException {
-    requestCompaction(r, s, why, Store.NO_PRIORITY);
+    requestCompaction(r, s, why, HStore.NO_PRIORITY);
   }

   public synchronized void requestCompaction(final HRegion r, final String why,
       int p) throws IOException {
-    for(Store s : r.getStores().values()) {
+    for (HStore s : r.getStores().values()) {
       requestCompaction(r, s, why, p);
     }
   }

@@ -209,7 +209,7 @@ public class CompactSplitThread implements CompactionRequestor {
    * @param why Why compaction requested -- used in debug messages
    * @param priority override the default priority (NO_PRIORITY == decide)
    */
-  public synchronized void requestCompaction(final HRegion r, final Store s,
+  public synchronized void requestCompaction(final HRegion r, final HStore s,
       final String why, int priority) throws IOException {
     if (this.server.isStopped()) {
       return;

@@ -217,7 +217,7 @@ public class CompactSplitThread implements CompactionRequestor {
     CompactionRequest cr = s.requestCompaction(priority);
     if (cr != null) {
       cr.setServer(server);
-      if (priority != Store.NO_PRIORITY) {
+      if (priority != HStore.NO_PRIORITY) {
         cr.setPriority(priority);
       }
       ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
@@ -37,7 +37,8 @@ public interface CompactionRequestor {
   * @param why Why compaction was requested -- used in debug messages
   * @throws IOException
   */
-  public void requestCompaction(final HRegion r, final Store s, final String why) throws IOException;
+  public void requestCompaction(final HRegion r, final HStore s, final String why)
+      throws IOException;

  /**
   * @param r Region to compact

@@ -54,7 +55,7 @@
   * @param pri Priority of this compaction. minHeap. <=0 is critical
   * @throws IOException
   */
-  public void requestCompaction(final HRegion r, final Store s,
+  public void requestCompaction(final HRegion r, final HStore s,
      final String why, int pri) throws IOException;

}
@@ -51,7 +51,7 @@ public class ConstantSizeRegionSplitPolicy extends RegionSplitPolicy {
    boolean force = region.shouldForceSplit();
    boolean foundABigStore = false;

-    for (Store store : region.getStores().values()) {
+    for (HStore store : region.getStores().values()) {
      // If any of the stores are unable to split (eg they contain reference files)
      // then don't split
      if ((!store.canSplit())) {
@@ -203,8 +203,8 @@ public class HRegion implements HeapSize { // , Writable{
  private final AtomicInteger lockIdGenerator = new AtomicInteger(1);
  static private Random rand = new Random();

-  protected final Map<byte [], Store> stores =
-    new ConcurrentSkipListMap<byte [], Store>(Bytes.BYTES_RAWCOMPARATOR);
+  protected final Map<byte[], HStore> stores = new ConcurrentSkipListMap<byte[], HStore>(
+      Bytes.BYTES_RAWCOMPARATOR);

  // Registered region protocol handlers
  private ClassToInstanceMap<CoprocessorProtocol>

@@ -642,7 +642,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @return True if this region has references.
   */
  public boolean hasReferences() {
-    for (Store store : this.stores.values()) {
+    for (HStore store : this.stores.values()) {
      for (StoreFile sf : store.getStorefiles()) {
        // Found a reference, return.
        if (sf.isReference()) return true;

@@ -660,7 +660,7 @@ public class HRegion implements HeapSize { // , Writable{
    HDFSBlocksDistribution hdfsBlocksDistribution =
      new HDFSBlocksDistribution();
    synchronized (this.stores) {
-      for (Store store : this.stores.values()) {
+      for (HStore store : this.stores.values()) {
        for (StoreFile sf : store.getStorefiles()) {
          HDFSBlocksDistribution storeFileBlocksDistribution =
            sf.getHDFSBlockDistribution();

@@ -977,7 +977,7 @@ public class HRegion implements HeapSize { // , Writable{
          storeCloserThreadPool);

      // close each store in parallel
-      for (final Store store : stores.values()) {
+      for (final HStore store : stores.values()) {
        completionService
            .submit(new Callable<ImmutableList<StoreFile>>() {
              public ImmutableList<StoreFile> call() throws IOException {

@@ -1173,7 +1173,7 @@ public class HRegion implements HeapSize { // , Writable{
  /** @return returns size of largest HStore. */
  public long getLargestHStoreSize() {
    long size = 0;
-    for (Store h: stores.values()) {
+    for (HStore h : stores.values()) {
      long storeSize = h.getSize();
      if (storeSize > size) {
        size = storeSize;

@@ -1205,7 +1205,7 @@ public class HRegion implements HeapSize { // , Writable{
  }

  void triggerMajorCompaction() {
-    for (Store h: stores.values()) {
+    for (HStore h : stores.values()) {
      h.triggerMajorCompaction();
    }
  }

@@ -1232,7 +1232,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @throws IOException e
   */
  public void compactStores() throws IOException {
-    for(Store s : getStores().values()) {
+    for (HStore s : getStores().values()) {
      CompactionRequest cr = s.requestCompaction();
      if(cr != null) {
        try {

@@ -1500,7 +1500,7 @@ public class HRegion implements HeapSize { // , Writable{
      wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
      completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);

-      for (Store s : stores.values()) {
+      for (HStore s : stores.values()) {
        storeFlushers.add(s.getStoreFlusher(completeSequenceId));
      }

@@ -1658,7 +1658,7 @@ public class HRegion implements HeapSize { // , Writable{
    startRegionOperation();
    this.readRequestsCount.increment();
    try {
-      Store store = getStore(family);
+      HStore store = getStore(family);
      // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
      KeyValue key = store.getRowKeyAtOrBefore(row);
      Result result = null;

@@ -2662,7 +2662,7 @@ public class HRegion implements HeapSize { // , Writable{
      byte[] family = e.getKey();
      List<KeyValue> edits = e.getValue();

-      Store store = getStore(family);
+      HStore store = getStore(family);
      for (KeyValue kv: edits) {
        kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
        size += store.add(kv);

@@ -2702,7 +2702,7 @@ public class HRegion implements HeapSize { // , Writable{
      // Remove those keys from the memstore that matches our
      // key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
      // that even the memstoreTS has to match for keys that will be rolleded-back.
-      Store store = getStore(family);
+      HStore store = getStore(family);
      for (KeyValue kv: edits) {
        store.rollback(kv);
        kvsRolledback++;

@@ -2918,7 +2918,7 @@ public class HRegion implements HeapSize { // , Writable{
    long editsCount = 0;
    long intervalEdits = 0;
    HLog.Entry entry;
-    Store store = null;
+    HStore store = null;
    boolean reported_once = false;

    try {

@@ -3056,7 +3056,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @param kv KeyValue to add.
   * @return True if we should flush.
   */
-  protected boolean restoreEdit(final Store s, final KeyValue kv) {
+  protected boolean restoreEdit(final HStore s, final KeyValue kv) {
    long kvSize = s.add(kv);
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetRegionReplayEditsSize(this.regionInfo.getRegionName(), kvSize);

@@ -3091,11 +3091,11 @@ public class HRegion implements HeapSize { // , Writable{
   * @return Store that goes with the family on passed <code>column</code>.
   * TODO: Make this lookup faster.
   */
-  public Store getStore(final byte [] column) {
+  public HStore getStore(final byte[] column) {
    return this.stores.get(column);
  }

-  public Map<byte[], Store> getStores() {
+  public Map<byte[], HStore> getStores() {
    return this.stores;
  }

@@ -3111,7 +3111,7 @@ public class HRegion implements HeapSize { // , Writable{
    List<String> storeFileNames = new ArrayList<String>();
    synchronized(closeLock) {
      for(byte[] column : columns) {
-        Store store = this.stores.get(column);
+        HStore store = this.stores.get(column);
        if (store == null) {
          throw new IllegalArgumentException("No column family : " +
              new String(column) + " available");

@@ -3331,7 +3331,7 @@ public class HRegion implements HeapSize { // , Writable{
      byte[] familyName = p.getFirst();
      String path = p.getSecond();

-      Store store = getStore(familyName);
+      HStore store = getStore(familyName);
      if (store == null) {
        IOException ioe = new DoNotRetryIOException(
            "No such column family " + Bytes.toStringBinary(familyName));

@@ -3373,7 +3373,7 @@ public class HRegion implements HeapSize { // , Writable{
    for (Pair<byte[], String> p : familyPaths) {
      byte[] familyName = p.getFirst();
      String path = p.getSecond();
-      Store store = getStore(familyName);
+      HStore store = getStore(familyName);
      try {
        store.bulkLoadHFile(path);
      } catch (IOException ioe) {

@@ -3474,7 +3474,7 @@ public class HRegion implements HeapSize { // , Writable{

    for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
        scan.getFamilyMap().entrySet()) {
-      Store store = stores.get(entry.getKey());
+      HStore store = stores.get(entry.getKey());
      KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
      scanners.add(scanner);
    }

@@ -4252,7 +4252,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @throws IOException
   */
  boolean isMajorCompaction() throws IOException {
-    for (Store store: this.stores.values()) {
+    for (HStore store : this.stores.values()) {
      if (store.isMajorCompaction()) {
        return true;
      }

@@ -4638,7 +4638,7 @@ public class HRegion implements HeapSize { // , Writable{
    boolean flush = false;
    WALEdit walEdits = null;
    List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
-    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    Map<HStore, List<KeyValue>> tempMemstore = new HashMap<HStore, List<KeyValue>>();
    long before = EnvironmentEdgeManager.currentTimeMillis();
    long size = 0;
    long txid = 0;

@@ -4655,7 +4655,7 @@ public class HRegion implements HeapSize { // , Writable{
      for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
          .entrySet()) {

-        Store store = stores.get(family.getKey());
+        HStore store = stores.get(family.getKey());
        List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());

        // Get previous values for all columns in this family

@@ -4738,8 +4738,8 @@ public class HRegion implements HeapSize { // , Writable{
      }

      //Actually write to Memstore now
-      for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-        Store store = entry.getKey();
+      for (Map.Entry<HStore, List<KeyValue>> entry : tempMemstore.entrySet()) {
+        HStore store = entry.getKey();
        size += store.upsert(entry.getValue());
        allKVs.addAll(entry.getValue());
      }

@@ -4791,7 +4791,7 @@ public class HRegion implements HeapSize { // , Writable{
    boolean flush = false;
    WALEdit walEdits = null;
    List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
-    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    Map<HStore, List<KeyValue>> tempMemstore = new HashMap<HStore, List<KeyValue>>();
    long before = EnvironmentEdgeManager.currentTimeMillis();
    long size = 0;
    long txid = 0;

@@ -4808,7 +4808,7 @@ public class HRegion implements HeapSize { // , Writable{
      for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
          increment.getFamilyMap().entrySet()) {

-        Store store = stores.get(family.getKey());
+        HStore store = stores.get(family.getKey());
        List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());

        // Get previous values for all columns in this family

@@ -4860,8 +4860,8 @@ public class HRegion implements HeapSize { // , Writable{
      }

      //Actually write to Memstore now
-      for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
-        Store store = entry.getKey();
+      for (Map.Entry<HStore, List<KeyValue>> entry : tempMemstore.entrySet()) {
+        HStore store = entry.getKey();
        size += store.upsert(entry.getValue());
        allKVs.addAll(entry.getValue());
      }

@@ -4918,7 +4918,7 @@ public class HRegion implements HeapSize { // , Writable{
    Integer lid = obtainRowLock(row);
    this.updatesLock.readLock().lock();
    try {
-      Store store = stores.get(family);
+      HStore store = stores.get(family);

      // Get the old value:
      Get get = new Get(row);

@@ -5029,7 +5029,7 @@ public class HRegion implements HeapSize { // , Writable{
  @Override
  public long heapSize() {
    long heapSize = DEEP_OVERHEAD;
-    for(Store store : this.stores.values()) {
+    for (HStore store : this.stores.values()) {
      heapSize += store.heapSize();
    }
    // this does not take into account row locks, recent flushes, mvcc entries

@@ -5274,7 +5274,7 @@ public class HRegion implements HeapSize { // , Writable{
   */
  public int getCompactPriority() {
    int count = Integer.MAX_VALUE;
-    for(Store store : stores.values()) {
+    for (HStore store : stores.values()) {
      count = Math.min(count, store.getCompactPriority());
    }
    return count;

@@ -5286,7 +5286,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @return true if any store has too many store files
   */
  public boolean needsCompaction() {
-    for(Store store : stores.values()) {
+    for (HStore store : stores.values()) {
      if(store.needsCompaction()) {
        return true;
      }
@@ -1142,7 +1142,7 @@ public class HRegionServer implements ClientProtocol,
    long currentCompactedKVs = 0;
    synchronized (r.stores) {
      stores += r.stores.size();
-      for (Store store : r.stores.values()) {
+      for (HStore store : r.stores.values()) {
        storefiles += store.getStorefilesCount();
        storeUncompressedSizeMB += (int) (store.getStoreSizeUncompressed()
            / 1024 / 1024);

@@ -1228,7 +1228,7 @@ public class HRegionServer implements ClientProtocol,
      for (HRegion r : this.instance.onlineRegions.values()) {
        if (r == null)
          continue;
-        for (Store s : r.getStores().values()) {
+        for (HStore s : r.getStores().values()) {
          try {
            if (s.needsCompaction()) {
              // Queue a compaction. Will recognize if major is needed.

@@ -1369,8 +1369,8 @@ public class HRegionServer implements ClientProtocol,
      writeRequestsCount += r.writeRequestsCount.get();
      synchronized (r.stores) {
        stores += r.stores.size();
-        for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
-          final Store store = ee.getValue();
+        for (Map.Entry<byte[], HStore> ee : r.stores.entrySet()) {
+          final HStore store = ee.getValue();
          final SchemaMetrics schemaMetrics = store.getSchemaMetrics();

          {

@@ -1644,7 +1644,7 @@ public class HRegionServer implements ClientProtocol,
    LOG.info("Post open deploy tasks for region=" + r.getRegionNameAsString() +
        ", daughter=" + daughter);
    // Do checks to see if we need to compact (references or too many files)
-    for (Store s : r.getStores().values()) {
+    for (HStore s : r.getStores().values()) {
      if (s.hasReferences() || s.needsCompaction()) {
        getCompactionRequester().requestCompaction(r, s, "Opening Region");
      }

@@ -2009,7 +2009,7 @@ public class HRegionServer implements ClientProtocol,
    int storefileIndexSizeMB = 0;
    synchronized (r.stores) {
      stores += r.stores.size();
-      for (Store store : r.stores.values()) {
+      for (HStore store : r.stores.values()) {
        storefiles += store.getStorefilesCount();
        storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
        storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);

@@ -3590,7 +3590,7 @@ public class HRegionServer implements ClientProtocol,
          region.getRegionNameAsString());
      compactSplitThread.requestCompaction(region,
        "User-triggered " + (major ? "major " : "") + "compaction",
-        Store.PRIORITY_USER);
+        HStore.PRIORITY_USER);
      return CompactRegionResponse.newBuilder().build();
    } catch (IOException ie) {
      throw new ServiceException(ie);
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or
+ * more StoreFiles, which stretch backwards over time.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface HStore extends SchemaAware, HeapSize {
+
+  /* The default priority for user-specified compaction requests.
+   * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+   */
+  public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
+  // General Accessors
+  public KeyValue.KVComparator getComparator();
+
+  public List<StoreFile> getStorefiles();
+
+  /**
+   * Close all the readers We don't need to worry about subsequent requests because the HRegion
+   * holds a write lock that will prevent any more reads or writes.
+   * @return the {@link StoreFile StoreFiles} that were previously being used.
+   * @throws IOException on failure
+   */
+  public ImmutableList<StoreFile> close() throws IOException;
+
+  /**
+   * Return a scanner for both the memstore and the HStore files. Assumes we are not in a
+   * compaction.
+   * @param scan Scan to apply when scanning the stores
+   * @param targetCols columns to scan
+   * @return a scanner over the current key values
+   * @throws IOException on failure
+   */
+  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols)
+      throws IOException;
+
+  /**
+   * Updates the value for the given row/family/qualifier. This function will always be seen as
+   * atomic by other readers because it only puts a single KV to memstore. Thus no read/write
+   * control necessary.
+   * @param row row to update
+   * @param f family to update
+   * @param qualifier qualifier to update
+   * @param newValue the new value to set into memstore
+   * @return memstore size delta
+   * @throws IOException
+   */
+  public long updateColumnValue(byte[] row, byte[] f, byte[] qualifier, long newValue)
+      throws IOException;
+
+  /**
+   * Adds or replaces the specified KeyValues.
+   * <p>
+   * For each KeyValue specified, if a cell with the same row, family, and qualifier exists in
+   * MemStore, it will be replaced. Otherwise, it will just be inserted to MemStore.
+   * <p>
+   * This operation is atomic on each KeyValue (row/family/qualifier) but not necessarily atomic
+   * across all of them.
+   * @param kvs
+   * @return memstore size delta
+   * @throws IOException
+   */
+  public long upsert(Iterable<KeyValue> kvs) throws IOException;
+
+  /**
+   * Adds a value to the memstore
+   * @param kv
+   * @return memstore size delta
+   */
+  public long add(KeyValue kv);
+
+  /**
+   * Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the
+   * key & memstoreTS value of the kv parameter.
+   * @param kv
+   */
+  public void rollback(final KeyValue kv);
+
+  /**
+   * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING:
+   * Only use this method on a table where writes occur with strictly increasing timestamps. This
+   * method assumes this pattern of writes in order to make it reasonably performant. Also our
+   * search is dependent on the axiom that deletes are for cells that are in the container that
+   * follows whether a memstore snapshot or a storefile, not for the current container: i.e. we'll
+   * see deletes before we come across cells we are to delete. Presumption is that the
+   * memstore#kvset is processed before memstore#snapshot and so on.
+   * @param row The row key of the targeted row.
+   * @return Found keyvalue or null if none found.
+   * @throws IOException
+   */
+  public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException;
+
+  // Compaction oriented methods
+
+  public boolean throttleCompaction(long compactionSize);
+
+  /**
+   * getter for CompactionProgress object
+   * @return CompactionProgress object; can be null
+   */
+  public CompactionProgress getCompactionProgress();
+
+  public CompactionRequest requestCompaction() throws IOException;
+
+  public CompactionRequest requestCompaction(int priority) throws IOException;
+
+  public void finishRequest(CompactionRequest cr);
+
+  /**
+   * @return true if we should run a major compaction.
+   */
+  public boolean isMajorCompaction() throws IOException;
+
+  public void triggerMajorCompaction();
+
+  /**
+   * See if there's too much store files in this store
+   * @return true if number of store files is greater than the number defined in minFilesToCompact
+   */
+  public boolean needsCompaction();
+
+  public int getCompactPriority();
+
+  /**
+   * @param priority priority to check against. When priority is {@link HStore#PRIORITY_USER},
+   *          {@link HStore#PRIORITY_USER} is returned.
+   * @return The priority that this store has in the compaction queue.
+   */
+  public int getCompactPriority(int priority);
+
+  public StoreFlusher getStoreFlusher(long cacheFlushId);
+
+  // Split oriented methods
+
+  public boolean canSplit();
+
+  /**
+   * Determines if Store should be split
+   * @return byte[] if store should be split, null otherwise.
+   */
+  public byte[] getSplitPoint();
+
+  // Bulk Load methods
+
+  /**
+   * This throws a WrongRegionException if the HFile does not fit in this region, or an
+   * InvalidHFileException if the HFile is not valid.
+   */
+  public void assertBulkLoadHFileOk(Path srcPath) throws IOException;
+
+  /**
+   * This method should only be called from HRegion. It is assumed that the ranges of values in the
+   * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this)
+   */
+  public void bulkLoadHFile(String srcPathStr) throws IOException;
+
+  // General accessors into the state of the store
+  // TODO abstract some of this out into a metrics class
+
+  /**
+   * @return <tt>true</tt> if the store has any underlying reference files to older HFiles
+   */
+  public boolean hasReferences();
+
+  /**
+   * @return The size of this store's memstore, in bytes
+   */
+  public long getMemStoreSize();
+
+  public HColumnDescriptor getFamily();
+
+  /**
+   * @return The maximum memstoreTS in all store files.
+   */
+  public long getMaxMemstoreTS();
+
+  /**
+   * @return the data block encoder
+   */
+  public HFileDataBlockEncoder getDataBlockEncoder();
+
+  /**
+   * @return the number of files in this store
+   */
+  public int getNumberOfStoreFiles();
+
+  /** @return aggregate size of all HStores used in the last compaction */
+  public long getLastCompactSize();
+
+  /** @return aggregate size of HStore */
+  public long getSize();
+
+  /**
+   * @return Count of store files
+   */
+  public int getStorefilesCount();
+
+  /**
+   * @return The size of the store files, in bytes, uncompressed.
+   */
+  public long getStoreSizeUncompressed();
+
+  /**
+   * @return The size of the store files, in bytes.
+   */
+  public long getStorefilesSize();
+
+  /**
+   * @return The size of the store file indexes, in bytes.
+   */
+  public long getStorefilesIndexSize();
+
+  /**
+   * Returns the total size of all index blocks in the data block indexes, including the root level,
+   * intermediate levels, and the leaf level for multi-level indexes, or just the root level for
+   * single-level indexes.
+   * @return the total size of block indexes in the store
+   */
+  public long getTotalStaticIndexSize();
+
+  /**
+   * Returns the total byte size of all Bloom filter bit arrays. For compound Bloom filters even the
+   * Bloom blocks currently not loaded into the block cache are counted.
+   * @return the total size of all Bloom filters in the store
+   */
+  public long getTotalStaticBloomSize();
+
+  // Test-helper methods
+
+  /**
+   * Compact the most recent N files. Used in testing.
+   * @param N number of files to compact. Must be less than or equal to current number of files.
+   * @throws IOException on failure
+   */
+  public void compactRecentForTesting(int N) throws IOException;
+
+  /**
+   * Used for tests.
+   * @return cache configuration for this Store.
+   */
+  public CacheConfig getCacheConfig();
+
+  /**
+   * @return the parent region hosting this store
+   */
+  public HRegion getHRegion();
+}
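The priority constants carried onto the interface keep the convention described in the comment above: PRIORITY_USER (1) marks user-requested compactions, priorities <= 0 indicate blocking situations, and NO_PRIORITY (Integer.MIN_VALUE) means the store should decide for itself. A small stand-alone sketch of how a requester applies an override, mirroring the CompactSplitThread hunk near the top of this diff; the Request type and constants below are simplified placeholders, not the real HBase classes.

    public class CompactionPrioritySketch {
      static final int PRIORITY_USER = 1;                 // stand-in for HStore.PRIORITY_USER
      static final int NO_PRIORITY = Integer.MIN_VALUE;   // stand-in for HStore.NO_PRIORITY

      // Placeholder for a compaction request whose priority the store chose itself.
      static class Request {
        int priority;
        Request(int storeDecidedPriority) { this.priority = storeDecidedPriority; }
      }

      static void submit(Request cr, int callerPriority) {
        // Override only when the caller supplied a real priority -- the same
        // "priority != NO_PRIORITY" check CompactSplitThread performs.
        if (callerPriority != NO_PRIORITY) {
          cr.priority = callerPriority;
        }
      }

      public static void main(String[] args) {
        Request background = new Request(7);
        submit(background, NO_PRIORITY);        // store-decided priority is kept
        Request userTriggered = new Request(7);
        submit(userTriggered, PRIORITY_USER);   // user request jumps ahead (1 beats 7 in a min-heap)
        System.out.println(background.priority + " / " + userTriggered.priority);  // prints 7 / 1
      }
    }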
@@ -60,7 +60,7 @@ extends ConstantSizeRegionSplitPolicy {
    // Get size to check
    long sizeToCheck = getSizeToCheck(tableRegionsCount);

-    for (Store store : region.getStores().values()) {
+    for (HStore store : region.getStores().values()) {
      // If any of the stores is unable to split (eg they contain reference files)
      // then don't split
      if ((!store.canSplit())) {
@@ -522,7 +522,7 @@ public class MemStore implements HeapSize {
   * @param kvs
   * @return change in memstore size
   */
-  public long upsert(List<KeyValue> kvs) {
+  public long upsert(Iterable<KeyValue> kvs) {
    this.lock.readLock().lock();
    try {
      long size = 0;
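The only MemStore change is widening upsert from List<KeyValue> to Iterable<KeyValue>, which matches the upsert declaration in the new HStore interface and lets callers hand over any iterable batch of edits without first copying it into a List. A short illustrative sketch of the idea, using plain strings in place of KeyValue (stand-in names, not HBase code):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.TreeSet;

    public class UpsertSignatureSketch {
      // Accepting Iterable is the loosest contract for "iterate over these once".
      static long upsert(Iterable<String> kvs) {
        long count = 0;
        for (String kv : kvs) {
          count++;            // the real method upserts each KeyValue into the memstore
        }
        return count;
      }

      public static void main(String[] args) {
        Set<String> sorted = new TreeSet<>(Arrays.asList("row1/cf:a", "row1/cf:b"));
        System.out.println(upsert(sorted));                     // works with a Set
        System.out.println(upsert(Arrays.asList("row2/cf:a"))); // and still with a List
      }
    }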
@@ -437,7 +437,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester {
  }

  private boolean isTooManyStoreFiles(HRegion region) {
-    for (Store hstore: region.stores.values()) {
+    for (HStore hstore : region.stores.values()) {
      if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) {
        return true;
      }
@@ -73,11 +73,11 @@ public abstract class RegionSplitPolicy extends Configured {
    if (explicitSplitPoint != null) {
      return explicitSplitPoint;
    }
-    Map<byte[], Store> stores = region.getStores();
+    Map<byte[], HStore> stores = region.getStores();

    byte[] splitPointFromLargestStore = null;
    long largestStoreSize = 0;
-    for (Store s : stores.values()) {
+    for (HStore s : stores.values()) {
      byte[] splitPoint = s.getSplitPoint();
      long storeSize = s.getSize();
      if (splitPoint != null && largestStoreSize < storeSize) {
@ -107,7 +107,7 @@ import com.google.common.collect.Lists;
|
||||||
* not be called directly but by an HRegion manager.
|
* not be called directly but by an HRegion manager.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Store extends SchemaConfigured implements HeapSize {
|
public class Store extends SchemaConfigured implements HStore {
|
||||||
static final Log LOG = LogFactory.getLog(Store.class);
|
static final Log LOG = LogFactory.getLog(Store.class);
|
||||||
|
|
||||||
protected final MemStore memstore;
|
protected final MemStore memstore;
|
||||||
|
@ -135,12 +135,6 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
private final boolean verifyBulkLoads;
|
private final boolean verifyBulkLoads;
|
||||||
|
|
||||||
/* The default priority for user-specified compaction requests.
|
|
||||||
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
|
|
||||||
*/
|
|
||||||
public static final int PRIORITY_USER = 1;
|
|
||||||
public static final int NO_PRIORITY = Integer.MIN_VALUE;
|
|
||||||
|
|
||||||
// not private for testing
|
// not private for testing
|
||||||
/* package */ScanInfo scanInfo;
|
/* package */ScanInfo scanInfo;
|
||||||
/*
|
/*
|
||||||
|
@ -322,9 +316,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
|
return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* @return The maximum memstoreTS in all store files.
|
|
||||||
*/
|
|
||||||
public long getMaxMemstoreTS() {
|
public long getMaxMemstoreTS() {
|
||||||
return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
|
return StoreFile.getMaxMemstoreTSInList(this.getStorefiles());
|
||||||
}
|
}
|
||||||
|
@ -349,9 +341,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
return homedir;
|
return homedir;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* @return the data block encoder
|
|
||||||
*/
|
|
||||||
public HFileDataBlockEncoder getDataBlockEncoder() {
|
public HFileDataBlockEncoder getDataBlockEncoder() {
|
||||||
return dataBlockEncoder;
|
return dataBlockEncoder;
|
||||||
}
|
}
|
||||||
|
@ -364,7 +354,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
this.dataBlockEncoder = blockEncoder;
|
this.dataBlockEncoder = blockEncoder;
|
||||||
}
|
}
|
||||||
|
|
||||||
FileStatus [] getStoreFiles() throws IOException {
|
FileStatus[] getStoreFiles() throws IOException {
|
||||||
return FSUtils.listStatus(this.fs, this.homedir, null);
|
return FSUtils.listStatus(this.fs, this.homedir, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,13 +428,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Adds a value to the memstore
|
public long add(final KeyValue kv) {
|
||||||
*
|
|
||||||
* @param kv
|
|
||||||
* @return memstore size delta
|
|
||||||
*/
|
|
||||||
protected long add(final KeyValue kv) {
|
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
return this.memstore.add(kv);
|
return this.memstore.add(kv);
|
||||||
|
@ -468,14 +453,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Removes a kv from the memstore. The KeyValue is removed only
|
public void rollback(final KeyValue kv) {
|
||||||
* if its key & memstoreTS matches the key & memstoreTS value of the
|
|
||||||
* kv parameter.
|
|
||||||
*
|
|
||||||
* @param kv
|
|
||||||
*/
|
|
||||||
protected void rollback(final KeyValue kv) {
|
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
this.memstore.rollback(kv);
|
this.memstore.rollback(kv);
|
||||||
|
@ -487,15 +466,13 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
/**
|
/**
|
||||||
* @return All store files.
|
* @return All store files.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public List<StoreFile> getStorefiles() {
|
public List<StoreFile> getStorefiles() {
|
||||||
return this.storefiles;
|
return this.storefiles;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* This throws a WrongRegionException if the HFile does not fit in this
|
public void assertBulkLoadHFileOk(Path srcPath) throws IOException {
|
||||||
* region, or an InvalidHFileException if the HFile is not valid.
|
|
||||||
*/
|
|
||||||
void assertBulkLoadHFileOk(Path srcPath) throws IOException {
|
|
||||||
HFile.Reader reader = null;
|
HFile.Reader reader = null;
|
||||||
try {
|
try {
|
||||||
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
|
LOG.info("Validating hfile at " + srcPath + " for inclusion in "
|
||||||
|
@ -555,12 +532,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* This method should only be called from HRegion. It is assumed that the
|
public void bulkLoadHFile(String srcPathStr) throws IOException {
|
||||||
* ranges of values in the HFile fit within the stores assigned region.
|
|
||||||
* (assertBulkLoadHFileOk checks this)
|
|
||||||
*/
|
|
||||||
void bulkLoadHFile(String srcPathStr) throws IOException {
|
|
||||||
Path srcPath = new Path(srcPathStr);
|
Path srcPath = new Path(srcPathStr);
|
||||||
|
|
||||||
// Copy the file if it's on another filesystem
|
// Copy the file if it's on another filesystem
|
||||||
|
@ -619,15 +592,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
fs, region.getTmpDir());
|
fs, region.getTmpDir());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Close all the readers
|
public ImmutableList<StoreFile> close() throws IOException {
|
||||||
*
|
|
||||||
* We don't need to worry about subsequent requests because the HRegion holds
|
|
||||||
* a write lock that will prevent any more reads or writes.
|
|
||||||
*
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
ImmutableList<StoreFile> close() throws IOException {
|
|
||||||
this.lock.writeLock().lock();
|
this.lock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
ImmutableList<StoreFile> result = storefiles;
|
ImmutableList<StoreFile> result = storefiles;
|
||||||
|
@ -674,8 +640,9 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Snapshot this stores memstore. Call before running
|
* Snapshot this stores memstore. Call before running
|
||||||
* {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
|
* {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
|
||||||
|
* some work to do.
|
||||||
*/
|
*/
|
||||||
void snapshot() {
|
void snapshot() {
|
||||||
this.memstore.snapshot();
|
this.memstore.snapshot();
|
||||||
|
@ -1066,9 +1033,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
return sf;
|
return sf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Compact the most recent N files. Used in testing.
|
|
||||||
*/
|
|
||||||
public void compactRecentForTesting(int N) throws IOException {
|
public void compactRecentForTesting(int N) throws IOException {
|
||||||
List<StoreFile> filesToCompact;
|
List<StoreFile> filesToCompact;
|
||||||
long maxId;
|
long maxId;
|
||||||
|
@ -1117,7 +1082,8 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasReferences() {
|
@Override
|
||||||
|
public boolean hasReferences() {
|
||||||
return hasReferences(this.storefiles);
|
return hasReferences(this.storefiles);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1152,17 +1118,13 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
return minTs;
|
return minTs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** getter for CompactionProgress object
|
@Override
|
||||||
* @return CompactionProgress object; can be null
|
|
||||||
*/
|
|
||||||
public CompactionProgress getCompactionProgress() {
|
public CompactionProgress getCompactionProgress() {
|
||||||
return this.compactor.getProgress();
|
return this.compactor.getProgress();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
@Override
|
||||||
* @return True if we should run a major compaction.
|
public boolean isMajorCompaction() throws IOException {
|
||||||
*/
|
|
||||||
boolean isMajorCompaction() throws IOException {
|
|
||||||
for (StoreFile sf : this.storefiles) {
|
for (StoreFile sf : this.storefiles) {
|
||||||
if (sf.getReader() == null) {
|
if (sf.getReader() == null) {
|
||||||
LOG.debug("StoreFile " + sf + " has null Reader");
|
LOG.debug("StoreFile " + sf + " has null Reader");
|
||||||
|
@ -1259,7 +1221,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompactionRequest requestCompaction() throws IOException {
|
public CompactionRequest requestCompaction() throws IOException {
|
||||||
return requestCompaction(NO_PRIORITY);
|
return requestCompaction(HStore.NO_PRIORITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompactionRequest requestCompaction(int priority) throws IOException {
|
public CompactionRequest requestCompaction(int priority) throws IOException {
|
||||||
|
@ -1350,7 +1312,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
|
CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
|
||||||
return compactSelection(candidates,NO_PRIORITY);
|
return compactSelection(candidates,HStore.NO_PRIORITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1423,7 +1385,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
// Force a major compaction if this is a user-requested major compaction,
|
// Force a major compaction if this is a user-requested major compaction,
|
||||||
// or if we do not have too many files to compact and this was requested
|
// or if we do not have too many files to compact and this was requested
|
||||||
// as a major compaction
|
// as a major compaction
|
||||||
boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
|
boolean majorcompaction = (forcemajor && priority == HStore.PRIORITY_USER) ||
|
||||||
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
|
(forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
|
||||||
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
|
(compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
|
||||||
);
|
);
|
||||||
|
@ -1515,7 +1477,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
|
if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
|
||||||
LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
|
LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
|
||||||
" files, probably because of a user-requested major compaction");
|
" files, probably because of a user-requested major compaction");
|
||||||
if(priority != PRIORITY_USER) {
|
if(priority != HStore.PRIORITY_USER) {
|
||||||
LOG.error("Compacting more than max files on a non user-requested compaction");
|
LOG.error("Compacting more than max files on a non user-requested compaction");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1663,9 +1625,7 @@ public class Store extends SchemaConfigured implements HeapSize {
|
||||||
// Accessors.
|
// Accessors.
|
||||||
// (This is the only section that is directly useful!)
|
// (This is the only section that is directly useful!)
|
||||||
//////////////////////////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////////////////////////
|
||||||
/**
|
@Override
|
||||||
* @return the number of files in this store
|
|
||||||
*/
|
|
||||||
public int getNumberOfStoreFiles() {
|
public int getNumberOfStoreFiles() {
|
||||||
return this.storefiles.size();
|
return this.storefiles.size();
|
||||||
}
|
}
|
||||||
@@ -1687,21 +1647,8 @@ public class Store extends SchemaConfigured implements HeapSize {
     return key.getTimestamp() < oldestTimestamp;
   }

-  /**
-   * Find the key that matches <i>row</i> exactly, or the one that immediately
-   * precedes it. WARNING: Only use this method on a table where writes occur
-   * with strictly increasing timestamps. This method assumes this pattern of
-   * writes in order to make it reasonably performant. Also our search is
-   * dependent on the axiom that deletes are for cells that are in the container
-   * that follows whether a memstore snapshot or a storefile, not for the
-   * current container: i.e. we'll see deletes before we come across cells we
-   * are to delete. Presumption is that the memstore#kvset is processed before
-   * memstore#snapshot and so on.
-   * @param row The row key of the targeted row.
-   * @return Found keyvalue or null if none found.
-   * @throws IOException
-   */
-  KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
+  @Override
+  public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
     // If minVersions is set, we will not ignore expired KVs.
     // As we're only looking for the latest matches, that should be OK.
     // With minVersions > 0 we guarantee that any KV that has any version
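The deleted javadoc above still states the contract: getRowKeyAtOrBefore returns the KeyValue whose row matches the argument exactly, or the one that immediately precedes it, and it relies on writes arriving with strictly increasing timestamps. A minimal usage sketch, assuming an HStore obtained from the enclosing region as elsewhere in this patch (the row literal and variable names are illustrative only):

// Sketch only: store comes from HRegion.getStore(family); the row value is made up.
byte[] row = Bytes.toBytes("row-0042");
KeyValue closest = store.getRowKeyAtOrBefore(row);
if (closest != null) {
  // Either the exact row, or the closest row sorting before it in this store.
  byte[] foundRow = closest.getRow();
}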
@@ -1855,10 +1802,8 @@ public class Store extends SchemaConfigured implements HeapSize {
       this.lock.readLock().unlock();
     }
   }
-  /**
-   * Determines if Store should be split
-   * @return byte[] if store should be split, null otherwise.
-   */
+  @Override
   public byte[] getSplitPoint() {
     this.lock.readLock().lock();
     try {
@@ -1928,12 +1873,12 @@ public class Store extends SchemaConfigured implements HeapSize {
     return null;
   }

-  /** @return aggregate size of all HStores used in the last compaction */
+  @Override
   public long getLastCompactSize() {
     return this.lastCompactSize;
   }

-  /** @return aggregate size of HStore */
+  @Override
   public long getSize() {
     return storeSize;
   }
@@ -1950,11 +1895,7 @@ public class Store extends SchemaConfigured implements HeapSize {
   // File administration
   //////////////////////////////////////////////////////////////////////////////

-  /**
-   * Return a scanner for both the memstore and the HStore files. Assumes we
-   * are not in a compaction.
-   * @throws IOException
-   */
+  @Override
   public KeyValueScanner getScanner(Scan scan,
       final NavigableSet<byte []> targetCols) throws IOException {
     lock.readLock().lock();
@@ -1977,24 +1918,18 @@ public class Store extends SchemaConfigured implements HeapSize {
     return getColumnFamilyName();
   }

-  /**
-   * @return Count of store files
-   */
-  int getStorefilesCount() {
+  @Override
+  public int getStorefilesCount() {
     return this.storefiles.size();
   }

-  /**
-   * @return The size of the store files, in bytes, uncompressed.
-   */
-  long getStoreSizeUncompressed() {
+  @Override
+  public long getStoreSizeUncompressed() {
     return this.totalUncompressedBytes;
   }

-  /**
-   * @return The size of the store files, in bytes.
-   */
-  long getStorefilesSize() {
+  @Override
+  public long getStorefilesSize() {
     long size = 0;
     for (StoreFile s: storefiles) {
       StoreFile.Reader r = s.getReader();
@@ -2007,10 +1942,8 @@ public class Store extends SchemaConfigured implements HeapSize {
     return size;
   }

-  /**
-   * @return The size of the store file indexes, in bytes.
-   */
-  long getStorefilesIndexSize() {
+  @Override
+  public long getStorefilesIndexSize() {
     long size = 0;
     for (StoreFile s: storefiles) {
       StoreFile.Reader r = s.getReader();
@@ -2023,14 +1956,8 @@ public class Store extends SchemaConfigured implements HeapSize {
     return size;
   }

-  /**
-   * Returns the total size of all index blocks in the data block indexes,
-   * including the root level, intermediate levels, and the leaf level for
-   * multi-level indexes, or just the root level for single-level indexes.
-   *
-   * @return the total size of block indexes in the store
-   */
-  long getTotalStaticIndexSize() {
+  @Override
+  public long getTotalStaticIndexSize() {
     long size = 0;
     for (StoreFile s : storefiles) {
       size += s.getReader().getUncompressedDataIndexSize();
@@ -2038,14 +1965,8 @@ public class Store extends SchemaConfigured implements HeapSize {
     return size;
   }

-  /**
-   * Returns the total byte size of all Bloom filter bit arrays. For compound
-   * Bloom filters even the Bloom blocks currently not loaded into the block
-   * cache are counted.
-   *
-   * @return the total size of all Bloom filters in the store
-   */
-  long getTotalStaticBloomSize() {
+  @Override
+  public long getTotalStaticBloomSize() {
     long size = 0;
     for (StoreFile s : storefiles) {
       StoreFile.Reader r = s.getReader();
@@ -2054,31 +1975,27 @@ public class Store extends SchemaConfigured implements HeapSize {
     return size;
   }

-  /**
-   * @return The size of this store's memstore, in bytes
-   */
-  long getMemStoreSize() {
+  @Override
+  public long getMemStoreSize() {
     return this.memstore.heapSize();
   }

   public int getCompactPriority() {
-    return getCompactPriority(NO_PRIORITY);
+    return getCompactPriority(HStore.NO_PRIORITY);
   }

-  /**
-   * @return The priority that this store should have in the compaction queue
-   * @param priority
-   */
+  @Override
   public int getCompactPriority(int priority) {
     // If this is a user-requested compaction, leave this at the highest priority
-    if(priority == PRIORITY_USER) {
-      return PRIORITY_USER;
+    if(priority == HStore.PRIORITY_USER) {
+      return HStore.PRIORITY_USER;
     } else {
       return this.blockingStoreFileCount - this.storefiles.size();
     }
   }

-  boolean throttleCompaction(long compactionSize) {
+  @Override
+  public boolean throttleCompaction(long compactionSize) {
     // see HBASE-5867 for discussion on the default
     long throttlePoint = conf.getLong(
         "hbase.regionserver.thread.compaction.throttle",
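The compaction priority rule above is plain arithmetic: a user request keeps HStore.PRIORITY_USER, and everything else is ranked by how many file slots remain before the store hits its blocking count, so the most backed-up stores drain first. A standalone model of that rule for illustration (the class and the PRIORITY_USER value are assumptions, not HBase code):

// Illustrative model of the priority rule shown above; not HBase code.
final class CompactPriorityModel {
  static final int PRIORITY_USER = 1;  // assumed stand-in for HStore.PRIORITY_USER

  static int getCompactPriority(int requested, int blockingStoreFileCount, int storefileCount) {
    if (requested == PRIORITY_USER) {
      return PRIORITY_USER;            // user-requested compactions stay at the top of the queue
    }
    // Fewer slots left before writes block means a smaller, i.e. more urgent, priority.
    return blockingStoreFileCount - storefileCount;
  }
}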
@@ -2086,6 +2003,7 @@ public class Store extends SchemaConfigured implements HeapSize {
     return compactionSize > throttlePoint;
   }

+  @Override
   public HRegion getHRegion() {
     return this.region;
   }
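throttleCompaction simply compares the total size of the selected files against the "hbase.regionserver.thread.compaction.throttle" threshold (see HBASE-5867); requests above the threshold are treated as large compactions. A hedged sketch of lowering that threshold from a test or tool, assuming the usual org.apache.hadoop.conf.Configuration obtained via HBaseConfiguration.create(); the 2 MB figure is only an example:

// Sketch: treat anything above ~2 MB of selected store file data as a "large" compaction.
Configuration conf = HBaseConfiguration.create();
conf.setLong("hbase.regionserver.thread.compaction.throttle", 2L * 1024 * 1024);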
@@ -2094,20 +2012,7 @@ public class Store extends SchemaConfigured implements HeapSize {
     return this.region.regionInfo;
   }

-  /**
-   * Increments the value for the given row/family/qualifier.
-   *
-   * This function will always be seen as atomic by other readers
-   * because it only puts a single KV to memstore. Thus no
-   * read/write control necessary.
-   *
-   * @param row
-   * @param f
-   * @param qualifier
-   * @param newValue the new value to set into memstore
-   * @return memstore size delta
-   * @throws IOException
-   */
+  @Override
   public long updateColumnValue(byte [] row, byte [] f,
       byte [] qualifier, long newValue)
       throws IOException {
@@ -2127,21 +2032,8 @@ public class Store extends SchemaConfigured implements HeapSize {
     }
   }

-  /**
-   * Adds or replaces the specified KeyValues.
-   * <p>
-   * For each KeyValue specified, if a cell with the same row, family, and
-   * qualifier exists in MemStore, it will be replaced. Otherwise, it will just
-   * be inserted to MemStore.
-   * <p>
-   * This operation is atomic on each KeyValue (row/family/qualifier) but not
-   * necessarily atomic across all of them.
-   * @param kvs
-   * @return memstore size delta
-   * @throws IOException
-   */
-  public long upsert(List<KeyValue> kvs)
-      throws IOException {
+  @Override
+  public long upsert(Iterable<KeyValue> kvs) throws IOException {
     this.lock.readLock().lock();
     try {
       // TODO: Make this operation atomic w/ MVCC
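The interface loosens the upsert parameter from List&lt;KeyValue&gt; to Iterable&lt;KeyValue&gt;, so existing callers that pass a List keep compiling while the contract stays collection-agnostic; per the removed javadoc, the return value is still the memstore size delta. A hedged sketch of a caller, assuming an HStore reference and the usual java.util and org.apache.hadoop.hbase imports (the cell contents are made up):

// Sketch only: store is an HStore as elsewhere in this patch.
List<KeyValue> kvs = new ArrayList<KeyValue>();
kvs.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
    Bytes.toBytes("qual"), Bytes.toBytes("value")));
long memstoreDelta = store.upsert(kvs);  // any Iterable<KeyValue> is accepted now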
@@ -2201,18 +2093,12 @@ public class Store extends SchemaConfigured implements HeapSize {
     }
   }

-  /**
-   * See if there's too much store files in this store
-   * @return true if number of store files is greater than
-   *  the number defined in minFilesToCompact
-   */
+  @Override
   public boolean needsCompaction() {
     return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }

-  /**
-   * Used for tests. Get the cache configuration for this Store.
-   */
+  @Override
   public CacheConfig getCacheConfig() {
     return this.cacheConf;
   }
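needsCompaction deliberately ignores files that are already part of an in-flight compaction, so a store is not re-queued for work that is already scheduled. The predicate restated as plain Java, purely for illustration:

// Illustrative restatement of the check above; not HBase code.
static boolean needsCompaction(int storefiles, int filesCompacting, int minFilesToCompact) {
  return (storefiles - filesCompacting) > minFilesToCompact;
}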

@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.regionserver.CheckedArchivingHFileCleaner;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -177,7 +178,7 @@ public class TestZooKeeperTableArchiveClient {
     loadAndCompact(region);

     // check that we actually have some store files that were archived
-    Store store = region.getStore(TEST_FAM);
+    HStore store = region.getStore(TEST_FAM);
     Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
       region, store);

@@ -337,7 +338,7 @@ public class TestZooKeeperTableArchiveClient {
     loadAndCompact(region);

     // check that we actually have some store files that were archived
-    Store store = region.getStore(TEST_FAM);
+    HStore store = region.getStore(TEST_FAM);
     Path storeArchiveDir = HFileArchiveTestingUtil.getStoreArchivePath(UTIL.getConfiguration(),
       region, store);

@@ -375,7 +376,7 @@
   * Compact all the store files in a given region.
   */
  private void compactRegion(HRegion region, byte[] family) throws IOException {
-    Store store = region.getStores().get(TEST_FAM);
+    HStore store = region.getStores().get(TEST_FAM);
     store.compactRecentForTesting(store.getStorefiles().size());
   }
 }

@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -4593,7 +4594,7 @@ public class TestFromClientSide {
     String regionName = table.getRegionLocations().firstKey().getEncodedName();
     HRegion region = TEST_UTIL.getRSForFirstRegionInTable(
         tableName).getFromOnlineRegions(regionName);
-    Store store = region.getStores().values().iterator().next();
+    HStore store = region.getStores().values().iterator().next();
     CacheConfig cacheConf = store.getCacheConfig();
     cacheConf.setCacheDataOnWrite(true);
     cacheConf.setEvictOnClose(true);
@@ -4668,7 +4669,7 @@
       assertEquals(++expectedBlockMiss, cache.getStats().getMissCount());
     }

-  private void waitForStoreFileCount(Store store, int count, int timeout)
+  private void waitForStoreFileCount(HStore store, int count, int timeout)
       throws InterruptedException {
     long start = System.currentTimeMillis();
     while (start + timeout > System.currentTimeMillis() &&

@@ -123,7 +123,7 @@ public class CompactionTool implements Tool {
     // list of files instead of have Store search its home dir.
     return new Store(tmpdir, region, hcd, fs, getConf()) {
       @Override
-      FileStatus[] getStoreFiles() throws IOException {
+      public FileStatus[] getStoreFiles() throws IOException {
         return this.fs.listStatus(getHomedir());
       }

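The only substantive change in this hunk is the `public` modifier: getStoreFiles() is presumably widened to public on Store as part of the interface extraction, and Java does not allow an override to be more restrictive than the method it overrides, so the anonymous subclass has to follow. A tiny self-contained illustration of that language rule (the types are made up for the example):

// Illustrative only; these classes are not part of HBase.
class Base {
  public String name() { return "base"; }
}

class Derived extends Base {
  @Override
  public String name() { return "derived"; }  // must remain public; a weaker modifier would not compile
}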

@@ -127,7 +127,7 @@ public class TestAtomicOperation extends HBaseTestCase {

     assertEquals(value+amount, result);

-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     // ICV removes any extra values floating around in there.
     assertEquals(1, store.memstore.kvset.size());
     assertTrue(store.memstore.snapshot.isEmpty());

@@ -170,10 +170,10 @@ public class TestCompaction extends HBaseTestCase {
       throws Exception {
     Map<Store, HFileDataBlockEncoder> replaceBlockCache =
         new HashMap<Store, HFileDataBlockEncoder>();
-    for (Entry<byte[], Store> pair : r.getStores().entrySet()) {
-      Store store = pair.getValue();
+    for (Entry<byte[], HStore> pair : r.getStores().entrySet()) {
+      Store store = (Store) pair.getValue();
       HFileDataBlockEncoder blockEncoder = store.getDataBlockEncoder();
-      replaceBlockCache.put(pair.getValue(), blockEncoder);
+      replaceBlockCache.put(store, blockEncoder);
       final DataBlockEncoding inCache = DataBlockEncoding.PREFIX;
       final DataBlockEncoding onDisk = inCacheOnly ? DataBlockEncoding.NONE :
           inCache;
@@ -206,7 +206,7 @@ public class TestCompaction extends HBaseTestCase {
     assertEquals(compactionThreshold, result.size());

     // see if CompactionProgress is in place but null
-    for (Store store: this.r.stores.values()) {
+    for (HStore store : this.r.stores.values()) {
       assertNull(store.getCompactionProgress());
     }

@@ -215,7 +215,7 @@

     // see if CompactionProgress has done its thing on at least one store
     int storeCount = 0;
-    for (Store store: this.r.stores.values()) {
+    for (HStore store : this.r.stores.values()) {
       CompactionProgress progress = store.getCompactionProgress();
       if( progress != null ) {
         ++storeCount;
@@ -281,7 +281,8 @@
     // Multiple versions allowed for an entry, so the delete isn't enough
     // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttl = 1000;
-    for (Store store: this.r.stores.values()) {
+    for (HStore hstore : this.r.stores.values()) {
+      Store store = ((Store) hstore);
       Store.ScanInfo old = store.scanInfo;
       Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
           old.getMinVersions(), old.getMaxVersions(), ttl,
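Several tests now downcast the interface back to the concrete class, for example `Store store = ((Store) hstore);`, because members such as `scanInfo` and `memstore` live on the implementation rather than on the extracted HStore interface. A minimal standalone illustration of why the cast is needed (the two types below are stand-ins, not the HBase classes):

// Stand-in types to show the interface-versus-implementation split; not HBase code.
interface KVStore {                              // plays the role of HStore
  int getStorefilesCount();
}

class FileBackedStore implements KVStore {       // plays the role of Store
  int internalDetail = 42;                       // implementation-only state, like Store.memstore
  public int getStorefilesCount() { return 0; }
}

class CastDemo {
  static void useStore(KVStore s) {
    s.getStorefilesCount();                          // fine through the interface
    int d = ((FileBackedStore) s).internalDetail;    // implementation detail requires the downcast
  }
}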
@@ -302,7 +303,7 @@
     conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, delay);
     conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);

-    Store s = r.getStore(COLUMN_FAMILY);
+    Store s = ((Store) r.getStore(COLUMN_FAMILY));
     try {
       createStoreFile(r);
       createStoreFile(r);
@@ -435,7 +436,7 @@
     assertEquals(compactionThreshold, result.size());

     // do a compaction
-    Store store2 = this.r.stores.get(fam2);
+    HStore store2 = this.r.stores.get(fam2);
     int numFiles1 = store2.getStorefiles().size();
     assertTrue("Was expecting to see 4 store files", numFiles1 > compactionThreshold); // > 3
     store2.compactRecentForTesting(compactionThreshold); // = 3
@@ -512,7 +513,7 @@
     spyR.compactStores();

     // ensure that the compaction stopped, all old files are intact,
-    Store s = r.stores.get(COLUMN_FAMILY);
+    HStore s = r.stores.get(COLUMN_FAMILY);
     assertEquals(compactionThreshold, s.getStorefilesCount());
     assertTrue(s.getStorefilesSize() > 15*1000);
     // and no new store files persisted past compactStores()
@@ -536,7 +537,8 @@
     // Multiple versions allowed for an entry, so the delete isn't enough
     // Lower TTL and expire to ensure that all our entries have been wiped
     final int ttl = 1000;
-    for (Store store: this.r.stores.values()) {
+    for (HStore hstore: this.r.stores.values()) {
+      Store store = (Store)hstore;
       Store.ScanInfo old = store.scanInfo;
       Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
           old.getMinVersions(), old.getMaxVersions(), ttl,
@@ -583,7 +585,7 @@
     for (int i = 0; i < nfiles; i++) {
       createStoreFile(r);
     }
-    Store store = r.getStore(COLUMN_FAMILY);
+    Store store = (Store) r.getStore(COLUMN_FAMILY);

     List<StoreFile> storeFiles = store.getStorefiles();
     long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
@@ -621,14 +623,14 @@
   * Test for HBASE-5920 - Test user requested major compactions always occurring
   */
  public void testNonUserMajorCompactionRequest() throws Exception {
-    Store store = r.getStore(COLUMN_FAMILY);
+    HStore store = r.getStore(COLUMN_FAMILY);
     createStoreFile(r);
     for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
       createStoreFile(r);
     }
     store.triggerMajorCompaction();

-    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+    CompactionRequest request = store.requestCompaction(HStore.NO_PRIORITY);
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "System-requested major compaction should not occur if there are too many store files",
@@ -640,13 +642,13 @@
   * Test for HBASE-5920
   */
  public void testUserMajorCompactionRequest() throws IOException{
-    Store store = r.getStore(COLUMN_FAMILY);
+    HStore store = r.getStore(COLUMN_FAMILY);
     createStoreFile(r);
     for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
       createStoreFile(r);
     }
     store.triggerMajorCompaction();
-    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+    CompactionRequest request = store.requestCompaction(HStore.PRIORITY_USER);
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "User-requested major compaction should always occur, even if there are too many store files",
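These two tests exercise both sides of the priority rule: a system-level request (HStore.NO_PRIORITY) must back off when too many files would be compacted, while a user request (HStore.PRIORITY_USER) always goes through as a major compaction. A sketch of the calling pattern, assuming the same region and family handles as in the tests above (variable names are illustrative):

// Sketch of the request pattern above; r and COLUMN_FAMILY as in the tests.
HStore store = r.getStore(COLUMN_FAMILY);
store.triggerMajorCompaction();

// System-triggered: the store may demote the request if too many files are selected.
CompactionRequest systemRequest = store.requestCompaction(HStore.NO_PRIORITY);

// User-triggered: stays a major compaction regardless of the file count.
CompactionRequest userRequest = store.requestCompaction(HStore.PRIORITY_USER);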

@@ -236,7 +236,7 @@ public class TestHRegion extends HBaseTestCase {
     MonitoredTask status = TaskMonitor.get().createStatus(method);
     Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
         Bytes.BYTES_COMPARATOR);
-    for (Store store : region.getStores().values()) {
+    for (HStore store : region.getStores().values()) {
       maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
           minSeqId - 1);
     }
@@ -288,7 +288,7 @@
     MonitoredTask status = TaskMonitor.get().createStatus(method);
     Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
         Bytes.BYTES_COMPARATOR);
-    for (Store store : region.getStores().values()) {
+    for (HStore store : region.getStores().values()) {
       maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
           recoverSeqId - 1);
     }
@@ -336,7 +336,7 @@

     Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
         Bytes.BYTES_COMPARATOR);
-    for (Store store : region.getStores().values()) {
+    for (HStore store : region.getStores().values()) {
       maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
     }
     long seqId = region.replayRecoveredEditsIfAny(regiondir,
@@ -864,7 +864,7 @@
     put.add(kv);

     //checkAndPut with wrong value
-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     store.memstore.kvset.size();

     boolean res = region.checkAndMutate(row1, fam1, qf1, CompareOp.EQUAL,
@@ -1379,10 +1379,10 @@
     // extract the key values out the memstore:
     // This is kinda hacky, but better than nothing...
     long now = System.currentTimeMillis();
-    KeyValue firstKv = region.getStore(fam1).memstore.kvset.first();
+    KeyValue firstKv = ((Store) region.getStore(fam1)).memstore.kvset.first();
     assertTrue(firstKv.getTimestamp() <= now);
     now = firstKv.getTimestamp();
-    for (KeyValue kv: region.getStore(fam1).memstore.kvset) {
+    for (KeyValue kv : ((Store) region.getStore(fam1)).memstore.kvset) {
       assertTrue(kv.getTimestamp() <= now);
       now = kv.getTimestamp();
     }
@@ -2320,7 +2320,7 @@

     assertEquals(value+amount, result);

-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     // ICV removes any extra values floating around in there.
     assertEquals(1, store.memstore.kvset.size());
     assertTrue(store.memstore.snapshot.isEmpty());
@@ -2346,7 +2346,7 @@
     region.put(put);

     // get the store in question:
-    Store s = region.getStore(fam1);
+    Store s = (Store) region.getStore(fam1);
     s.snapshot(); //bam

     // now increment:
@@ -2490,7 +2490,7 @@
     // flush to disk.
     region.flushcache();

-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     assertEquals(0, store.memstore.kvset.size());

     long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
@@ -2516,7 +2516,7 @@
     region.put(put);
     region.flushcache();

-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     assertEquals(0, store.memstore.kvset.size());

     long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
@@ -2562,7 +2562,7 @@

     assertEquals(value+amount, result);

-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     // ICV should update the existing Put with the same timestamp
     assertEquals(1, store.memstore.kvset.size());
     assertTrue(store.memstore.snapshot.isEmpty());
@@ -2578,7 +2578,7 @@

     assertEquals(value+amount, result);

-    store = region.getStore(fam1);
+    store = (Store) region.getStore(fam1);
     // ICV should update the existing Put with the same timestamp
     assertEquals(2, store.memstore.kvset.size());
     assertTrue(store.memstore.snapshot.isEmpty());
@@ -3397,7 +3397,7 @@
       region.flushcache();
     }
     //before compaction
-    Store store = region.getStore(fam1);
+    Store store = (Store) region.getStore(fam1);
     List<StoreFile> storeFiles = store.getStorefiles();
     for (StoreFile storefile : storeFiles) {
       StoreFile.Reader reader = storefile.getReader();

@@ -240,7 +240,7 @@ public class TestRegionServerMetrics {
     rs.doMetrics();
     for (HRegion r : TEST_UTIL.getMiniHBaseCluster().getRegions(
         Bytes.toBytes(TABLE_NAME))) {
-      for (Map.Entry<byte[], Store> storeEntry : r.getStores().entrySet()) {
+      for (Map.Entry<byte[], HStore> storeEntry : r.getStores().entrySet()) {
        LOG.info("For region " + r.getRegionNameAsString() + ", CF " +
            Bytes.toStringBinary(storeEntry.getKey()) + " found store files " +
            ": " + storeEntry.getValue().getStorefiles());

@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -181,7 +182,7 @@ public class TestWALReplay {
     // flush region and make major compaction
     destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
     // wait to complete major compaction
-    for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
+    for (HStore store : destServer.getOnlineRegion(destRegion.getRegionName())
         .getStores().values()) {
       store.triggerMajorCompaction();
     }
@@ -421,7 +422,7 @@
     final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
     HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
       @Override
-      protected boolean restoreEdit(Store s, KeyValue kv) {
+      protected boolean restoreEdit(HStore s, KeyValue kv) {
         boolean b = super.restoreEdit(s, kv);
         countOfRestoredEdits.incrementAndGet();
         return b;

@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Store;

 /**
@@ -221,7 +222,7 @@ public class HFileArchiveTestingUtil {
   * @param store store that is archiving files
   * @return {@link Path} to the store archive directory for the given region
   */
-  public static Path getStoreArchivePath(Configuration conf, HRegion region, Store store) {
+  public static Path getStoreArchivePath(Configuration conf, HRegion region, HStore store) {
     return HFileArchiveUtil.getStoreArchivePath(conf, region, store.getFamily().getName());
   }

@@ -233,7 +234,7 @@ public class HFileArchiveTestingUtil {
     HRegion region = servingRegions.get(0);

     // check that we actually have some store files that were archived
-    Store store = region.getStore(storeName);
+    HStore store = region.getStore(storeName);
     return HFileArchiveTestingUtil.getStoreArchivePath(util.getConfiguration(), region, store);
   }
 }