HBASE-20781 Save recalculating families in a WALEdit batch of Cells

Pass the Set of families through to the WAL rather than recalculate
a Set already known.

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Michael Stack 2018-06-23 23:02:17 -07:00
parent c24a7c446f
commit c23e61f20d
6 changed files with 120 additions and 76 deletions

View File

@ -3048,13 +3048,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
public abstract Mutation getMutation(int index); public abstract Mutation getMutation(int index);
public abstract long getNonceGroup(int index); public abstract long getNonceGroup(int index);
public abstract long getNonce(int index); public abstract long getNonce(int index);
/** This method is potentially expensive and useful mostly for non-replay CP path. */
/**
* This method is potentially expensive and useful mostly for non-replay CP path.
*/
public abstract Mutation[] getMutationsForCoprocs(); public abstract Mutation[] getMutationsForCoprocs();
public abstract boolean isInReplay(); public abstract boolean isInReplay();
public abstract long getOrigLogSeqNum(); public abstract long getOrigLogSeqNum();
public abstract void startRegionOperation() throws IOException; public abstract void startRegionOperation() throws IOException;
public abstract void closeRegionOperation() throws IOException; public abstract void closeRegionOperation() throws IOException;
/** /**
@ -3073,8 +3082,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected abstract void checkAndPreparePut(final Put p) throws IOException; protected abstract void checkAndPreparePut(final Put p) throws IOException;
/** /**
* If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell
* count, tags and timestamp for all cells of all operations in a mini-batch. * count, tags and timestamp for all cells of all operations in a mini-batch.
*/ */
public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation>
miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException; miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException;
@ -3250,7 +3259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try { try {
// if atomic then get exclusive lock, else shared lock // if atomic then get exclusive lock, else shared lock
rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock); rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
} catch (TimeoutIOException|InterruptedIOException e) { } catch (TimeoutIOException | InterruptedIOException e) {
// NOTE: We will retry when other exceptions, but we should stop if we receive // NOTE: We will retry when other exceptions, but we should stop if we receive
// TimeoutIOException or InterruptedIOException as operation has timed out or // TimeoutIOException or InterruptedIOException as operation has timed out or
// interrupted respectively. // interrupted respectively.
@ -3307,6 +3316,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() { visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {
private Pair<NonceKey, WALEdit> curWALEditForNonce; private Pair<NonceKey, WALEdit> curWALEditForNonce;
@Override @Override
public boolean visit(int index) throws IOException { public boolean visit(int index) throws IOException {
Mutation m = getMutation(index); Mutation m = getMutation(index);
@ -3330,14 +3340,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
WALEdit walEdit = curWALEditForNonce.getSecond(); WALEdit walEdit = curWALEditForNonce.getSecond();
// Add WAL edits by CP // Add WAL edits from CPs.
WALEdit fromCP = walEditsFromCoprocessors[index]; WALEdit fromCP = walEditsFromCoprocessors[index];
if (fromCP != null) { if (fromCP != null) {
for (Cell cell : fromCP.getCells()) { for (Cell cell : fromCP.getCells()) {
walEdit.add(cell); walEdit.add(cell);
} }
} }
addFamilyMapToWALEdit(familyCellMaps[index], walEdit); walEdit.add(familyCellMaps[index]);
return true; return true;
} }
@ -3409,28 +3419,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting); region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);
} }
} }
/**
* Append the given map of family->edits to a WALEdit data structure.
* This does not write to the WAL itself.
* @param familyMap map of family->edits
* @param walEdit the destination entry to append into
*/
private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
WALEdit walEdit) {
for (List<Cell> edits : familyMap.values()) {
// Optimization: 'foreach' loop is not used. See:
// HBASE-12023 HRegion.applyFamilyMapToMemstore creates too many iterator objects
assert edits instanceof RandomAccess;
int listSize = edits.size();
for (int i=0; i < listSize; i++) {
Cell cell = edits.get(i);
walEdit.add(cell);
}
}
}
} }
/** /**
* Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most
* of the logic is same. * of the logic is same.

View File

@ -65,9 +65,10 @@ class FSWALEntry extends Entry {
this.txid = txid; this.txid = txid;
if (inMemstore) { if (inMemstore) {
// construct familyNames here to reduce the work of log sinker. // construct familyNames here to reduce the work of log sinker.
this.familyNames = collectFamilies(edit.getCells()); Set<byte []> families = edit.getFamilies();
this.familyNames = families != null? families: collectFamilies(edit.getCells());
} else { } else {
this.familyNames = Collections.<byte[]> emptySet(); this.familyNames = Collections.<byte[]>emptySet();
} }
} }

View File

@ -20,6 +20,10 @@ package org.apache.hadoop.hbase.wal;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
@ -43,12 +47,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
/** /**
* WALEdit: Used in HBase's transaction log (WAL) to represent * Used in HBase's transaction log (WAL) to represent a collection of edits (Cell/KeyValue objects)
* the collection of edits (KeyValue objects) corresponding to a * that came in as a single transaction. All the edits for a given transaction are written out as a
* single transaction. * single record, in PB format, followed (optionally) by Cells written via the WALCellEncoder.
* * <p>This class is LimitedPrivate for CPs to read-only. The {@link #add} methods are
* All the edits for a given transaction are written out as a single record, in PB format followed * classified as private methods, not for use by CPs.</p>
* by Cells written via the WALCellEncoder. * <p>WALEdit will accumulate a Set of all column family names referenced by the Cells
* {@link #add(Cell)}'d. This is an optimization. Usually when loading a WALEdit, we have the
* column family name to-hand.. just shove it into the WALEdit if available. Doing this, we can
* save on a parse of each Cell to figure column family down the line when we go to add the
* WALEdit to the WAL file. See the hand-off in FSWALEntry Constructor.
*/ */
// TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle. // TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle.
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION, @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
@ -69,29 +77,62 @@ public class WALEdit implements HeapSize {
@VisibleForTesting @VisibleForTesting
public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD"); public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
private final boolean isReplay; private final boolean replay;
private ArrayList<Cell> cells = null; private ArrayList<Cell> cells = null;
/**
* All the Cell families in <code>cells</code>. Updated by {@link #add(Cell)} and
* {@link #add(Map)}. This Set is passed to the FSWALEntry so it does not have
* to recalculate the Set of families in a transaction; makes for a bunch of CPU savings.
* An optimization that saves on CPU-expensive Cell-parsing.
*/
private Set<byte []> families = null;
public WALEdit() { public WALEdit() {
this(false); this(false);
} }
/**
* @deprecated Since 2.0.1. Use {@link #WALEdit(int, boolean)} instead.
*/
@Deprecated
public WALEdit(boolean isReplay) { public WALEdit(boolean isReplay) {
this(1, isReplay); this(1, isReplay);
} }
/**
* @deprecated Since 2.0.1. Use {@link #WALEdit(int, boolean)} instead.
*/
@Deprecated
public WALEdit(int cellCount) { public WALEdit(int cellCount) {
this(cellCount, false); this(cellCount, false);
} }
/**
* @param cellCount Pass so can pre-size the WALEdit. Optimization.
*/
public WALEdit(int cellCount, boolean isReplay) { public WALEdit(int cellCount, boolean isReplay) {
this.isReplay = isReplay; this.replay = isReplay;
cells = new ArrayList<>(cellCount); cells = new ArrayList<>(cellCount);
} }
private Set<byte[]> getOrCreateFamilies() {
if (this.families == null) {
this.families = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
}
return this.families;
}
/**
* For use by FSWALEntry ONLY. An optimization.
* @return All families in {@link #getCells()}; may be null.
*/
public Set<byte []> getFamilies() {
return this.families;
}
/** /**
* @param f
* @return True is <code>f</code> is {@link #METAFAMILY} * @return True is <code>f</code> is {@link #METAFAMILY}
*/ */
public static boolean isMetaEditFamily(final byte [] f) { public static boolean isMetaEditFamily(final byte [] f) {
@ -116,13 +157,20 @@ public class WALEdit implements HeapSize {
* replay. * replay.
*/ */
public boolean isReplay() { public boolean isReplay() {
return this.isReplay; return this.replay;
}
@InterfaceAudience.Private
public WALEdit add(Cell cell, byte [] family) {
getOrCreateFamilies().add(family);
return addCell(cell);
} }
@InterfaceAudience.Private @InterfaceAudience.Private
public WALEdit add(Cell cell) { public WALEdit add(Cell cell) {
this.cells.add(cell); // We clone Family each time we add a Cell. Expensive but safe. For CPU savings, use
return this; // add(Map) or add(Cell, family).
return add(cell, CellUtil.cloneFamily(cell));
} }
public boolean isEmpty() { public boolean isEmpty() {
@ -145,8 +193,10 @@ public class WALEdit implements HeapSize {
* @param cells the list of cells that this WALEdit now contains. * @param cells the list of cells that this WALEdit now contains.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
// Used by replay.
public void setCells(ArrayList<Cell> cells) { public void setCells(ArrayList<Cell> cells) {
this.cells = cells; this.cells = cells;
this.families = null;
} }
/** /**
@ -197,7 +247,7 @@ public class WALEdit implements HeapSize {
public static WALEdit createFlushWALEdit(RegionInfo hri, FlushDescriptor f) { public static WALEdit createFlushWALEdit(RegionInfo hri, FlushDescriptor f) {
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH, KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH,
EnvironmentEdgeManager.currentTime(), f.toByteArray()); EnvironmentEdgeManager.currentTime(), f.toByteArray());
return new WALEdit().add(kv); return new WALEdit().add(kv, METAFAMILY);
} }
public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException { public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
@ -211,7 +261,7 @@ public class WALEdit implements HeapSize {
RegionEventDescriptor regionEventDesc) { RegionEventDescriptor regionEventDesc) {
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, REGION_EVENT, KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, REGION_EVENT,
EnvironmentEdgeManager.currentTime(), regionEventDesc.toByteArray()); EnvironmentEdgeManager.currentTime(), regionEventDesc.toByteArray());
return new WALEdit().add(kv); return new WALEdit().add(kv, METAFAMILY);
} }
public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException { public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException {
@ -230,7 +280,7 @@ public class WALEdit implements HeapSize {
byte [] pbbytes = c.toByteArray(); byte [] pbbytes = c.toByteArray();
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION, KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
EnvironmentEdgeManager.currentTime(), pbbytes); EnvironmentEdgeManager.currentTime(), pbbytes);
return new WALEdit().add(kv); //replication scope null so that this won't be replicated return new WALEdit().add(kv, METAFAMILY); //replication scope null so this won't be replicated
} }
public static byte[] getRowForRegion(RegionInfo hri) { public static byte[] getRowForRegion(RegionInfo hri) {
@ -278,7 +328,7 @@ public class WALEdit implements HeapSize {
BULK_LOAD, BULK_LOAD,
EnvironmentEdgeManager.currentTime(), EnvironmentEdgeManager.currentTime(),
bulkLoadDescriptor.toByteArray()); bulkLoadDescriptor.toByteArray());
return new WALEdit().add(kv); return new WALEdit().add(kv, METAFAMILY);
} }
/** /**
@ -292,4 +342,34 @@ public class WALEdit implements HeapSize {
} }
return null; return null;
} }
/**
* Append the given map of family->edits to a WALEdit data structure.
* This does not write to the WAL itself.
* Note that as an optimization, we will stamp the Set of column families into the WALEdit
* to save on our having to calculate it subsequently way down in the actual WAL writing.
*
* @param familyMap map of family->edits
*/
public void add(Map<byte[], List<Cell>> familyMap) {
for (Map.Entry<byte [], List<Cell>> e: familyMap.entrySet()) {
// 'foreach' loop NOT used. See HBASE-12023 "...creates too many iterator objects."
int listSize = e.getValue().size();
// Add all Cells first and then at end, add the family rather than call {@link #add(Cell)}
// and have it clone family each time. Optimization!
for (int i = 0; i < listSize; i++) {
addCell(e.getValue().get(i));
}
addFamily(e.getKey());
}
}
private void addFamily(byte [] family) {
getOrCreateFamilies().add(family);
}
private WALEdit addCell(Cell cell) {
this.cells.add(cell);
return this;
}
} }

View File

@ -211,7 +211,7 @@ public class TestWALObserver {
Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap(); Map<byte[], List<Cell>> familyMap = p.getFamilyCellMap();
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
addFamilyMapToWALEdit(familyMap, edit); edit.add(familyMap);
boolean foundFamily0 = false; boolean foundFamily0 = false;
boolean foundFamily2 = false; boolean foundFamily2 = false;
@ -432,24 +432,6 @@ public class TestWALObserver {
return p; return p;
} }
/**
* Copied from HRegion.
*
* @param familyMap
* map of family->edits
* @param walEdit
* the destination entry to append into
*/
private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
WALEdit walEdit) {
for (List<Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
// KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO.
walEdit.add(cell);
}
}
}
private Path runWALSplit(final Configuration c) throws IOException { private Path runWALSplit(final Configuration c) throws IOException {
List<Path> splits = WALSplitter.split( List<Path> splits = WALSplitter.split(
hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals);

View File

@ -232,7 +232,8 @@ public class TestFSHLogProvider {
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames()); log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter(); log.rollWriter();
assertEquals(2, AbstractFSWALProvider.getNumRolledLogFiles(log)); int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
assertEquals(2, count);
// Flush the second region, which removes all the remaining output files // Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain // since the oldest was completely flushed and the two others only contain

View File

@ -28,7 +28,6 @@ import com.codahale.metrics.MetricRegistry;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Random; import java.util.Random;
@ -41,7 +40,6 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -182,7 +180,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
long now = System.nanoTime(); long now = System.nanoTime();
Put put = setupPut(rand, key, value, numFamilies); Put put = setupPut(rand, key, value, numFamilies);
WALEdit walEdit = new WALEdit(); WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); walEdit.add(put.getFamilyCellMap());
RegionInfo hri = region.getRegionInfo(); RegionInfo hri = region.getRegionInfo();
final WALKeyImpl logkey = final WALKeyImpl logkey =
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
@ -562,15 +560,6 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
return put; return put;
} }
private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
WALEdit walEdit) {
for (List<Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
walEdit.add(cell);
}
}
}
private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException { private long runBenchmark(Runnable[] runnable, final int numThreads) throws InterruptedException {
Thread[] threads = new Thread[numThreads]; Thread[] threads = new Thread[numThreads];
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();