HBASE-15016 Services a Store needs from a Region

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
eshcar 2016-02-24 09:56:25 +02:00 committed by stack
parent 28cd48b673
commit 876a6ab73e
10 changed files with 165 additions and 33 deletions

View File

@ -110,6 +110,8 @@ public class ClassSize {
/** Overhead for CellSkipListSet */
public static final int CELL_SKIPLIST_SET;
public static final int STORE_SERVICES;
/* Are we running on jdk7? */
private static final boolean JDK7;
static {
@ -193,6 +195,8 @@ public class ClassSize {
TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
}
/**

View File

@ -162,6 +162,10 @@ public class DefaultMemStore extends AbstractMemStore {
return;
}
@Override
public void finalizeFlush() {
}
/**
* Code to help figure if our approximation of object heap sizes is close
* enough. See hbase-900. Fills memstores then waits so user can heap

View File

@ -511,6 +511,9 @@ public class HMobStore extends HStore {
}
}
@Override public void finalizeFlush() {
}
public void updateCellsCountCompactedToMob(long count) {
cellsCountCompactedToMob += count;
}

View File

@ -17,6 +17,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -181,20 +195,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@ -258,6 +258,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
private final AtomicLong memstoreSize = new AtomicLong(0);
private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
// Debug possible data loss due to WAL off
final Counter numMutationsWithoutWAL = new Counter();
@ -999,6 +1000,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return false;
}
public void blockUpdates() {
this.updatesLock.writeLock().lock();
}
public void unblockUpdates() {
this.updatesLock.writeLock().unlock();
}
@Override
public HDFSBlocksDistribution getHDFSBlocksDistribution() {
HDFSBlocksDistribution hdfsBlocksDistribution =
@ -1116,6 +1125,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return memstoreSize.get();
}
public RegionServicesForStores getRegionServicesForStores() {
return regionServicesForStores;
}
@Override
public long getNumMutationsWithoutWAL() {
return numMutationsWithoutWAL.get();
@ -2035,7 +2048,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Should the store be flushed because it is old enough.
* <p>
* Every FlushPolicy should call this to determine whether a store is old enough to flush (except
* that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
* that you always flush all stores). Otherwise the method will always
* returns true which will make a lot of flush requests.
*/
boolean shouldFlushStore(Store store) {
@ -2477,6 +2490,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// If we get to here, the HStores have been written.
for(Store storeToFlush :storesToFlush) {
storeToFlush.finalizeFlush();
}
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
@ -2883,9 +2899,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long addedSize = doMiniBatchMutate(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
if (isFlushSize(newSize)) {
requestFlush();
}
requestFlushIfNeeded(newSize);
}
} finally {
closeRegionOperation(op);
@ -3762,6 +3776,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
if(memstoreTotalSize > this.getMemstoreFlushSize()) {
requestFlush();
}
}
private void requestFlush() {
if (this.rsServices == null) {
return;
@ -5170,7 +5190,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long c = count.decrementAndGet();
if (c <= 0) {
synchronized (lock) {
if (count.get() <= 0 ){
if (count.get() <= 0){
usable.set(false);
RowLockContext removed = lockedRows.remove(row);
assert removed == this: "we should never remove a different context";
@ -5978,7 +5998,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected boolean isStopRow(Cell currentRowCell) {
return currentRowCell == null
|| (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan);
|| (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow
.length) >= isScan);
}
@Override
@ -6860,8 +6881,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
processor.postProcess(this, walEdit, success);
} finally {
closeRegionOperation();
if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
requestFlush();
if (!mutations.isEmpty()) {
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
requestFlushIfNeeded(newSize);
}
}
}
@ -7290,7 +7312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Do a specific Get on passed <code>columnFamily</code> and column qualifiers.
* @param mutation Mutation we are doing this Get for.
* @param columnFamily Which column family on row (TODO: Go all Gets in one go)
* @param store Which column family on row (TODO: Go all Gets in one go)
* @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
* @return Return list of Cells found.
*/
@ -7340,7 +7362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@ -7365,6 +7387,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
+ ClassSize.TREEMAP // maxSeqIdInStores
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
;
@Override
@ -7847,4 +7870,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public long getMemstoreFlushSize() {
return this.memstoreFlushSize;
}
//// method for debugging tests
void throwException(String title, String regionName) {
StringBuffer buf = new StringBuffer();
buf.append(title + ", ");
buf.append(getRegionInfo().toString());
buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");
buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");
buf.append("stores: ");
for (Store s : getStores()) {
buf.append(s.getFamily().getNameAsString());
buf.append(" size: ");
buf.append(s.getMemStoreSize());
buf.append(" ");
}
buf.append("end-of-stores");
buf.append(", memstore size ");
buf.append(getMemstoreSize());
if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) {
throw new RuntimeException(buf.toString());
}
}
}

View File

@ -2448,6 +2448,10 @@ public class HStore implements Store {
}
}
@Override public void finalizeFlush() {
memstore.finalizeFlush();
}
private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");

View File

@ -144,4 +144,11 @@ public interface MemStore extends HeapSize {
* @return Total memory occupied by this MemStore.
*/
long size();
/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the wal with sequence number that is known only at the store level.
*/
void finalizeFlush();
}

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@ -49,11 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
/**
* Regions store data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more Regions.
@ -200,6 +199,9 @@ public interface Region extends ConfigurationObserver {
/** @return memstore size for this region, in bytes */
long getMemstoreSize();
/** @return store services for this region, to access services required by store level needs */
RegionServicesForStores getRegionServicesForStores();
/** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL();

View File

@ -0,0 +1,53 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Services a Store needs from a Region.
* RegionServicesForStores class is the interface through which memstore access services at the
* region level.
* For example, when using alternative memory formats or due to compaction the memstore needs to
* take occasional lock and update size counters at the region level.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RegionServicesForStores {
private final HRegion region;
public RegionServicesForStores(HRegion region) {
this.region = region;
}
public void blockUpdates() {
this.region.blockUpdates();
}
public void unblockUpdates() {
this.region.unblockUpdates();
}
public long addAndGetGlobalMemstoreSize(long size) {
return this.region.addAndGetGlobalMemstoreSize(size);
}
}

View File

@ -515,4 +515,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* Closes and archives the compacted files under this store
*/
void closeAndArchiveCompactedFiles() throws IOException;
/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the wal with sequence number that is known only at the store level.
*/
void finalizeFlush();
}

View File

@ -17,14 +17,12 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -54,7 +52,8 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
@ -206,6 +205,9 @@ public class TestIOFencing {
}
super.completeCompaction(compactedFiles);
}
@Override public void finalizeFlush() {
}
}
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();