diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
index 9b7107b2b86..d760aa20027 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellComparator.java
@@ -68,19 +68,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
 
     if (!ignoreSequenceid) {
       // Negate following comparisons so later edits show up first
-
-      // compare log replay tag value if there is any
-      // when either keyvalue tagged with log replay sequence number, we need to compare them:
-      // 1) when both keyvalues have the tag, then use the tag values for comparison
-      // 2) when one has and the other doesn't have, the one without the log
-      // replay tag wins because
-      // it means the edit isn't from recovery but new one coming from clients during recovery
-      // 3) when both doesn't have, then skip to the next mvcc comparison
-      long leftChangeSeqNum = getReplaySeqNum(a);
-      long RightChangeSeqNum = getReplaySeqNum(b);
-      if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
-        return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
-      }
       // mvccVersion: later sorts first
       return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
     } else {
@@ -88,22 +75,6 @@ public class CellComparator implements Comparator<Cell>, Serializable {
     }
   }
 
-  /**
-   * Return replay log sequence number for the cell
-   *
-   * @param c
-   * @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
-   */
-  private static long getReplaySeqNum(final Cell c) {
-    Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-
-    if (tag != null) {
-      return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
-    }
-    return Long.MAX_VALUE;
-  }
-
   public static int findCommonPrefixInRowPart(Cell left, Cell right, int rowCommonPrefix) {
     return findCommonPrefix(left.getRowArray(), right.getRowArray(), left.getRowLength()
         - rowCommonPrefix, right.getRowLength() - rowCommonPrefix, left.getRowOffset()
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 516fd811c7e..8566a8815cd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -746,6 +746,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
         c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    this.seqId = c.getSequenceId();
   }
 
   /**
@@ -961,8 +962,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     checkForTagsLength(tagsLength);
     // Allocate right-sized byte array.
     int keyLength = (int) getKeyDataStructureSize(rlength, flength, qlength);
-    byte [] bytes =
-        new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength, tagsLength)];
+    byte[] bytes = new byte[(int) getKeyValueDataStructureSize(rlength, flength, qlength, vlength,
+        tagsLength)];
     // Write key, value and key row length.
     int pos = 0;
     pos = Bytes.putInt(bytes, pos, keyLength);
@@ -2492,8 +2493,8 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
      * Compare two keys assuming that the first n bytes are the same.
      * @param commonPrefix How many bytes are the same.
      */
-    int compareIgnoringPrefix(
-      int commonPrefix, byte[] left, int loffset, int llength, byte[] right, int roffset, int rlength
+    int compareIgnoringPrefix(int commonPrefix, byte[] left, int loffset, int llength,
+        byte[] right, int roffset, int rlength
     );
   }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index b1135163ccf..f088dcf410f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -26,7 +26,7 @@ public final class TagType {
   // Please declare new Tag Types here to avoid step on pre-existing tag types.
   public static final byte ACL_TAG_TYPE = (byte) 1;
   public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
-  public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
+  // public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3; // deprecated
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
index 4f511f02c16..694ccff980c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java
@@ -773,8 +773,8 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
   public void setRecoveryMode(boolean isForInitialization) throws IOException {
     synchronized(this) {
       if (this.isDrainingDone) {
-        // when there is no outstanding splitlogtask after master start up, we already have up to date
-        // recovery mode
+        // when there is no outstanding splitlogtask after master start up, we already have up to
+        // date recovery mode
         return;
       }
     }
@@ -866,12 +866,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
     boolean dlr =
         conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
           HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-    int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
+      LOG.debug("Distributed log replay=" + dlr);
     }
-    // For distributed log replay, hfile version must be 3 at least; we need tag support.
-    return dlr && (version >= 3);
+    return dlr;
   }
 
   private boolean resubmit(ServerName serverName, String path, int version) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4a4d0044192..d5c07f07586 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2684,7 +2684,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, isInReplay);
       }
 
       // ------------------------------------
@@ -3189,12 +3189,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
    * @param output newly added KVs into memstore
+   * @param isInReplay true when adding replayed KVs into memstore
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells) throws IOException {
+    long mvccNum, List<Cell> memstoreCells, boolean isInReplay) throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3209,6 +3210,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         Pair<Long, Cell> ret = store.add(cell);
         size += ret.getFirst();
         memstoreCells.add(ret.getSecond());
+        if(isInReplay) {
+          // set memstore newly added cells with replay mvcc number
+          CellUtil.setSequenceId(ret.getSecond(), mvccNum);
+        }
       }
     }
 
@@ -3407,7 +3412,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
 
       }
       try {
-        // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
+        // replay the edits. Replay can return -1 if everything is skipped, only update
+        // if seqId is greater
         seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
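The HRegion hunks above are the heart of the change: instead of wrapping replayed cells in a LOG_REPLAY tag, the region now stamps the replay mvcc number directly onto each cell it adds to the memstore. A minimal sketch of that stamping against the public HBase 1.x API (KeyValue implements SettableSequenceId, so CellUtil.setSequenceId succeeds; the class name and values are illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReplaySeqIdStamp {
      public static void main(String[] args) throws IOException {
        // A cell as it would be rebuilt from a WAL entry during replay.
        Cell replayed = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), 100L, Bytes.toBytes("v"));

        // What applyFamilyMapToMemstore(..., isInReplay = true) now does for
        // each cell it lands in the memstore: the original WAL sequence number
        // rides on the cell itself instead of in a LOG_REPLAY tag.
        long replayMvccNum = 7L; // illustrative; inside HRegion this is mvccNum
        CellUtil.setSequenceId(replayed, replayMvccNum);

        System.out.println("seqId=" + replayed.getSequenceId()); // prints seqId=7
      }
    }
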
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index dd895e0feda..8821803d4d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1430,8 +1430,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           ? region.getCoprocessorHost()
           : null; // do not invoke coprocessors if this is a secondary region replica
       List<Pair<WALKey, WALEdit>> walEntries = new ArrayList<Pair<WALKey, WALEdit>>();
-      // when tag is enabled, we need tag replay edits with log sequence number
-      boolean needAddReplayTag = (HFile.getFormatVersion(regionServer.conf) >= 3);
 
       // Skip adding the edits to WAL if this is a secondary region replica
       boolean isPrimary = RegionReplicaUtil.isDefaultReplica(region.getRegionInfo());
@@ -1452,7 +1450,7 @@
         Pair<WALKey, WALEdit> walEntry = (coprocessorHost == null) ?
             null : new Pair<WALKey, WALEdit>();
         List<WALSplitter.MutationReplay> edits = WALSplitter.getMutationsFromWALEntry(entry,
-          cells, walEntry, needAddReplayTag, durability);
+          cells, walEntry, durability);
         if (coprocessorHost != null) {
           // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
           // KeyValue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index cc03e093af6..df760736593 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -92,8 +92,9 @@ public class DefaultCompactor extends Compactor {
         smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
         cleanSeqId = true;
       }
+
       writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-          fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+          true, fd.maxTagsLength > 0);
       boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
       if (!finished) {
         writer.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index a74bd3ddb3b..40b4af0f723 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -112,13 +112,13 @@ public class StripeCompactor extends Compactor {
       smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
       cleanSeqId = true;
     }
-    final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
+
     final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
     StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
       @Override
       public Writer createWriter() throws IOException {
         return store.createWriterInTmp(
-            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+            fd.maxKeyCount, compression, true, true, fd.maxTagsLength > 0);
       }
     };
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ef7b6ffd75d..3f381cc1230 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -2135,34 +2135,6 @@ public class WALSplitter {
     public final long nonce;
   }
 
-  /**
-   * Tag original sequence number for each edit to be replayed
-   * @param seqId
-   * @param cell
-   */
-  private static Cell tagReplayLogSequenceNumber(long seqId, Cell cell) {
-    // Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
-    boolean needAddRecoveryTag = true;
-    if (cell.getTagsLength() > 0) {
-      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
-        TagType.LOG_REPLAY_TAG_TYPE);
-      if (tmpTag != null) {
-        // found an existing log replay tag so reuse it
-        needAddRecoveryTag = false;
-      }
-    }
-    if (needAddRecoveryTag) {
-      List<Tag> newTags = new ArrayList<Tag>();
-      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(seqId));
-      newTags.add(replayTag);
-      if (cell.getTagsLength() > 0) {
-        newTags.addAll(Tag.asList(cell.getTagsArray(),
-          cell.getTagsOffset(), cell.getTagsLength()));
-      }
-      return new TagRewriteCell(cell, Tag.fromList(newTags));
-    }
-    return cell;
-  }
-
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey &
    * WALEdit from the passed in WALEntry
@@ -2170,12 +2142,11 @@
    * @param cells
    * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
    *          extracted from the passed in WALEntry.
-   * @param addLogReplayTag
    * @return list of Pair<MutationReplay, WALEdit> to be replayed
    * @throws IOException
    */
   public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
-      Pair<WALKey, WALEdit> logEntry, boolean addLogReplayTag, Durability durability)
+      Pair<WALKey, WALEdit> logEntry, Durability durability)
           throws IOException {
 
     if (entry == null) {
@@ -2223,11 +2194,7 @@
         if (CellUtil.isDelete(cell)) {
           ((Delete) m).addDeleteMarker(cell);
         } else {
-          Cell tmpNewCell = cell;
-          if (addLogReplayTag) {
-            tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
-          }
-          ((Put) m).add(tmpNewCell);
+          ((Put) m).add(cell);
         }
         m.setDurability(durability);
         previousCell = cell;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 19050d5bc8d..f37c1ebca7c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -1188,7 +1188,6 @@ public class TestDistributedLogSplitting {
     LOG.info("testSameVersionUpdatesRecovery");
     conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
-    conf.setInt("hfile.format.version", 3);
    startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1282,11 +1281,10 @@ public class TestDistributedLogSplitting {
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
     conf.setInt("hbase.hstore.compactionThreshold", 3);
-    conf.setInt("hfile.format.version", 3);
     startCluster(NUM_RS);
     final AtomicLong sequenceId = new AtomicLong(100);
     final int NUM_REGIONS_TO_CREATE = 40;
-    final int NUM_LOG_LINES = 1000;
+    final int NUM_LOG_LINES = 2000;
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
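
With the tag and its comparator hook gone, ordering between replayed and fresh edits rests entirely on the sequence-id fallback that CellComparator keeps (later edits sort first). A small demo of that contract, again a sketch against the public HBase 1.x API rather than part of the patch (KeyValue.COMPARATOR reaches the patched comparison when the key bytes tie; the values are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReplayOrderingDemo {
      public static void main(String[] args) throws IOException {
        byte[] row = Bytes.toBytes("r1");
        byte[] fam = Bytes.toBytes("f");
        byte[] qual = Bytes.toBytes("q");

        // Same coordinates and timestamp: one edit from a client during
        // recovery, one replayed from the WAL.
        KeyValue fresh = new KeyValue(row, fam, qual, 100L, Bytes.toBytes("new"));
        KeyValue replayed = new KeyValue(row, fam, qual, 100L, Bytes.toBytes("old"));

        CellUtil.setSequenceId(fresh, 42L);   // current write, higher seqId
        CellUtil.setSequenceId(replayed, 7L); // original seqId carried from the WAL

        // The key bytes tie, so the comparison falls through to the sequence
        // ids; the later (higher) one sorts first, with no tag inspection.
        int cmp = KeyValue.COMPARATOR.compare(fresh, replayed);
        System.out.println(cmp < 0 ? "fresh edit sorts first" : "replayed edit sorts first");
      }
    }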