HBASE-11202 Cleanup on HRegion class
This commit is contained in:
parent
220037c465
commit
7ee058bc20
|
@ -217,7 +217,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
|
||||
* file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
|
||||
* Its default value is {@link HLog.NO_SEQUENCE_ID}. This default is used as a marker to indicate
|
||||
* Its default value is -1L. This default is used as a marker to indicate
|
||||
* that the region hasn't opened yet. Once it is opened, it is set to the derived
|
||||
* {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
|
||||
*
|
||||
|
@ -481,7 +481,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
private volatile long lastFlushTime;
|
||||
final RegionServerServices rsServices;
|
||||
private RegionServerAccounting rsAccounting;
|
||||
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
|
||||
private long flushCheckInterval;
|
||||
// flushPerChanges is to prevent too many changes in memstore
|
||||
private long flushPerChanges;
|
||||
|
@ -910,7 +909,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Increase the size of mem store in this region and the size of global mem
|
||||
* store
|
||||
* @param memStoreSize
|
||||
* @return the size of memstore in this region
|
||||
*/
|
||||
public long addAndGetGlobalMemstoreSize(long memStoreSize) {
|
||||
|
@ -961,7 +959,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Reset recovering state of current region
|
||||
* @param newState
|
||||
*/
|
||||
public void setRecovering(boolean newState) {
|
||||
boolean wasRecovering = this.isRecovering;
|
||||
|
@ -1182,7 +1179,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// close each store in parallel
|
||||
for (final Store store : stores.values()) {
|
||||
assert abort? true: store.getFlushableSize() == 0;
|
||||
assert abort || store.getFlushableSize() == 0;
|
||||
completionService
|
||||
.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
|
||||
@Override
|
||||
|
@ -1646,7 +1643,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* than the last edit applied to this region. The returned id does not refer to an actual edit.
|
||||
* The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
|
||||
* that was the result of this flush, etc.
|
||||
* @param status
|
||||
* @return object describing the flush's state
|
||||
*
|
||||
* @throws IOException general io exceptions
|
||||
|
@ -1661,7 +1657,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* @param wal Null if we're NOT to go via hlog/wal.
|
||||
* @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
|
||||
* @param status
|
||||
* @return object describing the flush's state
|
||||
* @throws IOException
|
||||
* @see #internalFlushcache(MonitoredTask)
|
||||
|
@ -1671,7 +1666,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
throws IOException {
|
||||
if (this.rsServices != null && this.rsServices.isAborted()) {
|
||||
// Don't flush when server aborting, it's unsafe
|
||||
throw new IOException("Aborting flush because server is abortted...");
|
||||
throw new IOException("Aborting flush because server is aborted...");
|
||||
}
|
||||
final long startTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||
// If nothing to flush, return, but we need to safely update the region sequence id
|
||||
|
@ -1753,7 +1748,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
this.updatesLock.writeLock().unlock();
|
||||
}
|
||||
String s = "Finished memstore snapshotting " + this +
|
||||
", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
|
||||
", syncing WAL and waiting on mvcc, flushSize=" + totalFlushableSize;
|
||||
status.setStatus(s);
|
||||
if (LOG.isTraceEnabled()) LOG.trace(s);
|
||||
|
||||
|
@ -1844,7 +1839,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
((wal == null)? "; wal=null": "");
|
||||
LOG.info(msg);
|
||||
status.setStatus(msg);
|
||||
this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
|
||||
|
||||
return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
|
||||
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
|
||||
|
@ -1852,14 +1846,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Method to safely get the next sequence number.
|
||||
* @param wal
|
||||
* @param now
|
||||
* @return Next sequence number unassociated with any actual edit.
|
||||
* @throws IOException
|
||||
*/
|
||||
private long getNextSequenceId(final HLog wal, final long now) throws IOException {
|
||||
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
|
||||
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated
|
||||
// Call append but with an empty WALEdit. The returned sequence id will not be associated
|
||||
// with any edit and we can be sure it went in after all outstanding appends.
|
||||
wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
|
||||
WALEdit.EMPTY_WALEDIT, this.sequenceId, false);
|
||||
|
@ -1885,7 +1877,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Return all the data for the row that matches <i>row</i> exactly,
|
||||
* or the one that immediately preceeds it, at or immediately before
|
||||
* or the one that immediately precedes it, at or immediately before
|
||||
* <i>ts</i>.
|
||||
*
|
||||
* @param row row key
|
||||
|
@ -1912,7 +1904,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
KeyValue key = store.getRowKeyAtOrBefore(row);
|
||||
Result result = null;
|
||||
if (key != null) {
|
||||
Get get = new Get(key.getRow());
|
||||
Get get = new Get(CellUtil.cloneRow(key));
|
||||
get.addFamily(family);
|
||||
result = get(get);
|
||||
}
|
||||
|
@ -2025,7 +2017,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* This is used only by unit tests. Not required to be a public API.
|
||||
* @param familyMap map of family to edits for the given family.
|
||||
* @param durability
|
||||
* @throws IOException
|
||||
*/
|
||||
void delete(NavigableMap<byte[], List<Cell>> familyMap,
|
||||
|
@ -2039,8 +2030,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Setup correct timestamps in the KVs in Delete object.
|
||||
* Caller should have the row and region locks.
|
||||
* @param familyMap
|
||||
* @param byteNow
|
||||
* @throws IOException
|
||||
*/
|
||||
void prepareDeleteTimestamps(Map<byte[], List<Cell>> familyMap, byte[] byteNow)
|
||||
|
@ -2056,7 +2045,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Check if time is LATEST, change to time of most recent addition if so
|
||||
// This is expensive.
|
||||
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
|
||||
byte[] qual = kv.getQualifier();
|
||||
byte[] qual = CellUtil.cloneQualifier(kv);
|
||||
if (qual == null) qual = HConstants.EMPTY_BYTE_ARRAY;
|
||||
|
||||
Integer count = kvCount.get(qual);
|
||||
|
@ -2067,7 +2056,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
count = kvCount.get(qual);
|
||||
|
||||
Get get = new Get(kv.getRow());
|
||||
Get get = new Get(CellUtil.cloneRow(kv));
|
||||
get.setMaxVersions(count);
|
||||
get.addColumn(family, qual);
|
||||
|
||||
|
@ -2092,7 +2081,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
/**
|
||||
* @param put
|
||||
* @throws IOException
|
||||
*/
|
||||
public void put(Put put)
|
||||
|
@ -2694,13 +2682,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
//get, put, unlockRow or something
|
||||
/**
|
||||
*
|
||||
* @param row
|
||||
* @param family
|
||||
* @param qualifier
|
||||
* @param compareOp
|
||||
* @param comparator
|
||||
* @param w
|
||||
* @param writeToWAL
|
||||
* @throws IOException
|
||||
* @return true if the new put was executed, false otherwise
|
||||
*/
|
||||
|
@ -2774,7 +2755,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (matches) {
|
||||
// All edits for the given row (across all column families) must
|
||||
// happen atomically.
|
||||
doBatchMutate((Mutation)w);
|
||||
doBatchMutate(w);
|
||||
this.checkAndMutateChecksPassed.increment();
|
||||
return true;
|
||||
}
|
||||
|
@ -2807,8 +2788,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* arg. (In the future other cancellable HRegion methods could eventually add a
|
||||
* {@link ForeignExceptionSnare}, or we could do something fancier).
|
||||
*
|
||||
* @param desc snasphot description object
|
||||
* @param exnSnare ForeignExceptionSnare that captures external exeptions in case we need to
|
||||
* @param desc snapshot description object
|
||||
* @param exnSnare ForeignExceptionSnare that captures external exceptions in case we need to
|
||||
* bail out. This is allowed to be null and will just be ignored in that case.
|
||||
* @throws IOException if there is an external or internal error causing the snapshot to fail
|
||||
*/
|
||||
|
@ -2871,9 +2852,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Add updates first to the hlog and then add values to memstore.
|
||||
* Warning: Assumption is caller has lock on passed in row.
|
||||
* @param family
|
||||
* @param edits Cell updates by column
|
||||
* @praram now
|
||||
* @throws IOException
|
||||
*/
|
||||
private void put(final byte [] row, byte [] family, List<Cell> edits)
|
||||
|
@ -2932,7 +2911,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Remove all the keys listed in the map from the memstore. This method is
|
||||
* called when a Put/Delete has updated memstore but subequently fails to update
|
||||
* called when a Put/Delete has updated memstore but subsequently fails to update
|
||||
* the wal. This method is then invoked to rollback the memstore.
|
||||
*/
|
||||
private void rollbackMemstore(BatchOperationInProgress<?> batchOp,
|
||||
|
@ -2954,7 +2933,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// Remove those keys from the memstore that matches our
|
||||
// key's (row, cf, cq, timestamp, memstoreTS). The interesting part is
|
||||
// that even the memstoreTS has to match for keys that will be rolleded-back.
|
||||
// that even the memstoreTS has to match for keys that will be rolled-back.
|
||||
Store store = getStore(family);
|
||||
for (Cell cell: cells) {
|
||||
store.rollback(KeyValueUtil.ensureKeyValue(cell));
|
||||
|
@ -3086,10 +3065,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* next time we go to recover. So, we have to flush inline, using seqids that
|
||||
* make sense in a this single region context only -- until we online.
|
||||
*
|
||||
* @param regiondir
|
||||
* @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
|
||||
* the maxSeqId for the store to be applied, else its skipped.
|
||||
* @param reporter
|
||||
* @return the sequence id of the last edit added to this region out of the
|
||||
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
|
||||
* @throws UnsupportedEncodingException
|
||||
|
@ -3283,7 +3260,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
// Figure which store the edit is meant for.
|
||||
if (store == null || !CellUtil.matchingFamily(kv, store.getFamily().getName())) {
|
||||
store = this.stores.get(kv.getFamily());
|
||||
store = getStore(kv);
|
||||
}
|
||||
if (store == null) {
|
||||
// This should never happen. Perhaps schema was changed between
|
||||
|
@ -3313,7 +3290,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} catch (EOFException eof) {
|
||||
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
|
||||
msg = "Encountered EOF. Most likely due to Master failure during " +
|
||||
"log spliting, so we have this data in another edit. " +
|
||||
"log splitting, so we have this data in another edit. " +
|
||||
"Continuing, but renaming " + edits + " as " + p;
|
||||
LOG.warn(msg, eof);
|
||||
status.abort(msg);
|
||||
|
@ -3337,8 +3314,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
reporter.progress();
|
||||
}
|
||||
msg = "Applied " + editsCount + ", skipped " + skippedEdits +
|
||||
", firstSequenceidInLog=" + firstSeqIdInLog +
|
||||
", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
|
||||
", firstSequenceIdInLog=" + firstSeqIdInLog +
|
||||
", maxSequenceIdInLog=" + currentEditSeqId + ", path=" + edits;
|
||||
status.markComplete(msg);
|
||||
LOG.debug(msg);
|
||||
return currentEditSeqId;
|
||||
|
@ -3354,7 +3331,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Call to complete a compaction. Its for the case where we find in the WAL a compaction
|
||||
* that was not finished. We could find one recovering a WAL after a regionserver crash.
|
||||
* See HBASE-2331.
|
||||
* @param compaction
|
||||
*/
|
||||
void completeCompactionMarker(CompactionDescriptor compaction)
|
||||
throws IOException {
|
||||
|
@ -3411,6 +3387,22 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return this.stores.get(column);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return HStore instance. Does not do any copy: as the number of store is limited, we
|
||||
* iterate on the list.
|
||||
*/
|
||||
private Store getStore(Cell cell) {
|
||||
for (Map.Entry<byte[], Store> famStore : stores.entrySet()) {
|
||||
if (Bytes.equals(
|
||||
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
|
||||
famStore.getKey(), 0, famStore.getKey().length)) {
|
||||
return famStore.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public Map<byte[], Store> getStores() {
|
||||
return this.stores;
|
||||
}
|
||||
|
@ -3506,7 +3498,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
/**
|
||||
* Acqures a lock on the given row.
|
||||
* Acquires a lock on the given row.
|
||||
* The same thread may acquire multiple locks on the same row.
|
||||
* @return the acquired row lock
|
||||
* @throws IOException if the lock could not be acquired after waiting
|
||||
|
@ -3562,7 +3554,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
|
||||
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
|
||||
* file about to be bulk loaded
|
||||
* @param assignSeqId
|
||||
* @return true if successful, false if failed recoverably
|
||||
* @throws IOException if failed unrecoverably.
|
||||
*/
|
||||
|
@ -3574,8 +3565,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
try {
|
||||
this.writeRequestsCount.increment();
|
||||
|
||||
// There possibly was a split that happend between when the split keys
|
||||
// were gathered and before the HReiogn's write lock was taken. We need
|
||||
// There possibly was a split that happened between when the split keys
|
||||
// were gathered and before the HRegion's write lock was taken. We need
|
||||
// to validate the HFile region before attempting to bulk load all of them
|
||||
List<IOException> ioes = new ArrayList<IOException>();
|
||||
List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[], String>>();
|
||||
|
@ -3772,10 +3763,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
initializeKVHeap(scanners, joinedScanners, region);
|
||||
}
|
||||
|
||||
RegionScannerImpl(Scan scan, HRegion region) throws IOException {
|
||||
this(scan, null, region);
|
||||
}
|
||||
|
||||
protected void initializeKVHeap(List<KeyValueScanner> scanners,
|
||||
List<KeyValueScanner> joinedScanners, HRegion region)
|
||||
throws IOException {
|
||||
|
@ -3881,7 +3868,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Fetches records with currentRow into results list, until next row or limit (if not -1).
|
||||
* @param results
|
||||
* @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call.
|
||||
* @param limit Max amount of KVs to place in result list, -1 means no limit.
|
||||
* @param currentRow Byte array with key we are fetching.
|
||||
|
@ -4100,7 +4086,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
joinedHeap.close();
|
||||
joinedHeap = null;
|
||||
}
|
||||
// no need to sychronize here.
|
||||
// no need to synchronize here.
|
||||
scannerReadPoints.remove(this);
|
||||
this.filterClosed = true;
|
||||
}
|
||||
|
@ -4148,7 +4134,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param regionInfo - HRegionInfo that describes the region
|
||||
* is new), then read them from the supplied path.
|
||||
* @param htd the table descriptor
|
||||
* @param rsServices
|
||||
* @return the new instance
|
||||
*/
|
||||
static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
|
||||
|
@ -4183,8 +4168,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* necessary cleanup for you.
|
||||
* @param info Info for region to create.
|
||||
* @param rootDir Root directory for HBase instance
|
||||
* @param conf
|
||||
* @param hTableDescriptor
|
||||
* @return new HRegion
|
||||
*
|
||||
* @throws IOException
|
||||
|
@ -4203,7 +4186,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* the one that takes an {@link HLog} instance but don't be surprised by the
|
||||
* call to the {@link HLog#closeAndDelete()} on the {@link HLog} the
|
||||
* HRegion was carrying.
|
||||
* @param r
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void closeHRegion(final HRegion r) throws IOException {
|
||||
|
@ -4220,8 +4202,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*
|
||||
* @param info Info for region to create.
|
||||
* @param rootDir Root directory for HBase instance
|
||||
* @param conf
|
||||
* @param hTableDescriptor
|
||||
* @param hlog shared HLog
|
||||
* @param initialize - true to initialize the region
|
||||
* @return new HRegion
|
||||
|
@ -4246,8 +4226,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*
|
||||
* @param info Info for region to create.
|
||||
* @param rootDir Root directory for HBase instance
|
||||
* @param conf
|
||||
* @param hTableDescriptor
|
||||
* @param hlog shared HLog
|
||||
* @param initialize - true to initialize the region
|
||||
* @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
|
||||
|
@ -4273,8 +4251,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param info Info for region to create.
|
||||
* @param rootDir Root directory for HBase instance
|
||||
* @param tableDir table directory
|
||||
* @param conf
|
||||
* @param hTableDescriptor
|
||||
* @param hlog shared HLog
|
||||
* @param initialize - true to initialize the region
|
||||
* @param ignoreHLog - true to skip generate new hlog if it is null, mostly for createTable
|
||||
|
@ -4302,7 +4278,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (initialize) {
|
||||
// If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
|
||||
// verifying the WALEdits.
|
||||
region.setSequenceId(region.initialize());
|
||||
region.setSequenceId(region.initialize(null));
|
||||
}
|
||||
return region;
|
||||
}
|
||||
|
@ -4323,7 +4299,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* HLog#setSequenceNumber(long) passing the result of the call to
|
||||
* HRegion#getMinSequenceId() to ensure the log id is properly kept
|
||||
* up. HRegionStore does this every time it opens a new region.
|
||||
* @param conf
|
||||
* @return new HRegion
|
||||
*
|
||||
* @throws IOException
|
||||
|
@ -4497,8 +4472,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Open HRegion.
|
||||
* Calls initialize and sets sequenceid.
|
||||
* @param reporter
|
||||
* Calls initialize and sets sequenceId.
|
||||
* @return Returns <code>this</code>
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -4537,9 +4511,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Create a merged region given a temp directory with the region data.
|
||||
* @param mergedRegionInfo
|
||||
* @param region_b another merging region
|
||||
* @return merged hregion
|
||||
* @return merged HRegion
|
||||
* @throws IOException
|
||||
*/
|
||||
HRegion createMergedRegionFromMerges(final HRegionInfo mergedRegionInfo,
|
||||
|
@ -4625,8 +4598,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Merge two HRegions. The regions must be adjacent and must not overlap.
|
||||
*
|
||||
* @param srcA
|
||||
* @param srcB
|
||||
* @return new merged HRegion
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -4701,7 +4672,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} catch (IOException ioe) {
|
||||
rmt.rollback(null, null);
|
||||
throw new IOException("Failed merging region " + a + " and " + b
|
||||
+ ", and succssfully rolled back");
|
||||
+ ", and successfully rolled back");
|
||||
}
|
||||
dstRegion.compactStores(true);
|
||||
|
||||
|
@ -4784,11 +4755,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// do after lock
|
||||
if (this.metricsRegion != null) {
|
||||
long totalSize = 0l;
|
||||
if (results != null) {
|
||||
for (Cell kv : results) {
|
||||
totalSize += KeyValueUtil.ensureKeyValue(kv).getLength();
|
||||
}
|
||||
}
|
||||
this.metricsRegion.updateGet(totalSize);
|
||||
}
|
||||
|
||||
|
@ -4881,9 +4850,9 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
MultiVersionConsistencyControl.WriteEntry writeEntry = null;
|
||||
boolean locked = false;
|
||||
boolean locked;
|
||||
boolean walSyncSuccessful = false;
|
||||
List<RowLock> acquiredRowLocks = null;
|
||||
List<RowLock> acquiredRowLocks;
|
||||
long addedSize = 0;
|
||||
List<KeyValue> mutations = new ArrayList<KeyValue>();
|
||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||
|
@ -4911,9 +4880,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// 6. Apply to memstore
|
||||
for (KeyValue kv : mutations) {
|
||||
kv.setMvccVersion(writeEntry.getWriteNumber());
|
||||
byte[] family = kv.getFamily();
|
||||
checkFamily(family);
|
||||
addedSize += stores.get(family).add(kv);
|
||||
Store store = getStore(kv);
|
||||
if (store == null) {
|
||||
checkFamily(CellUtil.cloneFamily(kv));
|
||||
// unreachable
|
||||
}
|
||||
addedSize += store.add(kv);
|
||||
}
|
||||
|
||||
long txid = 0;
|
||||
|
@ -4943,20 +4915,18 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} finally {
|
||||
if (!mutations.isEmpty() && !walSyncSuccessful) {
|
||||
LOG.warn("Wal sync failed. Roll back " + mutations.size() +
|
||||
" memstore keyvalues for row(s):" +
|
||||
processor.getRowsToLock().iterator().next() + "...");
|
||||
" memstore keyvalues for row(s):" + StringUtils.byteToHexString(
|
||||
processor.getRowsToLock().iterator().next()) + "...");
|
||||
for (KeyValue kv : mutations) {
|
||||
stores.get(kv.getFamily()).rollback(kv);
|
||||
getStore(kv).rollback(kv);
|
||||
}
|
||||
}
|
||||
// 11. Roll mvcc forward
|
||||
if (writeEntry != null) {
|
||||
mvcc.completeMemstoreInsert(writeEntry);
|
||||
writeEntry = null;
|
||||
}
|
||||
if (locked) {
|
||||
this.updatesLock.readLock().unlock();
|
||||
locked = false;
|
||||
}
|
||||
// release locks if some were acquired but another timed out
|
||||
releaseRowLocks(acquiredRowLocks);
|
||||
|
@ -5035,7 +5005,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Perform one or more append operations on a row.
|
||||
*
|
||||
* @param append
|
||||
* @return new keyvalues after increment
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -5085,8 +5054,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Get previous values for all columns in this family
|
||||
Get get = new Get(row);
|
||||
for (Cell cell : family.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
get.addColumn(family.getKey(), kv.getQualifier());
|
||||
get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
|
||||
}
|
||||
List<Cell> results = get(get, false);
|
||||
|
||||
|
@ -5142,7 +5110,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Give coprocessors a chance to update the new cell
|
||||
if (coprocessorHost != null) {
|
||||
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
|
||||
RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
|
||||
RegionObserver.MutationType.APPEND, append, oldKv, newKV));
|
||||
}
|
||||
kvs.add(newKV);
|
||||
|
||||
|
@ -5161,7 +5129,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
// Actually write to WAL now
|
||||
if (writeToWAL) {
|
||||
// Using default cluster id, as this can only happen in the orginating
|
||||
// Using default cluster id, as this can only happen in the originating
|
||||
// cluster. A slave cluster receives the final value (not the delta)
|
||||
// as a Put.
|
||||
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
|
||||
|
@ -5225,7 +5193,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
/**
|
||||
* Perform one or more increment operations on a row.
|
||||
* @param increment
|
||||
* @return new keyvalues after increment
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -5276,8 +5243,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Get previous values for all columns in this family
|
||||
Get get = new Get(row);
|
||||
for (Cell cell: family.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
get.addColumn(family.getKey(), kv.getQualifier());
|
||||
get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell));
|
||||
}
|
||||
get.setTimeRange(tr.getMin(), tr.getMax());
|
||||
List<Cell> results = get(get, false);
|
||||
|
@ -5328,7 +5294,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Give coprocessors a chance to update the new cell
|
||||
if (coprocessorHost != null) {
|
||||
newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
|
||||
RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
|
||||
RegionObserver.MutationType.INCREMENT, increment, c, newKV));
|
||||
}
|
||||
allKVs.add(newKV);
|
||||
|
||||
|
@ -5354,7 +5320,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// Actually write to WAL now
|
||||
if (walEdits != null && !walEdits.isEmpty()) {
|
||||
if (writeToWAL) {
|
||||
// Using default cluster id, as this can only happen in the orginating
|
||||
// Using default cluster id, as this can only happen in the originating
|
||||
// cluster. A slave cluster receives the final value (not the delta)
|
||||
// as a Put.
|
||||
HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
|
||||
|
@ -5427,7 +5393,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
public static final long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT +
|
||||
ClassSize.ARRAY +
|
||||
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||
40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
|
||||
(12 * Bytes.SIZEOF_LONG) +
|
||||
4 * Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
|
@ -5449,7 +5415,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
WriteState.HEAP_SIZE + // writestate
|
||||
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
|
||||
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
|
||||
ClassSize.ARRAYLIST + // recentFlushes
|
||||
MultiVersionConsistencyControl.FIXED_SIZE // mvcc
|
||||
+ ClassSize.TREEMAP // maxSeqIdInStores
|
||||
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
|
||||
|
@ -5471,7 +5436,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
private static void printUsageAndExit(final String message) {
|
||||
if (message != null && message.length() > 0) System.out.println(message);
|
||||
System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]");
|
||||
System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]");
|
||||
System.out.println("Options:");
|
||||
System.out.println(" major_compact Pass this option to major compact " +
|
||||
"passed region.");
|
||||
|
@ -5521,7 +5486,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* {@link HRegion#registerService(com.google.protobuf.Service)}
|
||||
* method before they are available.
|
||||
*
|
||||
* @param controller an {@code RpcContoller} implementation to pass to the invoked service
|
||||
* @param controller an {@code RpcController} implementation to pass to the invoked service
|
||||
* @param call a {@code CoprocessorServiceCall} instance identifying the service, method,
|
||||
* and parameters for the method invocation
|
||||
* @return a protocol buffer {@code Message} instance containing the method's result
|
||||
|
@ -5576,18 +5541,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/*
|
||||
* Process table.
|
||||
* Do major compaction or list content.
|
||||
* @param fs
|
||||
* @param p
|
||||
* @param log
|
||||
* @param c
|
||||
* @param majorCompact
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void processTable(final FileSystem fs, final Path p,
|
||||
final HLog log, final Configuration c,
|
||||
final boolean majorCompact)
|
||||
throws IOException {
|
||||
HRegion region = null;
|
||||
HRegion region;
|
||||
// Currently expects tables have one region only.
|
||||
if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) {
|
||||
region = HRegion.newHRegion(p, log, fs, c,
|
||||
|
@ -5596,7 +5556,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
throw new IOException("Not a known catalog table: " + p.toString());
|
||||
}
|
||||
try {
|
||||
region.initialize();
|
||||
region.initialize(null);
|
||||
if (majorCompact) {
|
||||
region.compactStores(true);
|
||||
} else {
|
||||
|
@ -5630,7 +5590,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
|
||||
void forceSplit(byte[] sp) {
|
||||
// NOTE : this HRegion will go away after the forced split is successfull
|
||||
// NOTE : this HRegion will go away after the forced split is successful
|
||||
// therefore, no reason to clear this value
|
||||
this.splitRequest = true;
|
||||
if (sp != null) {
|
||||
|
@ -5699,19 +5659,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks every store to see if one has too many
|
||||
* store files
|
||||
* @return true if any store has too many store files
|
||||
*/
|
||||
public boolean needsCompaction() {
|
||||
for (Store store : stores.values()) {
|
||||
if(store.needsCompaction()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** @return the coprocessor host */
|
||||
public RegionCoprocessorHost getCoprocessorHost() {
|
||||
|
@ -5795,7 +5742,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
/**
|
||||
* Closes the lock. This needs to be called in the finally block corresponding
|
||||
* to the try block of {@link #startRegionOperation(Operation)}
|
||||
* @param operation
|
||||
* @throws IOException
|
||||
*/
|
||||
public void closeRegionOperation(Operation operation) throws IOException {
|
||||
|
@ -5962,7 +5908,6 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* <pre>
|
||||
* ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
|
||||
* </pre>
|
||||
* @param args
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
|
@ -6145,21 +6090,4 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Lock the updates' readLock first, so that we could safely append logs in coprocessors.
|
||||
* @throws RegionTooBusyException
|
||||
* @throws InterruptedIOException
|
||||
*/
|
||||
public void updatesLock() throws RegionTooBusyException, InterruptedIOException {
|
||||
lock(updatesLock.readLock());
|
||||
}
|
||||
|
||||
/**
|
||||
* Unlock the updates' readLock after appending logs in coprocessors.
|
||||
* @throws InterruptedIOException
|
||||
*/
|
||||
public void updatesUnlock() throws InterruptedIOException {
|
||||
updatesLock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue