diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 4a8b8adff79..65a34214619 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -133,10 +133,10 @@ public class TestWALRecordReader { long ts = System.currentTimeMillis(); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, getWalKey(ts, scopes), edit, true); + log.append(info, getWalKeyImpl(ts, scopes), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, getWalKey(ts+1, scopes), edit, true); + log.append(info, getWalKeyImpl(ts+1, scopes), edit, true); log.sync(); LOG.info("Before 1st WAL roll " + log.toString()); log.rollWriter(); @@ -147,10 +147,10 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, getWalKey(ts1+1, scopes), edit, true); + log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true); edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, getWalKey(ts1+2, scopes), edit, true); + log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true); log.sync(); log.shutdown(); walfactory.shutdown(); @@ -192,7 +192,7 @@ public class TestWALRecordReader { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); + long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); log.sync(txid); Thread.sleep(1); // make sure 2nd log gets a later timestamp @@ -202,7 +202,7 @@ public class TestWALRecordReader { edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true); + txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true); log.sync(txid); log.shutdown(); walfactory.shutdown(); @@ -241,8 +241,8 @@ public class TestWALRecordReader { testSplit(splits.get(1)); } - protected WALKey getWalKey(final long time, NavigableMap scopes) { - return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); + protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap scopes) { + return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes); } protected WALRecordReader getReader() { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 28bf2491d1f..143f585dbdd 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -76,7 +76,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.mapreduce.Job; import org.junit.Before; @@ -764,7 +764,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { .setEndKey(HConstants.EMPTY_END_ROW) .build(); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); - Replication.scopeWALEdits(new WALKey(), edit, + Replication.scopeWALEdits(new WALKeyImpl(), edit, htable1.getConfiguration(), null); } @@ -844,7 +844,7 @@ public class TestReplicationSmallTests extends TestReplicationBase { long now = EnvironmentEdgeManager.currentTime(); edit.add(new KeyValue(rowName, famName, qualifier, now, value)); - WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); + WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes); wal.append(hri, walKey, edit, true); wal.sync(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 6b5527bf2cc..7391f6f21e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -912,26 +912,16 @@ public interface RegionObserver { /** * Called before a {@link WALEdit} * replayed for this region. - * Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause - * damage. * @param ctx the environment provided by the region server - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced - * with something that doesn't expose IntefaceAudience.Private classes. */ - @Deprecated default void preWALRestore(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} /** * Called after a {@link WALEdit} * replayed for this region. - * Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause - * damage. * @param ctx the environment provided by the region server - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced - * with something that doesn't expose IntefaceAudience.Private classes. */ - @Deprecated default void postWALRestore(ObserverContext ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index af9690a6594..56cbf478527 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -108,15 +109,16 @@ public class ReplicationProtbufUtil { HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); for (Entry entry: entries) { entryBuilder.clear(); - // TODO: this duplicates a lot in WALKey#getBuilder + // TODO: this duplicates a lot in WALKeyImpl#getBuilder WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder(); - WALKey key = entry.getKey(); + WALKeyImpl key = entry.getKey(); keyBuilder.setEncodedRegionName( UnsafeByteOperations.unsafeWrap(encodedRegionName == null ? key.getEncodedRegionName() : encodedRegionName)); keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(key.getTablename().getName())); - keyBuilder.setLogSequenceNumber(key.getLogSeqNum()); + long sequenceId = key.getSequenceId(); + keyBuilder.setLogSequenceNumber(sequenceId); keyBuilder.setWriteTime(key.getWriteTime()); if (key.getNonce() != HConstants.NO_NONCE) { keyBuilder.setNonce(key.getNonce()); @@ -129,7 +131,7 @@ public class ReplicationProtbufUtil { uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); keyBuilder.addClusterIds(uuidBuilder.build()); } - if(key.getOrigLogSeqNum() > 0) { + if (key.getOrigLogSeqNum() > 0) { keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum()); } WALEdit edit = entry.getEdit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a8ea7242822..85c12e9433a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -191,6 +191,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.io.MultipleIOException; @@ -3343,7 +3344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOrigLogSeqNum() { - return WALKey.NO_SEQUENCE_ID; + return SequenceId.NO_SEQUENCE_ID; } @Override @@ -4504,16 +4505,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); + firstSeqIdInLog = key.getSequenceId(); } - if (currentEditSeqId > key.getLogSeqNum()) { + if (currentEditSeqId > key.getSequenceId()) { // when this condition is true, it means we have a serious defect because we need to // maintain increasing SeqId for WAL edits per region LOG.error(getRegionInfo().getEncodedName() + " : " + "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key + "; edit=" + val); } else { - currentEditSeqId = key.getLogSeqNum(); + currentEditSeqId = key.getSequenceId(); } currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ? key.getOrigLogSeqNum() : currentEditSeqId; @@ -4571,7 +4572,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } // Now, figure if we should skip this edit. - if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getColumnFamilyDescriptor() + if (key.getSequenceId() <= maxSeqIdInStores.get(store.getColumnFamilyDescriptor() .getName())) { skippedEdits++; continue; @@ -7550,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, long now, long nonceGroup, long nonce) throws IOException { return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce, - WALKey.NO_SEQUENCE_ID); + SequenceId.NO_SEQUENCE_ID); } /** @@ -7560,16 +7561,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException { Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), "WALEdit is null or empty!"); - Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID, + Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID, "Invalid replay sequence Id for replay WALEdit!"); // Using default cluster id, as this can only happen in the originating cluster. // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey - // here instead of WALKey directly to support legacy coprocessors. - WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, - nonce, mvcc) : - new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, + // here instead of WALKeyImpl directly to support legacy coprocessors. + WALKeyImpl walKey = walEdit.isReplay()? + new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds, + nonceGroup, nonce, mvcc) : + new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, this.getReplicationScope()); if (walEdit.isReplay()) { walKey.setOrigLogSeqNum(origLogSeqNum); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java index 7bfda6a09a7..235644edaca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SequenceId.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -27,5 +26,12 @@ import org.apache.yetus.audience.InterfaceAudience; */ @InterfaceAudience.Private public interface SequenceId { - public long getSequenceId() throws IOException; + /** + * Used to represent when a particular wal key doesn't know/care about the sequence ordering. + */ + long NO_SEQUENCE_ID = -1; + + default long getSequenceId() { + return NO_SEQUENCE_ID; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 534315e47ea..246221b8c45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.WriterBase; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; @@ -954,7 +955,7 @@ public abstract class AbstractFSWAL implements WAL { } } - protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKey key, + protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore, RingBuffer ringBuffer) throws IOException { if (this.closed) { @@ -1017,7 +1018,7 @@ public abstract class AbstractFSWAL implements WAL { * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. */ @Override - public abstract long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) + public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 18007aa0f75..832eefd559e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.ipc.RemoteException; @@ -561,10 +562,10 @@ public class AsyncFSWAL extends AbstractFSWAL { } @Override - public long append(RegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) + public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { - long txid = - stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); + long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, + waitingConsumePayloads); if (shouldScheduleConsumer()) { consumeExecutor.execute(consumer); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 482500e08dc..454928bb843 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; @@ -113,7 +114,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter int buffered = output.buffered(); entry.setCompressionContext(compressionContext); try { - entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() + entry.getKey(). + getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() .writeDelimitedTo(asyncOutputWrapper); } catch (IOException e) { throw new AssertionError("should not happen", e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 15a6a4181f3..3da37d3031e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.wal.FSHLogProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALPrettyPrinter; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; @@ -441,7 +442,7 @@ public class FSHLog extends AbstractFSWAL { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION", justification = "Will never be null") @Override - public long append(final RegionInfo hri, final WALKey key, final WALEdit edits, + public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits, final boolean inMemstore) throws IOException { return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, disruptor.getRingBuffer()); @@ -469,17 +470,6 @@ public class FSHLog extends AbstractFSWAL { private final BlockingQueue syncFutures; private volatile SyncFuture takeSyncFuture = null; - /** - * UPDATE! - * @param syncs the batch of calls to sync that arrived as this thread was starting; when done, - * we will put the result of the actual hdfs sync call as the result. - * @param sequence The sequence number on the ring buffer when this thread was set running. If - * this actual writer sync completes then all appends up this point have been - * flushed/synced/pushed to datanodes. If we fail, then the passed in - * syncs futures will return the exception to their clients; some of the - * edits may have made it out to data nodes but we will report all that were part of - * this session as failed. - */ SyncRunner(final String name, final int maxHandlersCount) { super(name); // LinkedBlockingQueue because of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index a928ad580e2..728a075cfd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -57,7 +57,7 @@ class FSWALEntry extends Entry { private final transient RegionInfo regionInfo; private final transient Set familyNames; - FSWALEntry(final long txid, final WALKey key, final WALEdit edit, + FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo, final boolean inMemstore) { super(key, edit); this.inMemstore = inMemstore; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 78c055e1be6..c199484271f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -367,7 +367,8 @@ public class ProtobufLogReader extends ReaderBase { entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor); if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) { if (LOG.isTraceEnabled()) { - LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos()); + LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + + this.inputStream.getPos()); } continue; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index d1e72f764c0..64acfba3400 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; @@ -50,8 +51,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter @Override public void append(Entry entry) throws IOException { entry.setCompressionContext(compressionContext); - entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() - .writeDelimitedTo(output); + ((WALKeyImpl)entry.getKey()).getBuilder(compressor). + setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output); for (Cell cell : entry.getEdit().getCells()) { // cellEncoder must assume little about the stream, since we write PB and cells in turn. cellEncoder.write(cell); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index a30c29cd645..2a01b14c981 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) public abstract class ReaderBase implements AbstractFSWALProvider.Reader { @@ -92,7 +92,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader { public Entry next(Entry reuse) throws IOException { Entry e = reuse; if (e == null) { - e = new Entry(new WALKey(), new WALEdit()); + e = new Entry(); } if (compressionContext != null) { e.setCompressionContext(compressionContext); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index a25b19d3847..13ffac75627 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -74,13 +74,8 @@ public interface WALActionsListener { /** * Called before each write. - * @param info - * @param logKey - * @param logEdit */ - default void visitLogEntryBeforeWrite( - RegionInfo info, WALKey logKey, WALEdit logEdit - ) {} + default void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {} /** * @param logKey diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java index d28c3c48f42..0edd5d4e19d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCoprocessorHost.java @@ -139,11 +139,6 @@ public class WALCoprocessorHost } } - /** - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced - * with something that doesn't expose IntefaceAudience.Private classes. - */ - @Deprecated public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { // Not bypassable. @@ -157,11 +152,7 @@ public class WALCoprocessorHost } }); } - /** - * @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced - * with something that doesn't expose IntefaceAudience.Private classes. - */ - @Deprecated + public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 816a3b88d21..518ee8fbe54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat; @@ -59,11 +59,12 @@ public class WALUtil { *

This write is for internal use only. Not for external client consumption. * @param mvcc Used by WAL to get sequence Id for the waledit. */ - public static WALKey writeCompactionMarker(WAL wal, + public static WALKeyImpl writeCompactionMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc); + WALKeyImpl walKey = + writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c)); } @@ -75,10 +76,10 @@ public class WALUtil { * *

This write is for internal use only. Not for external client consumption. */ - public static WALKey writeFlushMarker(WAL wal, NavigableMap replicationScope, + public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri, + WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync); if (LOG.isTraceEnabled()) { LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f)); @@ -90,11 +91,11 @@ public class WALUtil { * Write a region open marker indicating that the region is opened. * This write is for internal use only. Not for external client consumption. */ - public static WALKey writeRegionEventMarker(WAL wal, + public static WALKeyImpl writeRegionEventMarker(WAL wal, NavigableMap replicationScope, RegionInfo hri, final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = writeMarker(wal, replicationScope, hri, + WALKeyImpl walKey = writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r)); @@ -112,19 +113,19 @@ public class WALUtil { * @return walKey with sequenceid filled out for this bulk load marker * @throws IOException We will throw an IOException if we can not append to the HLog. */ - public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, + public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal, final NavigableMap replicationScope, final RegionInfo hri, final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc) throws IOException { - WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), - mvcc); + WALKeyImpl walKey = + writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc); if (LOG.isTraceEnabled()) { LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc)); } return walKey; } - private static WALKey writeMarker(final WAL wal, + private static WALKeyImpl writeMarker(final WAL wal, final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc) throws IOException { @@ -138,14 +139,14 @@ public class WALUtil { * Good for case of adding a single edit or marker to the WAL. * *

This write is for internal use only. Not for external client consumption. - * @return WALKey that was added to the WAL. + * @return WALKeyImpl that was added to the WAL. */ - public static WALKey doFullAppendTransaction(final WAL wal, + public static WALKeyImpl doFullAppendTransaction(final WAL wal, final NavigableMap replicationScope, final RegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync) throws IOException { // TODO: Pass in current time to use? - WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), + WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc, replicationScope); long trx = MultiVersionConcurrencyControl.NONE; try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java index c629e16d38e..6dc50019c89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -55,7 +55,7 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter { if (replicationEndpoint.canReplicateToSameCluster() || !entry.getKey().getClusterIds().contains(peerClusterId)) { WALEdit edit = entry.getEdit(); - WALKey logKey = entry.getKey(); + WALKeyImpl logKey = (WALKeyImpl)entry.getKey(); if (edit != null && !edit.isEmpty()) { // Mark that the current cluster has the change diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index 0e4aadd6732..2a2df60303b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; @@ -58,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.zookeeper.KeeperException; @@ -266,7 +267,7 @@ public class Replication implements } } if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - logKey.serializeReplicationScope(false); + ((WALKeyImpl)logKey).serializeReplicationScope(false); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java index 4c86323ef68..038b79973ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityReplicationEndpoint.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; @@ -111,7 +112,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint { newEdit.add(cell); } } - newEntries.add(new Entry(entry.getKey(), newEdit)); + newEntries.add(new Entry(((WALKeyImpl)entry.getKey()), newEdit)); } replicateContext.setEntries(newEntries); return delegator.replicate(replicateContext); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 76b65a76ebd..cedf3509f5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -161,7 +161,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) + public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index e3192559518..1002357e4f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -116,7 +116,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { * @return Returns a 'transaction id' and key will have the region edit/sequence id * in it. */ - long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException; + long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException; /** * updates the seuence number of a specific store. @@ -230,10 +230,10 @@ public interface WAL extends Closeable, WALFileLengthProvider { */ class Entry { private final WALEdit edit; - private final WALKey key; + private final WALKeyImpl key; public Entry() { - this(new WALKey(), new WALEdit()); + this(new WALKeyImpl(), new WALEdit()); } /** @@ -242,7 +242,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { * @param edit log's edit * @param key log's key */ - public Entry(WALKey key, WALEdit edit) { + public Entry(WALKeyImpl key, WALEdit edit) { this.key = key; this.edit = edit; } @@ -261,7 +261,7 @@ public interface WAL extends Closeable, WALFileLengthProvider { * * @return key */ - public WALKey getKey() { + public WALKeyImpl getKey() { return key; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index 2feb3565a9d..f5b611bae83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe * All the edits for a given transaction are written out as a single record, in PB format followed * by Cells written via the WALCellEncoder. */ +// TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle. @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION, HBaseInterfaceAudience.COPROC }) public class WALEdit implements HeapSize { @@ -119,6 +120,7 @@ public class WALEdit implements HeapSize { return this.isReplay; } + @InterfaceAudience.Private public WALEdit add(Cell cell) { this.cells.add(cell); return this; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index fd40ec4dda8..0c818dbac87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,630 +17,88 @@ */ package org.apache.hadoop.hbase.wal; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.SequenceId; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; -import java.io.IOException; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.TreeMap; import java.util.UUID; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; -import org.apache.hadoop.hbase.regionserver.SequenceId; -// imports for things that haven't moved from regionserver.wal yet. -import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; -import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; -import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** - * A Key for an entry in the WAL. - * - * The log intermingles edits to many tables and rows, so each log entry - * identifies the appropriate table and row. Within a table and row, they're - * also sorted. - * - *

Some Transactional edits (START, COMMIT, ABORT) will not have an associated row. - * + * Key for WAL Entry. + * Read-only. No Setters. For limited audience such as Coprocessors. */ -// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical -// purposes. They need to be merged into WALEntry. -@InterfaceAudience.Private -public class WALKey implements SequenceId, Comparable { - +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION, + HBaseInterfaceAudience.COPROC}) +public interface WALKey extends SequenceId, Comparable { /** - * Used to represent when a particular wal key doesn't know/care about the sequence ordering. + * Unmodifiable empty list of UUIDs. */ - public static final long NO_SEQUENCE_ID = -1; + List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); - @InterfaceAudience.Private // For internal use only. - public MultiVersionConcurrencyControl getMvcc() { - return mvcc; + default long estimatedSerializedSizeOf() { + return 0; } /** - * Use it to complete mvcc transaction. This WALKey was part of - * (the transaction is started when you call append; see the comment on FSHLog#append). To - * complete call - * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} - * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} - * @return A WriteEntry gotten from local WAL subsystem. - * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry) + * @return encoded region name */ - @InterfaceAudience.Private // For internal use only. - public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { - assert this.writeEntry != null; - return this.writeEntry; - } - - @InterfaceAudience.Private // For internal use only. - public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { - assert this.writeEntry == null; - this.writeEntry = writeEntry; - // Set our sequenceid now using WriteEntry. - this.sequenceId = writeEntry.getWriteNumber(); - } - - private byte [] encodedRegionName; - - private TableName tablename; - /** - * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is - * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized. - */ - private long sequenceId; + byte[] getEncodedRegionName(); /** - * Used during WAL replay; the sequenceId of the edit when it came into the system. + * @return table name */ - private long origLogSeqNum = 0; - - /** Time at which this edit was written. */ - private long writeTime; - - /** The first element in the list is the cluster id on which the change has originated */ - private List clusterIds; - - private NavigableMap replicationScope; - - private long nonceGroup = HConstants.NO_NONCE; - private long nonce = HConstants.NO_NONCE; - private MultiVersionConcurrencyControl mvcc; - /** - * Set in a way visible to multiple threads; e.g. synchronized getter/setters. - */ - private MultiVersionConcurrencyControl.WriteEntry writeEntry; - public static final List EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList()); - - private CompressionContext compressionContext; - - public WALKey() { - init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); - } - - public WALKey(final NavigableMap replicationScope) { - init(null, null, 0L, HConstants.LATEST_TIMESTAMP, - new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); - } - - @VisibleForTesting - public WALKey(final byte[] encodedRegionName, final TableName tablename, - long logSeqNum, - final long now, UUID clusterId) { - List clusterIds = new ArrayList<>(1); - clusterIds.add(clusterId); - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, - HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); - } - - // TODO: Fix being able to pass in sequenceid. - public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) { - init(encodedRegionName, - tablename, - NO_SEQUENCE_ID, - now, - EMPTY_UUIDS, - HConstants.NO_NONCE, - HConstants.NO_NONCE, - null, null); - } - - // TODO: Fix being able to pass in sequenceid. - public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now, - final NavigableMap replicationScope) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, - HConstants.NO_NONCE, null, replicationScope); - } - - public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now, - MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, - HConstants.NO_NONCE, mvcc, replicationScope); - } - - public WALKey(final byte[] encodedRegionName, - final TableName tablename, - final long now, - MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, - tablename, - NO_SEQUENCE_ID, - now, - EMPTY_UUIDS, - HConstants.NO_NONCE, - HConstants.NO_NONCE, - mvcc, null); - } - - /** - * Create the log key for writing to somewhere. - * We maintain the tablename mainly for debugging purposes. - * A regionName is always a sub-table object. - *

Used by log splitting and snapshots. - * - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tablename - name of table - * @param logSeqNum - log sequence number - * @param now Time at which this edit was written. - * @param clusterIds the clusters that have consumed the change(used in Replication) - * @param nonceGroup the nonceGroup - * @param nonce the nonce - * @param mvcc the mvcc associate the WALKey - * @param replicationScope the non-default replication scope - * associated with the region's column families - */ - // TODO: Fix being able to pass in sequenceid. - public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, - final long now, List clusterIds, long nonceGroup, long nonce, - MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, - replicationScope); - } - - /** - * Create the log key for writing to somewhere. - * We maintain the tablename mainly for debugging purposes. - * A regionName is always a sub-table object. - *

Used by log splitting and snapshots. - * - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tablename - name of table - * @param logSeqNum - log sequence number - * @param now Time at which this edit was written. - * @param clusterIds the clusters that have consumed the change(used in Replication) - */ - // TODO: Fix being able to pass in sequenceid. - public WALKey(final byte[] encodedRegionName, - final TableName tablename, - long logSeqNum, - final long now, - List clusterIds, - long nonceGroup, - long nonce, - MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); - } - - /** - * Create the log key for writing to somewhere. - * We maintain the tablename mainly for debugging purposes. - * A regionName is always a sub-table object. - * - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tablename the tablename - * @param now Time at which this edit was written. - * @param clusterIds the clusters that have consumed the change(used in Replication) - * @param nonceGroup - * @param nonce - * @param mvcc mvcc control used to generate sequence numbers and control read/write points - */ - public WALKey(final byte[] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, - final long nonce, final MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, - null); - } - - /** - * Create the log key for writing to somewhere. - * We maintain the tablename mainly for debugging purposes. - * A regionName is always a sub-table object. - * - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tablename - * @param now Time at which this edit was written. - * @param clusterIds the clusters that have consumed the change(used in Replication) - * @param nonceGroup the nonceGroup - * @param nonce the nonce - * @param mvcc mvcc control used to generate sequence numbers and control read/write points - * @param replicationScope the non-default replication scope of the column families - */ - public WALKey(final byte[] encodedRegionName, final TableName tablename, - final long now, List clusterIds, long nonceGroup, - final long nonce, final MultiVersionConcurrencyControl mvcc, - NavigableMap replicationScope) { - init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, - replicationScope); - } - - /** - * Create the log key for writing to somewhere. - * We maintain the tablename mainly for debugging purposes. - * A regionName is always a sub-table object. - * - * @param encodedRegionName Encoded name of the region as returned by - * HRegionInfo#getEncodedNameAsBytes(). - * @param tablename - * @param logSeqNum - * @param nonceGroup - * @param nonce - */ - // TODO: Fix being able to pass in sequenceid. - public WALKey(final byte[] encodedRegionName, - final TableName tablename, - long logSeqNum, - long nonceGroup, - long nonce, - final MultiVersionConcurrencyControl mvcc) { - init(encodedRegionName, - tablename, - logSeqNum, - EnvironmentEdgeManager.currentTime(), - EMPTY_UUIDS, - nonceGroup, - nonce, - mvcc, null); - } - - @InterfaceAudience.Private - protected void init(final byte[] encodedRegionName, - final TableName tablename, - long logSeqNum, - final long now, - List clusterIds, - long nonceGroup, - long nonce, - MultiVersionConcurrencyControl mvcc, - NavigableMap replicationScope) { - this.sequenceId = logSeqNum; - this.writeTime = now; - this.clusterIds = clusterIds; - this.encodedRegionName = encodedRegionName; - this.tablename = tablename; - this.nonceGroup = nonceGroup; - this.nonce = nonce; - this.mvcc = mvcc; - if (logSeqNum != NO_SEQUENCE_ID) { - setSequenceId(logSeqNum); - } - this.replicationScope = replicationScope; - } - - // For deserialization. DO NOT USE. See setWriteEntry below. - @InterfaceAudience.Private - protected void setSequenceId(long sequenceId) { - this.sequenceId = sequenceId; - } - - /** - * @param compressionContext Compression context to use - */ - public void setCompressionContext(CompressionContext compressionContext) { - this.compressionContext = compressionContext; - } - - /** @return encoded region name */ - public byte [] getEncodedRegionName() { - return encodedRegionName; - } - - /** @return table name */ - public TableName getTablename() { - return tablename; - } - - /** @return log sequence number - * @deprecated Use {@link #getSequenceId()} - */ - @Deprecated - public long getLogSeqNum() { - return getSequenceId(); - } - - /** - * Used to set original sequenceId for WALKey during WAL replay - */ - public void setOrigLogSeqNum(final long sequenceId) { - this.origLogSeqNum = sequenceId; - } - - /** - * Return a positive long if current WALKey is created from a replay edit; a replay edit is an - * edit that came in when replaying WALs of a crashed server. - * @return original sequence number of the WALEdit - */ - public long getOrigLogSeqNum() { - return this.origLogSeqNum; - } - - /** - * SequenceId is only available post WAL-assign. Calls before this will get you a - * {@link #NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this method - * for more on when this sequenceId comes available. - * @return long the new assigned sequence number - */ - @Override - public long getSequenceId() { - return this.sequenceId; - } + TableName getTablename(); /** * @return the write time */ - public long getWriteTime() { - return this.writeTime; - } + long getWriteTime(); - public NavigableMap getReplicationScopes() { - return replicationScope; - } - - /** @return The nonce group */ - public long getNonceGroup() { - return nonceGroup; - } - - /** @return The nonce */ - public long getNonce() { - return nonce; - } - - private void setReplicationScope(NavigableMap replicationScope) { - this.replicationScope = replicationScope; - } - - public void serializeReplicationScope(boolean serialize) { - if (!serialize) { - setReplicationScope(null); - } + /** + * @return The nonce group + */ + default long getNonceGroup() { + return HConstants.NO_NONCE; } /** - * Marks that the cluster with the given clusterId has consumed the change + * @return The nonce */ - public void addClusterId(UUID clusterId) { - if (!clusterIds.contains(clusterId)) { - clusterIds.add(clusterId); - } + default long getNonce() { + return HConstants.NO_NONCE; } + UUID getOriginatingClusterId(); + /** - * @return the set of cluster Ids that have consumed the change + * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an + * edit that came in when replaying WALs of a crashed server. + * @return original sequence number of the WALEdit */ - public List getClusterIds() { - return clusterIds; - } - - /** - * @return the cluster id on which the change has originated. It there is no such cluster, it - * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) - */ - public UUID getOriginatingClusterId(){ - return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0); - } - - @Override - public String toString() { - return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + - sequenceId; - } + long getOrigLogSeqNum(); /** * Produces a string map for this key. Useful for programmatic use and - * manipulation of the data stored in an WALKey, for example, printing + * manipulation of the data stored in an WALKeyImpl, for example, printing * as JSON. * * @return a Map containing data from this key */ - public Map toStringMap() { + default Map toStringMap() { Map stringMap = new HashMap<>(); - stringMap.put("table", tablename); - stringMap.put("region", Bytes.toStringBinary(encodedRegionName)); + stringMap.put("table", getTablename()); + stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName())); stringMap.put("sequence", getSequenceId()); return stringMap; } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - return compareTo((WALKey)obj) == 0; - } - - @Override - public int hashCode() { - int result = Bytes.hashCode(this.encodedRegionName); - result ^= getSequenceId(); - result ^= this.writeTime; - return result; - } - - @Override - public int compareTo(WALKey o) { - int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName); - if (result == 0) { - long sid = getSequenceId(); - long otherSid = o.getSequenceId(); - if (sid < otherSid) { - result = -1; - } else if (sid > otherSid) { - result = 1; - } - if (result == 0) { - if (this.writeTime < o.writeTime) { - result = -1; - } else if (this.writeTime > o.writeTime) { - return 1; - } - } - } - // why isn't cluster id accounted for? - return result; - } - - /** - * Drop this instance's tablename byte array and instead - * hold a reference to the provided tablename. This is not - * meant to be a general purpose setter - it's only used - * to collapse references to conserve memory. - */ - void internTableName(TableName tablename) { - // We should not use this as a setter - only to swap - // in a new reference to the same table name. - assert tablename.equals(this.tablename); - this.tablename = tablename; - } - - /** - * Drop this instance's region name byte array and instead - * hold a reference to the provided region name. This is not - * meant to be a general purpose setter - it's only used - * to collapse references to conserve memory. - */ - void internEncodedRegionName(byte []encodedRegionName) { - // We should not use this as a setter - only to swap - // in a new reference to the same table name. - assert Bytes.equals(this.encodedRegionName, encodedRegionName); - this.encodedRegionName = encodedRegionName; - } - - public WALProtos.WALKey.Builder getBuilder( - WALCellCodec.ByteStringCompressor compressor) throws IOException { - WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); - if (compressionContext == null) { - builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); - builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); - } else { - builder.setEncodedRegionName(compressor.compress(this.encodedRegionName, - compressionContext.regionDict)); - builder.setTableName(compressor.compress(this.tablename.getName(), - compressionContext.tableDict)); - } - builder.setLogSequenceNumber(getSequenceId()); - builder.setWriteTime(writeTime); - if (this.origLogSeqNum > 0) { - builder.setOrigSequenceNumber(this.origLogSeqNum); - } - if (this.nonce != HConstants.NO_NONCE) { - builder.setNonce(nonce); - } - if (this.nonceGroup != HConstants.NO_NONCE) { - builder.setNonceGroup(nonceGroup); - } - HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); - for (UUID clusterId : clusterIds) { - uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); - uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); - builder.addClusterIds(uuidBuilder.build()); - } - if (replicationScope != null) { - for (Map.Entry e : replicationScope.entrySet()) { - ByteString family = (compressionContext == null) - ? UnsafeByteOperations.unsafeWrap(e.getKey()) - : compressor.compress(e.getKey(), compressionContext.familyDict); - builder.addScopes(FamilyScope.newBuilder() - .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue()))); - } - } - return builder; - } - - public void readFieldsFromPb(WALProtos.WALKey walKey, - WALCellCodec.ByteStringUncompressor uncompressor) - throws IOException { - if (this.compressionContext != null) { - this.encodedRegionName = uncompressor.uncompress( - walKey.getEncodedRegionName(), compressionContext.regionDict); - byte[] tablenameBytes = uncompressor.uncompress( - walKey.getTableName(), compressionContext.tableDict); - this.tablename = TableName.valueOf(tablenameBytes); - } else { - this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); - this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); - } - clusterIds.clear(); - for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { - clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); - } - if (walKey.hasNonceGroup()) { - this.nonceGroup = walKey.getNonceGroup(); - } - if (walKey.hasNonce()) { - this.nonce = walKey.getNonce(); - } - this.replicationScope = null; - if (walKey.getScopesCount() > 0) { - this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (FamilyScope scope : walKey.getScopesList()) { - byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : - uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); - this.replicationScope.put(family, scope.getScopeType().getNumber()); - } - } - setSequenceId(walKey.getLogSequenceNumber()); - this.writeTime = walKey.getWriteTime(); - if(walKey.hasOrigSequenceNumber()) { - this.origLogSeqNum = walKey.getOrigSequenceNumber(); - } - } - - public long estimatedSerializedSizeOf() { - long size = encodedRegionName != null ? encodedRegionName.length : 0; - size += tablename != null ? tablename.toBytes().length : 0; - if (clusterIds != null) { - size += 16 * clusterIds.size(); - } - if (nonceGroup != HConstants.NO_NONCE) { - size += Bytes.SIZEOF_LONG; // nonce group - } - if (nonce != HConstants.NO_NONCE) { - size += Bytes.SIZEOF_LONG; // nonce - } - if (replicationScope != null) { - for (Map.Entry scope: replicationScope.entrySet()) { - size += scope.getKey().length; - size += Bytes.SIZEOF_INT; - } - } - size += Bytes.SIZEOF_LONG; // sequence number - size += Bytes.SIZEOF_LONG; // write time - if (origLogSeqNum > 0) { - size += Bytes.SIZEOF_LONG; // original sequence number - } - return size; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java new file mode 100644 index 00000000000..72d5268fb6c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -0,0 +1,633 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.regionserver.SequenceId; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +// imports for things that haven't moved from regionserver.wal yet. +import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; +import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * Default implementation of Key for an Entry in the WAL. + * For internal use only though Replication needs to have access. + * + * The log intermingles edits to many tables and rows, so each log entry + * identifies the appropriate table and row. Within a table and row, they're + * also sorted. + * + *

Some Transactional edits (START, COMMIT, ABORT) will not have an associated row. + * + */ +// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical +// purposes. They need to be merged into WALEntry. +@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION}) +public class WALKeyImpl implements WALKey { + public static final WALKeyImpl EMPTY_WALKEYIMPL = new WALKeyImpl(); + + public MultiVersionConcurrencyControl getMvcc() { + return mvcc; + } + + /** + * Use it to complete mvcc transaction. This WALKeyImpl was part of + * (the transaction is started when you call append; see the comment on FSHLog#append). To + * complete call + * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} + * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} + * @return A WriteEntry gotten from local WAL subsystem. + * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry) + */ + public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException { + assert this.writeEntry != null; + return this.writeEntry; + } + + public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { + assert this.writeEntry == null; + this.writeEntry = writeEntry; + // Set our sequenceid now using WriteEntry. + this.sequenceId = writeEntry.getWriteNumber(); + } + + private byte [] encodedRegionName; + + private TableName tablename; + + /** + * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is + * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized. + */ + private long sequenceId; + + /** + * Used during WAL replay; the sequenceId of the edit when it came into the system. + */ + private long origLogSeqNum = 0; + + /** Time at which this edit was written. */ + private long writeTime; + + /** The first element in the list is the cluster id on which the change has originated */ + private List clusterIds; + + private NavigableMap replicationScope; + + private long nonceGroup = HConstants.NO_NONCE; + private long nonce = HConstants.NO_NONCE; + private MultiVersionConcurrencyControl mvcc; + + /** + * Set in a way visible to multiple threads; e.g. synchronized getter/setters. + */ + private MultiVersionConcurrencyControl.WriteEntry writeEntry; + + private CompressionContext compressionContext; + + public WALKeyImpl() { + init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); + } + + public WALKeyImpl(final NavigableMap replicationScope) { + init(null, null, 0L, HConstants.LATEST_TIMESTAMP, + new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); + } + + @VisibleForTesting + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, + long logSeqNum, + final long now, UUID clusterId) { + List clusterIds = new ArrayList<>(1); + clusterIds.add(clusterId); + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, + HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); + } + + // TODO: Fix being able to pass in sequenceid. + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now) { + init(encodedRegionName, + tablename, + NO_SEQUENCE_ID, + now, + EMPTY_UUIDS, + HConstants.NO_NONCE, + HConstants.NO_NONCE, + null, null); + } + + // TODO: Fix being able to pass in sequenceid. + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, + final NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, null, replicationScope); + } + + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, + MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, + HConstants.NO_NONCE, mvcc, replicationScope); + } + + public WALKeyImpl(final byte[] encodedRegionName, + final TableName tablename, + final long now, + MultiVersionConcurrencyControl mvcc) { + init(encodedRegionName, + tablename, + NO_SEQUENCE_ID, + now, + EMPTY_UUIDS, + HConstants.NO_NONCE, + HConstants.NO_NONCE, + mvcc, null); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + *

Used by log splitting and snapshots. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param logSeqNum - log sequence number + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup the nonceGroup + * @param nonce the nonce + * @param mvcc the mvcc associate the WALKeyImpl + * @param replicationScope the non-default replication scope + * associated with the region's column families + */ + // TODO: Fix being able to pass in sequenceid. + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, + final long now, List clusterIds, long nonceGroup, long nonce, + MultiVersionConcurrencyControl mvcc, final NavigableMap replicationScope) { + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, + replicationScope); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + *

Used by log splitting and snapshots. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename - name of table + * @param logSeqNum - log sequence number + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + */ + // TODO: Fix being able to pass in sequenceid. + public WALKeyImpl(final byte[] encodedRegionName, + final TableName tablename, + long logSeqNum, + final long now, + List clusterIds, + long nonceGroup, + long nonce, + MultiVersionConcurrencyControl mvcc) { + init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename the tablename + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup + * @param nonce + * @param mvcc mvcc control used to generate sequence numbers and control read/write points + */ + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, + final long now, List clusterIds, long nonceGroup, + final long nonce, final MultiVersionConcurrencyControl mvcc) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, + null); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename + * @param now Time at which this edit was written. + * @param clusterIds the clusters that have consumed the change(used in Replication) + * @param nonceGroup the nonceGroup + * @param nonce the nonce + * @param mvcc mvcc control used to generate sequence numbers and control read/write points + * @param replicationScope the non-default replication scope of the column families + */ + public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, + final long now, List clusterIds, long nonceGroup, + final long nonce, final MultiVersionConcurrencyControl mvcc, + NavigableMap replicationScope) { + init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, + replicationScope); + } + + /** + * Create the log key for writing to somewhere. + * We maintain the tablename mainly for debugging purposes. + * A regionName is always a sub-table object. + * + * @param encodedRegionName Encoded name of the region as returned by + * HRegionInfo#getEncodedNameAsBytes(). + * @param tablename + * @param logSeqNum + * @param nonceGroup + * @param nonce + */ + // TODO: Fix being able to pass in sequenceid. + public WALKeyImpl(final byte[] encodedRegionName, + final TableName tablename, + long logSeqNum, + long nonceGroup, + long nonce, + final MultiVersionConcurrencyControl mvcc) { + init(encodedRegionName, + tablename, + logSeqNum, + EnvironmentEdgeManager.currentTime(), + EMPTY_UUIDS, + nonceGroup, + nonce, + mvcc, null); + } + + @InterfaceAudience.Private + protected void init(final byte[] encodedRegionName, + final TableName tablename, + long logSeqNum, + final long now, + List clusterIds, + long nonceGroup, + long nonce, + MultiVersionConcurrencyControl mvcc, + NavigableMap replicationScope) { + this.sequenceId = logSeqNum; + this.writeTime = now; + this.clusterIds = clusterIds; + this.encodedRegionName = encodedRegionName; + this.tablename = tablename; + this.nonceGroup = nonceGroup; + this.nonce = nonce; + this.mvcc = mvcc; + if (logSeqNum != NO_SEQUENCE_ID) { + setSequenceId(logSeqNum); + } + this.replicationScope = replicationScope; + } + + // For deserialization. DO NOT USE. See setWriteEntry below. + @InterfaceAudience.Private + protected void setSequenceId(long sequenceId) { + this.sequenceId = sequenceId; + } + + /** + * @param compressionContext Compression context to use + */ + public void setCompressionContext(CompressionContext compressionContext) { + this.compressionContext = compressionContext; + } + + /** @return encoded region name */ + @Override + public byte [] getEncodedRegionName() { + return encodedRegionName; + } + + /** @return table name */ + @Override + public TableName getTablename() { + return tablename; + } + + /** @return log sequence number + * @deprecated Use {@link #getSequenceId()} + */ + @Deprecated + public long getLogSeqNum() { + return getSequenceId(); + } + + /** + * Used to set original sequenceId for WALKeyImpl during WAL replay + */ + public void setOrigLogSeqNum(final long sequenceId) { + this.origLogSeqNum = sequenceId; + } + + /** + * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an + * edit that came in when replaying WALs of a crashed server. + * @return original sequence number of the WALEdit + */ + @Override + public long getOrigLogSeqNum() { + return this.origLogSeqNum; + } + + /** + * SequenceId is only available post WAL-assign. Calls before this will get you a + * {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this + * method for more on when this sequenceId comes available. + * @return long the new assigned sequence number + */ + @Override + public long getSequenceId() { + return this.sequenceId; + } + + /** + * @return the write time + */ + @Override + public long getWriteTime() { + return this.writeTime; + } + + public NavigableMap getReplicationScopes() { + return replicationScope; + } + + /** @return The nonce group */ + @Override + public long getNonceGroup() { + return nonceGroup; + } + + /** @return The nonce */ + @Override + public long getNonce() { + return nonce; + } + + private void setReplicationScope(NavigableMap replicationScope) { + this.replicationScope = replicationScope; + } + + public void serializeReplicationScope(boolean serialize) { + if (!serialize) { + setReplicationScope(null); + } + } + + /** + * Marks that the cluster with the given clusterId has consumed the change + */ + public void addClusterId(UUID clusterId) { + if (!clusterIds.contains(clusterId)) { + clusterIds.add(clusterId); + } + } + + /** + * @return the set of cluster Ids that have consumed the change + */ + public List getClusterIds() { + return clusterIds; + } + + /** + * @return the cluster id on which the change has originated. It there is no such cluster, it + * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) + */ + @Override + public UUID getOriginatingClusterId(){ + return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0); + } + + @Override + public String toString() { + return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + return compareTo((WALKey)obj) == 0; + } + + @Override + public int hashCode() { + int result = Bytes.hashCode(this.encodedRegionName); + result ^= getSequenceId(); + result ^= this.writeTime; + return result; + } + + @Override + public int compareTo(WALKey o) { + int result = Bytes.compareTo(this.encodedRegionName, o.getEncodedRegionName()); + if (result == 0) { + long sid = getSequenceId(); + long otherSid = o.getSequenceId(); + if (sid < otherSid) { + result = -1; + } else if (sid > otherSid) { + result = 1; + } + if (result == 0) { + if (this.writeTime < o.getWriteTime()) { + result = -1; + } else if (this.writeTime > o.getWriteTime()) { + return 1; + } + } + } + // why isn't cluster id accounted for? + return result; + } + + /** + * Drop this instance's tablename byte array and instead + * hold a reference to the provided tablename. This is not + * meant to be a general purpose setter - it's only used + * to collapse references to conserve memory. + */ + void internTableName(TableName tablename) { + // We should not use this as a setter - only to swap + // in a new reference to the same table name. + assert tablename.equals(this.tablename); + this.tablename = tablename; + } + + /** + * Drop this instance's region name byte array and instead + * hold a reference to the provided region name. This is not + * meant to be a general purpose setter - it's only used + * to collapse references to conserve memory. + */ + void internEncodedRegionName(byte []encodedRegionName) { + // We should not use this as a setter - only to swap + // in a new reference to the same table name. + assert Bytes.equals(this.encodedRegionName, encodedRegionName); + this.encodedRegionName = encodedRegionName; + } + + public WALProtos.WALKey.Builder getBuilder( + WALCellCodec.ByteStringCompressor compressor) throws IOException { + WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); + if (compressionContext == null) { + builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); + builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); + } else { + builder.setEncodedRegionName(compressor.compress(this.encodedRegionName, + compressionContext.regionDict)); + builder.setTableName(compressor.compress(this.tablename.getName(), + compressionContext.tableDict)); + } + builder.setLogSequenceNumber(getSequenceId()); + builder.setWriteTime(writeTime); + if (this.origLogSeqNum > 0) { + builder.setOrigSequenceNumber(this.origLogSeqNum); + } + if (this.nonce != HConstants.NO_NONCE) { + builder.setNonce(nonce); + } + if (this.nonceGroup != HConstants.NO_NONCE) { + builder.setNonceGroup(nonceGroup); + } + HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); + for (UUID clusterId : clusterIds) { + uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); + uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); + builder.addClusterIds(uuidBuilder.build()); + } + if (replicationScope != null) { + for (Map.Entry e : replicationScope.entrySet()) { + ByteString family = (compressionContext == null) + ? UnsafeByteOperations.unsafeWrap(e.getKey()) + : compressor.compress(e.getKey(), compressionContext.familyDict); + builder.addScopes(FamilyScope.newBuilder() + .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue()))); + } + } + return builder; + } + + public void readFieldsFromPb(WALProtos.WALKey walKey, + WALCellCodec.ByteStringUncompressor uncompressor) + throws IOException { + if (this.compressionContext != null) { + this.encodedRegionName = uncompressor.uncompress( + walKey.getEncodedRegionName(), compressionContext.regionDict); + byte[] tablenameBytes = uncompressor.uncompress( + walKey.getTableName(), compressionContext.tableDict); + this.tablename = TableName.valueOf(tablenameBytes); + } else { + this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); + this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); + } + clusterIds.clear(); + for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { + clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); + } + if (walKey.hasNonceGroup()) { + this.nonceGroup = walKey.getNonceGroup(); + } + if (walKey.hasNonce()) { + this.nonce = walKey.getNonce(); + } + this.replicationScope = null; + if (walKey.getScopesCount() > 0) { + this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (FamilyScope scope : walKey.getScopesList()) { + byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : + uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); + this.replicationScope.put(family, scope.getScopeType().getNumber()); + } + } + setSequenceId(walKey.getLogSequenceNumber()); + this.writeTime = walKey.getWriteTime(); + if(walKey.hasOrigSequenceNumber()) { + this.origLogSeqNum = walKey.getOrigSequenceNumber(); + } + } + + @Override + public long estimatedSerializedSizeOf() { + long size = encodedRegionName != null ? encodedRegionName.length : 0; + size += tablename != null ? tablename.toBytes().length : 0; + if (clusterIds != null) { + size += 16 * clusterIds.size(); + } + if (nonceGroup != HConstants.NO_NONCE) { + size += Bytes.SIZEOF_LONG; // nonce group + } + if (nonce != HConstants.NO_NONCE) { + size += Bytes.SIZEOF_LONG; // nonce + } + if (replicationScope != null) { + for (Map.Entry scope: replicationScope.entrySet()) { + size += scope.getKey().length; + size += Bytes.SIZEOF_INT; + } + } + size += Bytes.SIZEOF_LONG; // sequence number + size += Bytes.SIZEOF_LONG; // write time + if (origLogSeqNum > 0) { + size += Bytes.SIZEOF_LONG; // original sequence number + } + return size; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 39063a2be64..ce1713a127e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -964,7 +964,7 @@ public class WALSplitter { } private void internify(Entry entry) { - WALKey k = entry.getKey(); + WALKeyImpl k = entry.getKey(); k.internTableName(this.tableName); k.internEncodedRegionName(this.encodedRegionName); } @@ -1685,7 +1685,7 @@ public class WALSplitter { List mutations = new ArrayList<>(); Cell previousCell = null; Mutation m = null; - WALKey key = null; + WALKeyImpl key = null; WALEdit val = null; if (logEntry != null) val = new WALEdit(); @@ -1733,7 +1733,7 @@ public class WALSplitter { for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); } - key = new WALKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( + key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null); logEntry.setFirst(key); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorMetrics.java index fbcd1d5dc1e..b2c106255df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorMetrics.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALKey; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -172,7 +173,7 @@ public class TestCoprocessorMetrics { @Override public void postWALWrite(ObserverContext ctx, - RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey, + RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { walEditsCount.increment(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 9e140f02291..0dd2c8cd8db 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -64,7 +64,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.junit.After; import org.junit.AfterClass; @@ -232,8 +232,8 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - long txid = log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, + // we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors. + long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, new MultiVersionConcurrencyControl(), scopes), edit, true); log.sync(txid); @@ -286,7 +286,7 @@ public class TestWALObserver { final long now = EnvironmentEdgeManager.currentTime(); long txid = log.append(hri, - new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), + new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes), new WALEdit(), true); log.sync(txid); @@ -334,7 +334,7 @@ public class TestWALObserver { addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc); } - wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, true); // sync to fs. wal.sync(); @@ -476,9 +476,9 @@ public class TestWALObserver { byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); - // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care + // uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care // about legacy coprocessors - txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, ee.currentTime(), mvcc), edit, true); } if (-1 != txid) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 099caa871f0..ea184afef5d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -88,7 +88,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -669,7 +669,7 @@ public class TestDistributedLogSplitting { byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i)); e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value)); log.append(curRegionInfo, - new WALKey(curRegionInfo.getEncodedNameAsBytes(), tableName, + new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc), e, true); if (0 == i % syncEvery) { log.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java index 98bf48d699b..f3c5da60e99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; @@ -113,7 +113,7 @@ public class TestBulkLoad { familyName, storeFileNames)), anyBoolean())).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { - WALKey walKey = invocation.getArgument(1); + WALKeyImpl walKey = invocation.getArgument(1); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); @@ -138,7 +138,7 @@ public class TestBulkLoad { any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { - WALKey walKey = invocation.getArgument(1); + WALKeyImpl walKey = invocation.getArgument(1); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); @@ -157,7 +157,7 @@ public class TestBulkLoad { any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { - WALKey walKey = invocation.getArgument(1); + WALKeyImpl walKey = invocation.getArgument(1); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); @@ -177,7 +177,7 @@ public class TestBulkLoad { any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)), anyBoolean())).thenAnswer(new Answer() { public Object answer(InvocationOnMock invocation) { - WALKey walKey = invocation.getArgument(1); + WALKeyImpl walKey = invocation.getArgument(1); MultiVersionConcurrencyControl mvcc = walKey.getMvcc(); if (mvcc != null) { MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 0f46f69aa10..dc4027e6873 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -156,6 +156,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.wal.WALSplitter; @@ -655,7 +656,7 @@ public class TestHRegion { WALEdit edit = new WALEdit(); edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); - writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time, + writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); @@ -706,7 +707,7 @@ public class TestHRegion { WALEdit edit = new WALEdit(); edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); - writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time, + writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); @@ -809,7 +810,7 @@ public class TestHRegion { edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes .toBytes(i))); } - writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time, + writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time, HConstants.DEFAULT_CLUSTER_ID), edit)); writer.close(); } @@ -906,7 +907,7 @@ public class TestHRegion { long time = System.nanoTime(); - writer.append(new WAL.Entry(new WALKey(regionName, tableName, 10, time, + writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time, HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(), compactionDescriptor))); writer.close(); @@ -4695,7 +4696,7 @@ public class TestHRegion { //verify append called or not verify(wal, expectAppend ? times(1) : never()) - .append((HRegionInfo)any(), (WALKey)any(), + .append((HRegionInfo)any(), (WALKeyImpl)any(), (WALEdit)any(), Mockito.anyBoolean()); // verify sync called or not @@ -5843,7 +5844,7 @@ public class TestHRegion { region = HRegion.openHRegion(hri, htd, rss.getWAL(hri), TEST_UTIL.getConfiguration(), rss, null); - verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any() + verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any() , editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getValue(); @@ -5914,18 +5915,18 @@ public class TestHRegion { /** * Utility method to setup a WAL mock. - * Needs to do the bit where we close latch on the WALKey on append else test hangs. + * Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs. * @return * @throws IOException */ private WAL mockWAL() throws IOException { WAL wal = mock(WAL.class); Mockito.when(wal.append((HRegionInfo)Mockito.any(), - (WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). + (WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())). thenAnswer(new Answer() { @Override public Long answer(InvocationOnMock invocation) throws Throwable { - WALKey key = invocation.getArgument(1); + WALKeyImpl key = invocation.getArgument(1); MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(); key.setWriteEntry(we); return 1L; @@ -5967,7 +5968,7 @@ public class TestHRegion { region.close(false); // 2 times, one for region open, the other close region - verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(), + verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(), editCaptor.capture(), anyBoolean()); WALEdit edit = editCaptor.getAllValues().get(1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index bab5b2673be..8b5bf254af6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.util.StringUtils; import org.junit.After; @@ -299,7 +300,7 @@ public class TestHRegionReplayEvents { put.setDurability(Durability.SKIP_WAL); MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0); region.batchReplay(new MutationReplay[] {mutation}, - entry.getKey().getLogSeqNum()); + entry.getKey().getSequenceId()); return Integer.parseInt(Bytes.toString(put.getRow())); } @@ -1150,7 +1151,7 @@ public class TestHRegionReplayEvents { // test for region open and close secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null); verify(walSecondary, times(0)).append((HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), anyBoolean()); + (WALKeyImpl)any(), (WALEdit)any(), anyBoolean()); // test for replay prepare flush putDataByReplay(secondaryRegion, 0, 10, cq, families); @@ -1166,11 +1167,11 @@ public class TestHRegionReplayEvents { .build()); verify(walSecondary, times(0)).append((HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), anyBoolean()); + (WALKeyImpl)any(), (WALEdit)any(), anyBoolean()); secondaryRegion.close(); verify(walSecondary, times(0)).append((HRegionInfo)any(), - (WALKey)any(), (WALEdit)any(), anyBoolean()); + (WALKeyImpl)any(), (WALEdit)any(), anyBoolean()); } /** @@ -1259,7 +1260,7 @@ public class TestHRegionReplayEvents { } FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); if (flush != null) { - secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); } } @@ -1299,7 +1300,7 @@ public class TestHRegionReplayEvents { } FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); if (flush != null) { - secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); } else { replayEdit(secondaryRegion, entry); } @@ -1333,7 +1334,7 @@ public class TestHRegionReplayEvents { } FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); if (flush != null) { - secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum()); + secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java index 5d082451b04..5278f3f7ac4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWALLockup.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; @@ -229,8 +230,8 @@ public class TestWALLockup { // edit. WAL subsystem doesn't care. Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(), - System.currentTimeMillis(), mvcc, scopes); + WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -406,8 +407,8 @@ public class TestWALLockup { try { Put put = new Put(bytes); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes); - WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(), - System.currentTimeMillis(), mvcc, scopes); + WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), + htd.getTableName(), System.currentTimeMillis(), mvcc, scopes); WALEdit edit = new WALEdit(); CellScanner CellScanner = put.cellScanner(); assertTrue(CellScanner.advance()); @@ -438,8 +439,8 @@ public class TestWALLockup { // make RingBufferEventHandler sleep 1s, so the following sync // endOfBatch=false - key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), TableName.valueOf("sleep"), - System.currentTimeMillis(), mvcc, scopes); + key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), + TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc, scopes); dodgyWAL2.append(region.getRegionInfo(), key, edit, true); Thread t = new Thread("Sync") { @@ -462,7 +463,7 @@ public class TestWALLockup { e1.printStackTrace(); } // make append throw DamagedWALException - key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), + key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(), TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes); dodgyWAL2.append(region.getRegionInfo(), key, edit, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java index b736fae80de..9481018ace1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.SequenceId; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -159,8 +161,8 @@ public abstract class AbstractTestFSWAL { long timestamp = System.currentTimeMillis(); WALEdit cols = new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), - WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, + WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), + SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes); log.append(hri, key, cols, true); } @@ -415,7 +417,7 @@ public abstract class AbstractTestFSWAL { goslow.set(true); for (int i = 0; i < countPerFamily; i++) { final RegionInfo info = region.getRegionInfo(); - final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName, + final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes); wal.append(info, logkey, edits, true); region.getMVCC().completeAndWait(logkey.getWriteEntry()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java index 2f3e9b94f43..c0510d33b14 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java @@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -151,7 +151,7 @@ public abstract class AbstractTestProtobufLog { // Write log in pb format. writer = createWriter(path); for (int i = 0; i < recordCount; ++i) { - WALKey key = new WALKey( + WALKeyImpl key = new WALKeyImpl( hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); for (int j = 0; j < columnCount; ++j) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 5acbf236252..60951aa068c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.DFSInputStream; import org.junit.After; @@ -801,14 +801,14 @@ public abstract class AbstractTestWALReplay { long now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName, now, rowName)); - wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, true); // Delete the c family to verify deletes make it over. edit = new WALEdit(); now = ee.currentTime(); edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily)); - wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, + wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit, true); // Sync. @@ -1140,9 +1140,9 @@ public abstract class AbstractTestWALReplay { } } - private WALKey createWALKey(final TableName tableName, final HRegionInfo hri, + private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri, final MultiVersionConcurrencyControl mvcc, NavigableMap scopes) { - return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); + return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes); } private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java index a78720dc37d..f1508e5bc93 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java @@ -23,7 +23,7 @@ import java.util.Queue; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; public class FaultyProtobufLogReader extends ProtobufLogReader { @@ -44,7 +44,7 @@ public class FaultyProtobufLogReader extends ProtobufLogReader { if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading boolean b; do { - Entry e = new Entry(new WALKey(), new WALEdit()); + Entry e = new Entry(); if (compressionContext != null) { e.setCompressionContext(compressionContext); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 1a01de5ccbd..665ceeb9a92 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; @@ -204,7 +204,7 @@ public class TestLogRollAbort { for(byte[] fam : htd.getFamiliesKeys()) { scopes.put(fam, 0); } - log.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + log.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Send the data to HDFS datanodes and close the HDFS writer diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java index 3ba12ed041c..c990680b283 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -162,7 +162,7 @@ public class TestLogRollingNoCluster { for(byte[] fam : htd.getFamiliesKeys()) { scopes.put(fam, 0); } - final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), + final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true); Threads.sleep(ThreadLocalRandom.current().nextInt(5)); wal.sync(txid); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index ec8e77123c3..6696ce3fbdc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; @@ -115,7 +115,7 @@ public class TestWALActionsListener { for(byte[] fam : htd.getFamiliesKeys()) { scopes.put(fam, 0); } - final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), + final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), TableName.valueOf(b), 0, mvcc, scopes), edit, true); wal.sync(txid); if (i == 10) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index 71b4def5d24..122860532c8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.CoordinatedStateManager; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; @@ -48,9 +47,9 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager; import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; @@ -117,7 +116,7 @@ public class TestReplicationSource { KeyValue kv = new KeyValue(b,b,b); WALEdit edit = new WALEdit(); edit.add(kv); - WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, + WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); writer.append(new WAL.Entry(key, edit)); writer.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java index 9fda8bc2c2a..608d22b01f4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -60,21 +60,22 @@ public class TestReplicationWALEntryFilters { SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter(); // meta - WALKey key1 = new WALKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), + WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(), TableName.META_TABLE_NAME, System.currentTimeMillis()); Entry metaEntry = new Entry(key1, null); assertNull(filter.filter(metaEntry)); // ns table - WALKey key2 = - new WALKey(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis()); + WALKeyImpl key2 = + new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis()); Entry nsEntry = new Entry(key2, null); assertNull(filter.filter(nsEntry)); // user table - WALKey key3 = new WALKey(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis()); + WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"), + System.currentTimeMillis()); Entry userEntry = new Entry(key3, null); assertEquals(userEntry, filter.filter(userEntry)); @@ -331,8 +332,8 @@ public class TestReplicationWALEntryFilters { } private Entry createEntry(TreeMap scopes, byte[]... kvs) { - WALKey key1 = - new WALKey(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes); + WALKeyImpl key1 = + new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes); WALEdit edit1 = new WALEdit(); for (byte[] kv : kvs) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index 1c46b570229..7e0f09024b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -412,7 +412,7 @@ public class TestRegionReplicaReplicationEndpoint { byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes(); Entry entry = new Entry( - new WALKey(encodedRegionName, toBeDisabledTable, 1), + new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1), new WALEdit()); HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java index 45682fca01f..226994be6d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -49,19 +48,16 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext; -import org.apache.hadoop.hbase.replication.ReplicationPeer; -import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; -import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -165,7 +161,8 @@ public class TestRegionReplicaReplicationEndpointNoMaster { RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { // only keep primary region's edits if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) { - entries.add(new Entry(logKey, logEdit)); + // Presume type is a WALKeyImpl + entries.add(new Entry((WALKeyImpl)logKey, logEdit)); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 83dc636a392..c4d079d892e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -82,7 +82,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -277,7 +277,7 @@ public abstract class TestReplicationSourceManager { LOG.info(i); final long txid = wal.append( hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit, true); wal.sync(txid); @@ -292,9 +292,8 @@ public abstract class TestReplicationSourceManager { for (int i = 0; i < 3; i++) { wal.append(hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), - edit, - true); + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + edit, true); } wal.sync(); @@ -310,7 +309,7 @@ public abstract class TestReplicationSourceManager { "1", 0, false, false); wal.append(hri, - new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), + new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), edit, true); wal.sync(); @@ -428,7 +427,7 @@ public abstract class TestReplicationSourceManager { // 1. Get the bulk load wal edit event WALEdit logEdit = getBulkLoadWALEdit(scope); // 2. Create wal key - WALKey logKey = new WALKey(scope); + WALKeyImpl logKey = new WALKeyImpl(scope); // 3. Get the scopes for the key Replication.scopeWALEdits(logKey, logEdit, conf, manager); @@ -444,7 +443,7 @@ public abstract class TestReplicationSourceManager { NavigableMap scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); WALEdit logEdit = getBulkLoadWALEdit(scope); // 2. Create wal key - WALKey logKey = new WALKey(scope); + WALKeyImpl logKey = new WALKeyImpl(scope); // 3. Enable bulk load hfile replication Configuration bulkLoadConf = HBaseConfiguration.create(conf); bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 6570ab5bf83..6f9c2d3499c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.After; import org.junit.AfterClass; @@ -374,8 +374,8 @@ public class TestWALEntryStream { private void appendToLog(String key) throws IOException { final long txid = log.append(info, - new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), - getWALEdit(key), true); + new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), + mvcc, scopes), getWALEdit(key), true); log.sync(txid); } @@ -390,9 +390,8 @@ public class TestWALEntryStream { } private void appendToLogPlus(int count) throws IOException { - final long txid = log.append(info, - new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), - getWALEdits(count), true); + final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true); log.sync(txid); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java index 472eadd5f3a..ddd11137080 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FaultyFSLog.java @@ -58,7 +58,7 @@ public class FaultyFSLog extends FSHLog { } @Override - public long append(RegionInfo info, WALKey key, + public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException { if (this.ft == FailureType.APPEND) { throw new IOException("append"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java index 93c5d4f1065..d3d4d53e288 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java @@ -166,9 +166,9 @@ public class TestFSHLogProvider { * used by TestDefaultWALProviderWithHLogKey * @param scopes */ - WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp, + WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp, NavigableMap scopes) { - return new WALKey(info, tableName, timestamp, mvcc, scopes); + return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java index 77d63a4deac..ac53ae9554b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestSecureWAL.java @@ -138,7 +138,7 @@ public class TestSecureWAL { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); - wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 92c7458ffa4..7c1af2539ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -1,4 +1,4 @@ -/** +/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -197,7 +197,7 @@ public class TestWALFactory { edit.add(new KeyValue(rowName, family, qualifier, System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); - WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName, + WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes); log.append(infos[i], walKey, edit, true); walKey.getWriteEntry(); @@ -266,7 +266,7 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Now call sync and try reading. Opening a Reader before you sync just @@ -285,7 +285,7 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); @@ -307,7 +307,7 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value)); - wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName, + wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } // Now I should have written out lots of blocks. Sync then read. @@ -390,7 +390,7 @@ public class TestWALFactory { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName())); - wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } @@ -526,7 +526,7 @@ public class TestWALFactory { final WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); final long txid = log.append(info, - new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), + new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols, true); log.sync(txid); @@ -589,7 +589,7 @@ public class TestWALFactory { HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace()); final long txid = log.append(hri, - new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), + new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols, true); log.sync(txid); @@ -646,7 +646,7 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(i)), timestamp, new byte[]{(byte) (i + '0')})); - log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), cols, true); } log.sync(); @@ -656,7 +656,7 @@ public class TestWALFactory { cols.add(new KeyValue(row, Bytes.toBytes("column"), Bytes.toBytes(Integer.toString(11)), timestamp, new byte[]{(byte) (11 + '0')})); - log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), cols, true); log.sync(); assertEquals(COL_COUNT, visitor.increments); @@ -677,8 +677,7 @@ public class TestWALFactory { int increments = 0; @Override - public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, - WALEdit logEdit) { + public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) { increments++; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index 243e9451a69..a6d245698f9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -183,7 +183,7 @@ public class TestWALMethods { WALEdit edit = new WALEdit(); edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val")); - WALKey key = new WALKey(TEST_REGION, TEST_TABLE, seq, now, + WALKeyImpl key = new WALKeyImpl(TEST_REGION, TEST_TABLE, seq, now, HConstants.DEFAULT_CLUSTER_ID); WAL.Entry entry = new WAL.Entry(key, edit); return entry; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java index 095b3720ba8..32253106cce 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALReaderOnSecureWAL.java @@ -127,7 +127,7 @@ public class TestWALReaderOnSecureWAL { } else { kvs.add(kv); } - wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName, + wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes), kvs, true); } wal.sync(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java index acdb5a21557..cd8bbe470f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALRootDir.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -96,24 +96,30 @@ public class TestWALRootDir { WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); - long txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true); + long txid = log.append(regionInfo, + getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true); log.sync(txid); - assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size()); + assertEquals("Expect 1 log have been created", 1, + getWALFiles(walFs, walRootDir).size()); log.rollWriter(); //Create 1 more WAL - assertEquals(2, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size()); + assertEquals(2, getWALFiles(walFs, new Path(walRootDir, + HConstants.HREGION_LOGDIR_NAME)).size()); edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), System.currentTimeMillis(), value)); - txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit, true); + txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), + edit, true); log.sync(txid); log.rollWriter(); log.shutdown(); - assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size()); + assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs, + new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size()); } - protected WALKey getWalKey(final long time, HRegionInfo hri, final long startPoint) { - return new WALKey(hri.getEncodedNameAsBytes(), tableName, time, new MultiVersionConcurrencyControl(startPoint)); + protected WALKeyImpl getWalKey(final long time, HRegionInfo hri, final long startPoint) { + return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, time, + new MultiVersionConcurrencyControl(startPoint)); } private List getWALFiles(FileSystem fs, Path dir) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index e2188411f73..0fc0df1e64e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -379,7 +379,7 @@ public class TestWALSplit { fs.mkdirs(regiondir); long now = System.currentTimeMillis(); Entry entry = - new Entry(new WALKey(encoded, + new Entry(new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, @@ -401,7 +401,7 @@ public class TestWALSplit { fs.mkdirs(regiondir); long now = System.currentTimeMillis(); Entry entry = - new Entry(new WALKey(encoded, + new Entry(new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); @@ -1345,7 +1345,7 @@ public class TestWALSplit { .addCompactionOutput(output); WALEdit edit = WALEdit.createCompaction(hri, desc.build()); - WALKey key = new WALKey(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, + WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1, EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID); w.append(new Entry(key, edit)); w.sync(); @@ -1362,7 +1362,7 @@ public class TestWALSplit { final long time = EnvironmentEdgeManager.currentTime(); KeyValue kv = new KeyValue(region.getBytes(), WALEdit.METAFAMILY, WALEdit.REGION_EVENT, time, regionOpenDesc.toByteArray()); - final WALKey walKey = new WALKey(region.getBytes(), TABLE_NAME, 1, time, + final WALKeyImpl walKey = new WALKeyImpl(region.getBytes(), TABLE_NAME, 1, time, HConstants.DEFAULT_CLUSTER_ID); w.append( new Entry(walKey, new WALEdit().add(kv))); @@ -1390,7 +1390,7 @@ public class TestWALSplit { final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value); WALEdit edit = new WALEdit(); edit.add(cell); - return new Entry(new WALKey(region, table, seq, time, + return new Entry(new WALKeyImpl(region, table, seq, time, HConstants.DEFAULT_CLUSTER_ID), edit); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 32418033b91..de23d61e88b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -185,8 +185,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { WALEdit walEdit = new WALEdit(); addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); RegionInfo hri = region.getRegionInfo(); - final WALKey logkey = - new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); + final WALKeyImpl logkey = + new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes); wal.append(hri, logkey, walEdit, true); if (!this.noSync) { if (++lastSync >= this.syncInterval) { @@ -430,7 +430,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { break; } count++; - long seqid = e.getKey().getLogSeqNum(); + long seqid = e.getKey().getSequenceId(); if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) { // sequenceIds should be increasing for every regions if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {