HBASE-19134 Make WALKey an Interface; expose Read-Only version to CPs

Created a new WALKey Interface and a WALKeyImpl. The WALKey Interface
is surfaced to Coprocessors and throughout most of the code base.
WALKeyImpl is used internally by WAL and by Replication which need
access to WALKey setters.

Methods that were deprecated in WALObserver because they were exposing
Private audience Classes have been undeprecated now we have WALKey.

Moved over to use SequenceId#getSequenceId throughout. Changed
SequenceId#getSequenceId removing the IOE.
This commit is contained in:
Michael Stack 2017-12-06 22:44:52 -08:00
parent 542060ce40
commit a5a77ae3d5
No known key found for this signature in database
GPG Key ID: 9816C7FC8ACC93D2
55 changed files with 908 additions and 825 deletions

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -133,10 +133,10 @@ public class TestWALRecordReader {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(info, getWalKey(ts, scopes), edit, true);
log.append(info, getWalKeyImpl(ts, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, getWalKey(ts+1, scopes), edit, true);
log.append(info, getWalKeyImpl(ts+1, scopes), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@ -147,10 +147,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(info, getWalKey(ts1+1, scopes), edit, true);
log.append(info, getWalKeyImpl(ts1+1, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, getWalKey(ts1+2, scopes), edit, true);
log.append(info, getWalKeyImpl(ts1+2, scopes), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@ -192,7 +192,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
long txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@ -202,7 +202,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
txid = log.append(info, getWalKeyImpl(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@ -241,8 +241,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
protected WALKeyImpl getWalKeyImpl(final long time, NavigableMap<byte[], Integer> scopes) {
return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
protected WALRecordReader getReader() {

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -76,7 +76,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
@ -764,7 +764,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
.setEndKey(HConstants.EMPTY_END_ROW)
.build();
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
Replication.scopeWALEdits(new WALKey(), edit,
Replication.scopeWALEdits(new WALKeyImpl(), edit,
htable1.getConfiguration(), null);
}
@ -844,7 +844,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
long now = EnvironmentEdgeManager.currentTime();
edit.add(new KeyValue(rowName, famName, qualifier,
now, value));
WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
wal.append(hri, walKey, edit, true);
wal.sync();

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -912,26 +912,16 @@ public interface RegionObserver {
/**
* Called before a {@link WALEdit}
* replayed for this region.
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* damage.
* @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
default void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}
/**
* Called after a {@link WALEdit}
* replayed for this region.
* Do not amend the WALKey. It is InterfaceAudience.Private. Changing the WALKey will cause
* damage.
* @param ctx the environment provided by the region server
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
default void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@ -108,15 +109,16 @@ public class ReplicationProtbufUtil {
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (Entry entry: entries) {
entryBuilder.clear();
// TODO: this duplicates a lot in WALKey#getBuilder
// TODO: this duplicates a lot in WALKeyImpl#getBuilder
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
WALKey key = entry.getKey();
WALKeyImpl key = entry.getKey();
keyBuilder.setEncodedRegionName(
UnsafeByteOperations.unsafeWrap(encodedRegionName == null
? key.getEncodedRegionName()
: encodedRegionName));
keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(key.getTablename().getName()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
long sequenceId = key.getSequenceId();
keyBuilder.setLogSequenceNumber(sequenceId);
keyBuilder.setWriteTime(key.getWriteTime());
if (key.getNonce() != HConstants.NO_NONCE) {
keyBuilder.setNonce(key.getNonce());

View File

@ -191,6 +191,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.io.MultipleIOException;
@ -3343,7 +3344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public long getOrigLogSeqNum() {
return WALKey.NO_SEQUENCE_ID;
return SequenceId.NO_SEQUENCE_ID;
}
@Override
@ -4504,16 +4505,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
firstSeqIdInLog = key.getSequenceId();
}
if (currentEditSeqId > key.getLogSeqNum()) {
if (currentEditSeqId > key.getSequenceId()) {
// when this condition is true, it means we have a serious defect because we need to
// maintain increasing SeqId for WAL edits per region
LOG.error(getRegionInfo().getEncodedName() + " : "
+ "Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key
+ "; edit=" + val);
} else {
currentEditSeqId = key.getLogSeqNum();
currentEditSeqId = key.getSequenceId();
}
currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
key.getOrigLogSeqNum() : currentEditSeqId;
@ -4571,7 +4572,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
continue;
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getColumnFamilyDescriptor()
if (key.getSequenceId() <= maxSeqIdInStores.get(store.getColumnFamilyDescriptor()
.getName())) {
skippedEdits++;
continue;
@ -7550,7 +7551,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce) throws IOException {
return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
WALKey.NO_SEQUENCE_ID);
SequenceId.NO_SEQUENCE_ID);
}
/**
@ -7560,16 +7561,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
"WALEdit is null or empty!");
Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID,
Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
nonce, mvcc) :
new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
// here instead of WALKeyImpl directly to support legacy coprocessors.
WALKeyImpl walKey = walEdit.isReplay()?
new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc) :
new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc, this.getReplicationScope());
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -27,5 +26,12 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public interface SequenceId {
public long getSequenceId() throws IOException;
/**
* Used to represent when a particular wal key doesn't know/care about the sequence ordering.
*/
long NO_SEQUENCE_ID = -1;
default long getSequenceId() {
return NO_SEQUENCE_ID;
}
}

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.WriterBase;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.util.StringUtils;
@ -954,7 +955,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKey key,
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
@ -1017,7 +1018,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
* this append; otherwise, you will just have to wait on the WriteEntry to get filled in.
*/
@Override
public abstract long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore)
public abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException;
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.ipc.RemoteException;
@ -561,10 +562,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
@Override
public long append(RegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
@ -113,7 +114,8 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
int buffered = output.buffered();
entry.setCompressionContext(compressionContext);
try {
entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
entry.getKey().
getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
.writeDelimitedTo(asyncOutputWrapper);
} catch (IOException e) {
throw new AssertionError("should not happen", e);

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALPrettyPrinter;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
@ -441,7 +442,7 @@ public class FSHLog extends AbstractFSWAL<Writer> {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH_EXCEPTION",
justification = "Will never be null")
@Override
public long append(final RegionInfo hri, final WALKey key, final WALEdit edits,
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
disruptor.getRingBuffer());
@ -469,17 +470,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final BlockingQueue<SyncFuture> syncFutures;
private volatile SyncFuture takeSyncFuture = null;
/**
* UPDATE!
* @param syncs the batch of calls to sync that arrived as this thread was starting; when done,
* we will put the result of the actual hdfs sync call as the result.
* @param sequence The sequence number on the ring buffer when this thread was set running. If
* this actual writer sync completes then all appends up this point have been
* flushed/synced/pushed to datanodes. If we fail, then the passed in
* <code>syncs</code> futures will return the exception to their clients; some of the
* edits may have made it out to data nodes but we will report all that were part of
* this session as failed.
*/
SyncRunner(final String name, final int maxHandlersCount) {
super(name);
// LinkedBlockingQueue because of

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CollectionUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@ -57,7 +57,7 @@ class FSWALEntry extends Entry {
private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit,
final RegionInfo regionInfo, final boolean inMemstore) {
super(key, edit);
this.inMemstore = inMemstore;

View File

@ -367,7 +367,8 @@ public class ProtobufLogReader extends ReaderBase {
entry.getKey().readFieldsFromPb(walKey, this.byteStringUncompressor);
if (!walKey.hasFollowingKvCount() || 0 == walKey.getFollowingKvCount()) {
if (LOG.isTraceEnabled()) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" + this.inputStream.getPos());
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset=" +
this.inputStream.getPos());
}
continue;
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
@ -50,8 +51,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
@Override
public void append(Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
.writeDelimitedTo(output);
((WALKeyImpl)entry.getKey()).getBuilder(compressor).
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
for (Cell cell : entry.getEdit().getCells()) {
// cellEncoder must assume little about the stream, since we write PB and cells in turn.
cellEncoder.write(cell);

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
@ -92,7 +92,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
public Entry next(Entry reuse) throws IOException {
Entry e = reuse;
if (e == null) {
e = new Entry(new WALKey(), new WALEdit());
e = new Entry();
}
if (compressionContext != null) {
e.setCompressionContext(compressionContext);

View File

@ -74,13 +74,8 @@ public interface WALActionsListener {
/**
* Called before each write.
* @param info
* @param logKey
* @param logEdit
*/
default void visitLogEntryBeforeWrite(
RegionInfo info, WALKey logKey, WALEdit logEdit
) {}
default void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {}
/**
* @param logKey

View File

@ -139,11 +139,6 @@ public class WALCoprocessorHost
}
}
/**
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
public void preWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
// Not bypassable.
@ -157,11 +152,7 @@ public class WALCoprocessorHost
}
});
}
/**
* @deprecated Since hbase-2.0.0. No replacement. To be removed in hbase-3.0.0 and replaced
* with something that doesn't expose IntefaceAudience.Private classes.
*/
@Deprecated
public void postWALWrite(final RegionInfo info, final WALKey logKey, final WALEdit logEdit)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new WALObserverOperation() {

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
@ -59,11 +59,12 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @param mvcc Used by WAL to get sequence Id for the waledit.
*/
public static WALKey writeCompactionMarker(WAL wal,
public static WALKeyImpl writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@ -75,10 +76,10 @@ public class WALUtil {
*
* <p>This write is for internal use only. Not for external client consumption.
*/
public static WALKey writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALKeyImpl walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
@ -90,11 +91,11 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened.
* This write is for internal use only. Not for external client consumption.
*/
public static WALKey writeRegionEventMarker(WAL wal,
public static WALKeyImpl writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, replicationScope, hri,
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
@ -112,19 +113,19 @@ public class WALUtil {
* @return walKey with sequenceid filled out for this bulk load marker
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static WALKey writeBulkLoadMarkerAndSync(final WAL wal,
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc),
mvcc);
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}
private static WALKey writeMarker(final WAL wal,
private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
throws IOException {
@ -138,14 +139,14 @@ public class WALUtil {
* Good for case of adding a single edit or marker to the WAL.
*
* <p>This write is for internal use only. Not for external client consumption.
* @return WALKey that was added to the WAL.
* @return WALKeyImpl that was added to the WAL.
*/
public static WALKey doFullAppendTransaction(final WAL wal,
public static WALKeyImpl doFullAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope);
long trx = MultiVersionConcurrencyControl.NONE;
try {

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -55,7 +55,7 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter {
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
WALEdit edit = entry.getEdit();
WALKey logKey = entry.getKey();
WALKeyImpl logKey = (WALKeyImpl)entry.getKey();
if (edit != null && !edit.isEmpty()) {
// Mark that the current cluster has the change

View File

@ -31,6 +31,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
@ -58,7 +60,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
@ -266,7 +267,7 @@ public class Replication implements
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
logKey.serializeReplicationScope(false);
((WALKeyImpl)logKey).serializeReplicationScope(false);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -111,7 +112,7 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
newEdit.add(cell);
}
}
newEntries.add(new Entry(entry.getKey(), newEdit));
newEntries.add(new Entry(((WALKeyImpl)entry.getKey()), newEdit));
}
replicateContext.setEntries(newEntries);
return delegator.replicate(replicateContext);

View File

@ -161,7 +161,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore)
public long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -116,7 +116,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
long append(RegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore) throws IOException;
/**
* updates the seuence number of a specific store.
@ -230,10 +230,10 @@ public interface WAL extends Closeable, WALFileLengthProvider {
*/
class Entry {
private final WALEdit edit;
private final WALKey key;
private final WALKeyImpl key;
public Entry() {
this(new WALKey(), new WALEdit());
this(new WALKeyImpl(), new WALEdit());
}
/**
@ -242,7 +242,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
* @param edit log's edit
* @param key log's key
*/
public Entry(WALKey key, WALEdit edit) {
public Entry(WALKeyImpl key, WALEdit edit) {
this.key = key;
this.edit = edit;
}
@ -261,7 +261,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
*
* @return key
*/
public WALKey getKey() {
public WALKeyImpl getKey() {
return key;
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
* All the edits for a given transaction are written out as a single record, in PB format followed
* by Cells written via the WALCellEncoder.
*/
// TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle.
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC })
public class WALEdit implements HeapSize {
@ -119,6 +120,7 @@ public class WALEdit implements HeapSize {
return this.isReplay;
}
@InterfaceAudience.Private
public WALEdit add(Cell cell) {
this.cells.add(cell);
return this;

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,630 +17,88 @@
*/
package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.SequenceId;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* A Key for an entry in the WAL.
*
* The log intermingles edits to many tables and rows, so each log entry
* identifies the appropriate table and row. Within a table and row, they're
* also sorted.
*
* <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
*
* Key for WAL Entry.
* Read-only. No Setters. For limited audience such as Coprocessors.
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into WALEntry.
@InterfaceAudience.Private
public class WALKey implements SequenceId, Comparable<WALKey> {
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC})
public interface WALKey extends SequenceId, Comparable<WALKey> {
/**
* Used to represent when a particular wal key doesn't know/care about the sequence ordering.
* Unmodifiable empty list of UUIDs.
*/
public static final long NO_SEQUENCE_ID = -1;
List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl getMvcc() {
return mvcc;
default long estimatedSerializedSizeOf() {
return 0;
}
/**
* Use it to complete mvcc transaction. This WALKey was part of
* (the transaction is started when you call append; see the comment on FSHLog#append). To
* complete call
* {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
* or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
* @return A WriteEntry gotten from local WAL subsystem.
* @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
* @return encoded region name
*/
@InterfaceAudience.Private // For internal use only.
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
assert this.writeEntry != null;
return this.writeEntry;
}
@InterfaceAudience.Private // For internal use only.
public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
assert this.writeEntry == null;
this.writeEntry = writeEntry;
// Set our sequenceid now using WriteEntry.
this.sequenceId = writeEntry.getWriteNumber();
}
private byte [] encodedRegionName;
private TableName tablename;
/**
* SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
* NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
*/
private long sequenceId;
byte[] getEncodedRegionName();
/**
* Used during WAL replay; the sequenceId of the edit when it came into the system.
* @return table name
*/
private long origLogSeqNum = 0;
/** Time at which this edit was written. */
private long writeTime;
/** The first element in the list is the cluster id on which the change has originated */
private List<UUID> clusterIds;
private NavigableMap<byte[], Integer> replicationScope;
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
private MultiVersionConcurrencyControl mvcc;
/**
* Set in a way visible to multiple threads; e.g. synchronized getter/setters.
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
private CompressionContext compressionContext;
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
public WALKey(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
}
@VisibleForTesting
public WALKey(final byte[] encodedRegionName, final TableName tablename,
long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
}
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
}
public WALKey(final byte[] encodedRegionName,
final TableName tablename,
final long now,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc the mvcc associate the WALKey
* @param replicationScope the non-default replication scope
* associated with the region's column families
*/
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
*/
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
final long now,
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename the tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
* @param nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
* @param replicationScope the non-default replication scope of the column families
*/
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
long nonceGroup,
long nonce,
final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName,
tablename,
logSeqNum,
EnvironmentEdgeManager.currentTime(),
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc, null);
}
@InterfaceAudience.Private
protected void init(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
final long now,
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
this.mvcc = mvcc;
if (logSeqNum != NO_SEQUENCE_ID) {
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
}
// For deserialization. DO NOT USE. See setWriteEntry below.
@InterfaceAudience.Private
protected void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}
/**
* @param compressionContext Compression context to use
*/
public void setCompressionContext(CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
/** @return encoded region name */
public byte [] getEncodedRegionName() {
return encodedRegionName;
}
/** @return table name */
public TableName getTablename() {
return tablename;
}
/** @return log sequence number
* @deprecated Use {@link #getSequenceId()}
*/
@Deprecated
public long getLogSeqNum() {
return getSequenceId();
}
/**
* Used to set original sequenceId for WALKey during WAL replay
*/
public void setOrigLogSeqNum(final long sequenceId) {
this.origLogSeqNum = sequenceId;
}
/**
* Return a positive long if current WALKey is created from a replay edit; a replay edit is an
* edit that came in when replaying WALs of a crashed server.
* @return original sequence number of the WALEdit
*/
public long getOrigLogSeqNum() {
return this.origLogSeqNum;
}
/**
* SequenceId is only available post WAL-assign. Calls before this will get you a
* {@link #NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this method
* for more on when this sequenceId comes available.
* @return long the new assigned sequence number
*/
@Override
public long getSequenceId() {
return this.sequenceId;
}
TableName getTablename();
/**
* @return the write time
*/
public long getWriteTime() {
return this.writeTime;
}
long getWriteTime();
public NavigableMap<byte[], Integer> getReplicationScopes() {
return replicationScope;
}
/** @return The nonce group */
public long getNonceGroup() {
return nonceGroup;
}
/** @return The nonce */
public long getNonce() {
return nonce;
}
private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
this.replicationScope = replicationScope;
}
public void serializeReplicationScope(boolean serialize) {
if (!serialize) {
setReplicationScope(null);
}
/**
* @return The nonce group
*/
default long getNonceGroup() {
return HConstants.NO_NONCE;
}
/**
* Marks that the cluster with the given clusterId has consumed the change
* @return The nonce
*/
public void addClusterId(UUID clusterId) {
if (!clusterIds.contains(clusterId)) {
clusterIds.add(clusterId);
}
default long getNonce() {
return HConstants.NO_NONCE;
}
UUID getOriginatingClusterId();
/**
* @return the set of cluster Ids that have consumed the change
* Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an
* edit that came in when replaying WALs of a crashed server.
* @return original sequence number of the WALEdit
*/
public List<UUID> getClusterIds() {
return clusterIds;
}
/**
* @return the cluster id on which the change has originated. It there is no such cluster, it
* returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
*/
public UUID getOriginatingClusterId(){
return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
}
@Override
public String toString() {
return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
sequenceId;
}
long getOrigLogSeqNum();
/**
* Produces a string map for this key. Useful for programmatic use and
* manipulation of the data stored in an WALKey, for example, printing
* manipulation of the data stored in an WALKeyImpl, for example, printing
* as JSON.
*
* @return a Map containing data from this key
*/
public Map<String, Object> toStringMap() {
default Map<String, Object> toStringMap() {
Map<String, Object> stringMap = new HashMap<>();
stringMap.put("table", tablename);
stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
stringMap.put("table", getTablename());
stringMap.put("region", Bytes.toStringBinary(getEncodedRegionName()));
stringMap.put("sequence", getSequenceId());
return stringMap;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
return compareTo((WALKey)obj) == 0;
}
@Override
public int hashCode() {
int result = Bytes.hashCode(this.encodedRegionName);
result ^= getSequenceId();
result ^= this.writeTime;
return result;
}
@Override
public int compareTo(WALKey o) {
int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
if (result == 0) {
long sid = getSequenceId();
long otherSid = o.getSequenceId();
if (sid < otherSid) {
result = -1;
} else if (sid > otherSid) {
result = 1;
}
if (result == 0) {
if (this.writeTime < o.writeTime) {
result = -1;
} else if (this.writeTime > o.writeTime) {
return 1;
}
}
}
// why isn't cluster id accounted for?
return result;
}
/**
* Drop this instance's tablename byte array and instead
* hold a reference to the provided tablename. This is not
* meant to be a general purpose setter - it's only used
* to collapse references to conserve memory.
*/
void internTableName(TableName tablename) {
// We should not use this as a setter - only to swap
// in a new reference to the same table name.
assert tablename.equals(this.tablename);
this.tablename = tablename;
}
/**
* Drop this instance's region name byte array and instead
* hold a reference to the provided region name. This is not
* meant to be a general purpose setter - it's only used
* to collapse references to conserve memory.
*/
void internEncodedRegionName(byte []encodedRegionName) {
// We should not use this as a setter - only to swap
// in a new reference to the same table name.
assert Bytes.equals(this.encodedRegionName, encodedRegionName);
this.encodedRegionName = encodedRegionName;
}
public WALProtos.WALKey.Builder getBuilder(
WALCellCodec.ByteStringCompressor compressor) throws IOException {
WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
compressionContext.tableDict));
}
builder.setLogSequenceNumber(getSequenceId());
builder.setWriteTime(writeTime);
if (this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
if (this.nonceGroup != HConstants.NO_NONCE) {
builder.setNonceGroup(nonceGroup);
}
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (UUID clusterId : clusterIds) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null)
? UnsafeByteOperations.unsafeWrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
.setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
}
}
return builder;
}
public void readFieldsFromPb(WALProtos.WALKey walKey,
WALCellCodec.ByteStringUncompressor uncompressor)
throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
byte[] tablenameBytes = uncompressor.uncompress(
walKey.getTableName(), compressionContext.tableDict);
this.tablename = TableName.valueOf(tablenameBytes);
} else {
this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
}
clusterIds.clear();
for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
}
if (walKey.hasNonceGroup()) {
this.nonceGroup = walKey.getNonceGroup();
}
if (walKey.hasNonce()) {
this.nonce = walKey.getNonce();
}
this.replicationScope = null;
if (walKey.getScopesCount() > 0) {
this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
this.writeTime = walKey.getWriteTime();
if(walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
}
public long estimatedSerializedSizeOf() {
long size = encodedRegionName != null ? encodedRegionName.length : 0;
size += tablename != null ? tablename.toBytes().length : 0;
if (clusterIds != null) {
size += 16 * clusterIds.size();
}
if (nonceGroup != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce group
}
if (nonce != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) {
size += scope.getKey().length;
size += Bytes.SIZEOF_INT;
}
}
size += Bytes.SIZEOF_LONG; // sequence number
size += Bytes.SIZEOF_LONG; // write time
if (origLogSeqNum > 0) {
size += Bytes.SIZEOF_LONG; // original sequence number
}
return size;
}
}

View File

@ -0,0 +1,633 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Default implementation of Key for an Entry in the WAL.
* For internal use only though Replication needs to have access.
*
* The log intermingles edits to many tables and rows, so each log entry
* identifies the appropriate table and row. Within a table and row, they're
* also sorted.
*
* <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
*
*/
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into WALEntry.
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION})
public class WALKeyImpl implements WALKey {
public static final WALKeyImpl EMPTY_WALKEYIMPL = new WALKeyImpl();
public MultiVersionConcurrencyControl getMvcc() {
return mvcc;
}
/**
* Use it to complete mvcc transaction. This WALKeyImpl was part of
* (the transaction is started when you call append; see the comment on FSHLog#append). To
* complete call
* {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
* or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
* @return A WriteEntry gotten from local WAL subsystem.
* @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
*/
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
assert this.writeEntry != null;
return this.writeEntry;
}
public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
assert this.writeEntry == null;
this.writeEntry = writeEntry;
// Set our sequenceid now using WriteEntry.
this.sequenceId = writeEntry.getWriteNumber();
}
private byte [] encodedRegionName;
private TableName tablename;
/**
* SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
* NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
*/
private long sequenceId;
/**
* Used during WAL replay; the sequenceId of the edit when it came into the system.
*/
private long origLogSeqNum = 0;
/** Time at which this edit was written. */
private long writeTime;
/** The first element in the list is the cluster id on which the change has originated */
private List<UUID> clusterIds;
private NavigableMap<byte[], Integer> replicationScope;
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
private MultiVersionConcurrencyControl mvcc;
/**
* Set in a way visible to multiple threads; e.g. synchronized getter/setters.
*/
private MultiVersionConcurrencyControl.WriteEntry writeEntry;
private CompressionContext compressionContext;
public WALKeyImpl() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
}
@VisibleForTesting
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<>(1);
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now) {
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
}
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
}
public WALKeyImpl(final byte[] encodedRegionName,
final TableName tablename,
final long now,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName,
tablename,
NO_SEQUENCE_ID,
now,
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc the mvcc associate the WALKeyImpl
* @param replicationScope the non-default replication scope
* associated with the region's column families
*/
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
*/
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
final long now,
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename the tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup
* @param nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change(used in Replication)
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
* @param replicationScope the non-default replication scope of the column families
*/
public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename
* @param logSeqNum
* @param nonceGroup
* @param nonce
*/
// TODO: Fix being able to pass in sequenceid.
public WALKeyImpl(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
long nonceGroup,
long nonce,
final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName,
tablename,
logSeqNum,
EnvironmentEdgeManager.currentTime(),
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc, null);
}
@InterfaceAudience.Private
protected void init(final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
final long now,
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
this.mvcc = mvcc;
if (logSeqNum != NO_SEQUENCE_ID) {
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
}
// For deserialization. DO NOT USE. See setWriteEntry below.
@InterfaceAudience.Private
protected void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}
/**
* @param compressionContext Compression context to use
*/
public void setCompressionContext(CompressionContext compressionContext) {
this.compressionContext = compressionContext;
}
/** @return encoded region name */
@Override
public byte [] getEncodedRegionName() {
return encodedRegionName;
}
/** @return table name */
@Override
public TableName getTablename() {
return tablename;
}
/** @return log sequence number
* @deprecated Use {@link #getSequenceId()}
*/
@Deprecated
public long getLogSeqNum() {
return getSequenceId();
}
/**
* Used to set original sequenceId for WALKeyImpl during WAL replay
*/
public void setOrigLogSeqNum(final long sequenceId) {
this.origLogSeqNum = sequenceId;
}
/**
* Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an
* edit that came in when replaying WALs of a crashed server.
* @return original sequence number of the WALEdit
*/
@Override
public long getOrigLogSeqNum() {
return this.origLogSeqNum;
}
/**
* SequenceId is only available post WAL-assign. Calls before this will get you a
* {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this
* method for more on when this sequenceId comes available.
* @return long the new assigned sequence number
*/
@Override
public long getSequenceId() {
return this.sequenceId;
}
/**
* @return the write time
*/
@Override
public long getWriteTime() {
return this.writeTime;
}
public NavigableMap<byte[], Integer> getReplicationScopes() {
return replicationScope;
}
/** @return The nonce group */
@Override
public long getNonceGroup() {
return nonceGroup;
}
/** @return The nonce */
@Override
public long getNonce() {
return nonce;
}
private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
this.replicationScope = replicationScope;
}
public void serializeReplicationScope(boolean serialize) {
if (!serialize) {
setReplicationScope(null);
}
}
/**
* Marks that the cluster with the given clusterId has consumed the change
*/
public void addClusterId(UUID clusterId) {
if (!clusterIds.contains(clusterId)) {
clusterIds.add(clusterId);
}
}
/**
* @return the set of cluster Ids that have consumed the change
*/
public List<UUID> getClusterIds() {
return clusterIds;
}
/**
* @return the cluster id on which the change has originated. It there is no such cluster, it
* returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
*/
@Override
public UUID getOriginatingClusterId(){
return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
}
@Override
public String toString() {
return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
return compareTo((WALKey)obj) == 0;
}
@Override
public int hashCode() {
int result = Bytes.hashCode(this.encodedRegionName);
result ^= getSequenceId();
result ^= this.writeTime;
return result;
}
@Override
public int compareTo(WALKey o) {
int result = Bytes.compareTo(this.encodedRegionName, o.getEncodedRegionName());
if (result == 0) {
long sid = getSequenceId();
long otherSid = o.getSequenceId();
if (sid < otherSid) {
result = -1;
} else if (sid > otherSid) {
result = 1;
}
if (result == 0) {
if (this.writeTime < o.getWriteTime()) {
result = -1;
} else if (this.writeTime > o.getWriteTime()) {
return 1;
}
}
}
// why isn't cluster id accounted for?
return result;
}
/**
* Drop this instance's tablename byte array and instead
* hold a reference to the provided tablename. This is not
* meant to be a general purpose setter - it's only used
* to collapse references to conserve memory.
*/
void internTableName(TableName tablename) {
// We should not use this as a setter - only to swap
// in a new reference to the same table name.
assert tablename.equals(this.tablename);
this.tablename = tablename;
}
/**
* Drop this instance's region name byte array and instead
* hold a reference to the provided region name. This is not
* meant to be a general purpose setter - it's only used
* to collapse references to conserve memory.
*/
void internEncodedRegionName(byte []encodedRegionName) {
// We should not use this as a setter - only to swap
// in a new reference to the same table name.
assert Bytes.equals(this.encodedRegionName, encodedRegionName);
this.encodedRegionName = encodedRegionName;
}
public WALProtos.WALKey.Builder getBuilder(
WALCellCodec.ByteStringCompressor compressor) throws IOException {
WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
if (compressionContext == null) {
builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
} else {
builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
compressionContext.regionDict));
builder.setTableName(compressor.compress(this.tablename.getName(),
compressionContext.tableDict));
}
builder.setLogSequenceNumber(getSequenceId());
builder.setWriteTime(writeTime);
if (this.origLogSeqNum > 0) {
builder.setOrigSequenceNumber(this.origLogSeqNum);
}
if (this.nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
}
if (this.nonceGroup != HConstants.NO_NONCE) {
builder.setNonceGroup(nonceGroup);
}
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (UUID clusterId : clusterIds) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null)
? UnsafeByteOperations.unsafeWrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
.setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
}
}
return builder;
}
public void readFieldsFromPb(WALProtos.WALKey walKey,
WALCellCodec.ByteStringUncompressor uncompressor)
throws IOException {
if (this.compressionContext != null) {
this.encodedRegionName = uncompressor.uncompress(
walKey.getEncodedRegionName(), compressionContext.regionDict);
byte[] tablenameBytes = uncompressor.uncompress(
walKey.getTableName(), compressionContext.tableDict);
this.tablename = TableName.valueOf(tablenameBytes);
} else {
this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
}
clusterIds.clear();
for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
}
if (walKey.hasNonceGroup()) {
this.nonceGroup = walKey.getNonceGroup();
}
if (walKey.hasNonce()) {
this.nonce = walKey.getNonce();
}
this.replicationScope = null;
if (walKey.getScopesCount() > 0) {
this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
this.writeTime = walKey.getWriteTime();
if(walKey.hasOrigSequenceNumber()) {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
}
@Override
public long estimatedSerializedSizeOf() {
long size = encodedRegionName != null ? encodedRegionName.length : 0;
size += tablename != null ? tablename.toBytes().length : 0;
if (clusterIds != null) {
size += 16 * clusterIds.size();
}
if (nonceGroup != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce group
}
if (nonce != HConstants.NO_NONCE) {
size += Bytes.SIZEOF_LONG; // nonce
}
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) {
size += scope.getKey().length;
size += Bytes.SIZEOF_INT;
}
}
size += Bytes.SIZEOF_LONG; // sequence number
size += Bytes.SIZEOF_LONG; // write time
if (origLogSeqNum > 0) {
size += Bytes.SIZEOF_LONG; // original sequence number
}
return size;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -964,7 +964,7 @@ public class WALSplitter {
}
private void internify(Entry entry) {
WALKey k = entry.getKey();
WALKeyImpl k = entry.getKey();
k.internTableName(this.tableName);
k.internEncodedRegionName(this.encodedRegionName);
}
@ -1685,7 +1685,7 @@ public class WALSplitter {
List<MutationReplay> mutations = new ArrayList<>();
Cell previousCell = null;
Mutation m = null;
WALKey key = null;
WALKeyImpl key = null;
WALEdit val = null;
if (logEntry != null) val = new WALEdit();
@ -1733,7 +1733,7 @@ public class WALSplitter {
for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
}
key = new WALKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null);
logEntry.setFirst(key);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -172,7 +173,7 @@ public class TestCoprocessorMetrics {
@Override
public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey,
RegionInfo info, WALKey logKey,
WALEdit logEdit) throws IOException {
walEditsCount.increment();
}

View File

@ -64,7 +64,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.junit.After;
import org.junit.AfterClass;
@ -232,8 +232,8 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now,
// we use HLogKey here instead of WALKeyImpl directly to support legacy coprocessors.
long txid = log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now,
new MultiVersionConcurrencyControl(), scopes),
edit, true);
log.sync(txid);
@ -286,7 +286,7 @@ public class TestWALObserver {
final long now = EnvironmentEdgeManager.currentTime();
long txid = log.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALEdit(), true);
log.sync(txid);
@ -334,7 +334,7 @@ public class TestWALObserver {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// sync to fs.
wal.sync();
@ -476,9 +476,9 @@ public class TestWALObserver {
byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// uses WALKeyImpl instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {

View File

@ -88,7 +88,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -669,7 +669,7 @@ public class TestDistributedLogSplitting {
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, COLUMN_FAMILY, qualifier, System.currentTimeMillis(), value));
log.append(curRegionInfo,
new WALKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
new WALKeyImpl(curRegionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), e, true);
if (0 == i % syncEvery) {
log.sync();

View File

@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@ -113,7 +113,7 @@ public class TestBulkLoad {
familyName, storeFileNames)),
anyBoolean())).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgument(1);
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -138,7 +138,7 @@ public class TestBulkLoad {
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgument(1);
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -157,7 +157,7 @@ public class TestBulkLoad {
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgument(1);
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -177,7 +177,7 @@ public class TestBulkLoad {
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
anyBoolean())).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgument(1);
WALKeyImpl walKey = invocation.getArgument(1);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();

View File

@ -156,6 +156,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter;
@ -655,7 +656,7 @@ public class TestHRegion {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
.toBytes(i)));
writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time,
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
HConstants.DEFAULT_CLUSTER_ID), edit));
writer.close();
@ -706,7 +707,7 @@ public class TestHRegion {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
.toBytes(i)));
writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time,
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
HConstants.DEFAULT_CLUSTER_ID), edit));
writer.close();
@ -809,7 +810,7 @@ public class TestHRegion {
edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
.toBytes(i)));
}
writer.append(new WAL.Entry(new WALKey(regionName, tableName, i, time,
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, i, time,
HConstants.DEFAULT_CLUSTER_ID), edit));
writer.close();
}
@ -906,7 +907,7 @@ public class TestHRegion {
long time = System.nanoTime();
writer.append(new WAL.Entry(new WALKey(regionName, tableName, 10, time,
writer.append(new WAL.Entry(new WALKeyImpl(regionName, tableName, 10, time,
HConstants.DEFAULT_CLUSTER_ID), WALEdit.createCompaction(region.getRegionInfo(),
compactionDescriptor)));
writer.close();
@ -4695,7 +4696,7 @@ public class TestHRegion {
//verify append called or not
verify(wal, expectAppend ? times(1) : never())
.append((HRegionInfo)any(), (WALKey)any(),
.append((HRegionInfo)any(), (WALKeyImpl)any(),
(WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not
@ -5843,7 +5844,7 @@ public class TestHRegion {
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
verify(wal, times(1)).append((HRegionInfo)any(), (WALKeyImpl)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@ -5914,18 +5915,18 @@ public class TestHRegion {
/**
* Utility method to setup a WAL mock.
* Needs to do the bit where we close latch on the WALKey on append else test hangs.
* Needs to do the bit where we close latch on the WALKeyImpl on append else test hangs.
* @return
* @throws IOException
*/
private WAL mockWAL() throws IOException {
WAL wal = mock(WAL.class);
Mockito.when(wal.append((HRegionInfo)Mockito.any(),
(WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
(WALKeyImpl)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
WALKey key = invocation.getArgument(1);
WALKeyImpl key = invocation.getArgument(1);
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
@ -5967,7 +5968,7 @@ public class TestHRegion {
region.close(false);
// 2 times, one for region open, the other close region
verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(),
verify(wal, times(2)).append((HRegionInfo)any(), (WALKeyImpl)any(),
editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1);

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -79,6 +79,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import org.apache.hadoop.util.StringUtils;
import org.junit.After;
@ -299,7 +300,7 @@ public class TestHRegionReplayEvents {
put.setDurability(Durability.SKIP_WAL);
MutationReplay mutation = new MutationReplay(MutationType.PUT, put, 0, 0);
region.batchReplay(new MutationReplay[] {mutation},
entry.getKey().getLogSeqNum());
entry.getKey().getSequenceId());
return Integer.parseInt(Bytes.toString(put.getRow()));
}
@ -1150,7 +1151,7 @@ public class TestHRegionReplayEvents {
// test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
(WALKeyImpl)any(), (WALEdit)any(), anyBoolean());
// test for replay prepare flush
putDataByReplay(secondaryRegion, 0, 10, cq, families);
@ -1166,11 +1167,11 @@ public class TestHRegionReplayEvents {
.build());
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
(WALKeyImpl)any(), (WALEdit)any(), anyBoolean());
secondaryRegion.close();
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
(WALKeyImpl)any(), (WALEdit)any(), anyBoolean());
}
/**
@ -1259,7 +1260,7 @@ public class TestHRegionReplayEvents {
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
}
}
@ -1299,7 +1300,7 @@ public class TestHRegionReplayEvents {
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
} else {
replayEdit(secondaryRegion, entry);
}
@ -1333,7 +1334,7 @@ public class TestHRegionReplayEvents {
}
FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0));
if (flush != null) {
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getLogSeqNum());
secondaryRegion.replayWALFlushMarker(flush, entry.getKey().getSequenceId());
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -229,8 +230,8 @@ public class TestWALLockup {
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
System.currentTimeMillis(), mvcc, scopes);
WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@ -406,8 +407,8 @@ public class TestWALLockup {
try {
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
System.currentTimeMillis(), mvcc, scopes);
WALKeyImpl key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
htd.getTableName(), System.currentTimeMillis(), mvcc, scopes);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@ -438,8 +439,8 @@ public class TestWALLockup {
// make RingBufferEventHandler sleep 1s, so the following sync
// endOfBatch=false
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), TableName.valueOf("sleep"),
System.currentTimeMillis(), mvcc, scopes);
key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("sleep"), System.currentTimeMillis(), mvcc, scopes);
dodgyWAL2.append(region.getRegionInfo(), key, edit, true);
Thread t = new Thread("Sync") {
@ -462,7 +463,7 @@ public class TestWALLockup {
e1.printStackTrace();
}
// make append throw DamagedWALException
key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(),
key = new WALKeyImpl(region.getRegionInfo().getEncodedNameAsBytes(),
TableName.valueOf("DamagedWALException"), System.currentTimeMillis(), mvcc, scopes);
dodgyWAL2.append(region.getRegionInfo(), key, edit, true);

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -159,8 +161,8 @@ public abstract class AbstractTestFSWAL {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(),
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, scopes);
log.append(hri, key, cols, true);
}
@ -415,7 +417,7 @@ public abstract class AbstractTestFSWAL {
goslow.set(true);
for (int i = 0; i < countPerFamily; i++) {
final RegionInfo info = region.getRegionInfo();
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
wal.append(info, logkey, edits, true);
region.getMVCC().completeAndWait(logkey.getWriteEntry());

View File

@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -151,7 +151,7 @@ public abstract class AbstractTestProtobufLog<W extends Closeable> {
// Write log in pb format.
writer = createWriter(path);
for (int i = 0; i < recordCount; ++i) {
WALKey key = new WALKey(
WALKeyImpl key = new WALKeyImpl(
hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
for (int j = 0; j < columnCount; ++j) {

View File

@ -98,7 +98,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.junit.After;
@ -801,14 +801,14 @@ public abstract class AbstractTestWALReplay {
long now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// Sync.
@ -1140,9 +1140,9 @@ public abstract class AbstractTestWALReplay {
}
}
private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
private WALKeyImpl createWALKey(final TableName tableName, final HRegionInfo hri,
final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
}
private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,

View File

@ -23,7 +23,7 @@ import java.util.Queue;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
public class FaultyProtobufLogReader extends ProtobufLogReader {
@ -44,7 +44,7 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
if (nextQueue.isEmpty()) { // Read the whole thing at once and fake reading
boolean b;
do {
Entry e = new Entry(new WALKey(), new WALEdit());
Entry e = new Entry();
if (compressionContext != null) {
e.setCompressionContext(compressionContext);
}

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@ -204,7 +204,7 @@ public class TestLogRollAbort {
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
log.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
log.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Send the data to HDFS datanodes and close the HDFS writer

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -162,7 +162,7 @@ public class TestLogRollingNoCluster {
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
Threads.sleep(ThreadLocalRandom.current().nextInt(5));
wal.sync(txid);

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@ -115,7 +115,7 @@ public class TestWALActionsListener {
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
final long txid = wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
TableName.valueOf(b), 0, mvcc, scopes), edit, true);
wal.sync(txid);
if (i == 10) {

View File

@ -34,7 +34,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
@ -48,9 +47,9 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
@ -117,7 +116,7 @@ public class TestReplicationSource {
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
edit.add(kv);
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
HConstants.DEFAULT_CLUSTER_ID);
writer.append(new WAL.Entry(key, edit));
writer.sync();

View File

@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -60,21 +60,22 @@ public class TestReplicationWALEntryFilters {
SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
// meta
WALKey key1 = new WALKey(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, System.currentTimeMillis());
Entry metaEntry = new Entry(key1, null);
assertNull(filter.filter(metaEntry));
// ns table
WALKey key2 =
new WALKey(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
WALKeyImpl key2 =
new WALKeyImpl(new byte[0], TableName.NAMESPACE_TABLE_NAME, System.currentTimeMillis());
Entry nsEntry = new Entry(key2, null);
assertNull(filter.filter(nsEntry));
// user table
WALKey key3 = new WALKey(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis());
WALKeyImpl key3 = new WALKeyImpl(new byte[0], TableName.valueOf("foo"),
System.currentTimeMillis());
Entry userEntry = new Entry(key3, null);
assertEquals(userEntry, filter.filter(userEntry));
@ -331,8 +332,8 @@ public class TestReplicationWALEntryFilters {
}
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
WALKey key1 =
new WALKey(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes);
WALKeyImpl key1 =
new WALKeyImpl(new byte[0], TableName.valueOf("foo"), System.currentTimeMillis(), scopes);
WALEdit edit1 = new WALEdit();
for (byte[] kv : kvs) {

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -412,7 +412,7 @@ public class TestRegionReplicaReplicationEndpoint {
byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
Entry entry = new Entry(
new WALKey(encodedRegionName, toBeDisabledTable, 1),
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
new WALEdit());
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -49,19 +48,16 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@ -165,7 +161,8 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
// only keep primary region's edits
if (logKey.getTablename().equals(tableName) && info.getReplicaId() == 0) {
entries.add(new Entry(logKey, logEdit));
// Presume type is a WALKeyImpl
entries.add(new Entry((WALKeyImpl)logKey, logEdit));
}
}
}

View File

@ -82,7 +82,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -277,7 +277,7 @@ public abstract class TestReplicationSourceManager {
LOG.info(i);
final long txid = wal.append(
hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync(txid);
@ -292,9 +292,8 @@ public abstract class TestReplicationSourceManager {
for (int i = 0; i < 3; i++) {
wal.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit, true);
}
wal.sync();
@ -310,7 +309,7 @@ public abstract class TestReplicationSourceManager {
"1", 0, false, false);
wal.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync();
@ -428,7 +427,7 @@ public abstract class TestReplicationSourceManager {
// 1. Get the bulk load wal edit event
WALEdit logEdit = getBulkLoadWALEdit(scope);
// 2. Create wal key
WALKey logKey = new WALKey(scope);
WALKeyImpl logKey = new WALKeyImpl(scope);
// 3. Get the scopes for the key
Replication.scopeWALEdits(logKey, logEdit, conf, manager);
@ -444,7 +443,7 @@ public abstract class TestReplicationSourceManager {
NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
WALEdit logEdit = getBulkLoadWALEdit(scope);
// 2. Create wal key
WALKey logKey = new WALKey(scope);
WALKeyImpl logKey = new WALKeyImpl(scope);
// 3. Enable bulk load hfile replication
Configuration bulkLoadConf = HBaseConfiguration.create(conf);
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
@ -374,8 +374,8 @@ public class TestWALEntryStream {
private void appendToLog(String key) throws IOException {
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
getWALEdit(key), true);
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
mvcc, scopes), getWALEdit(key), true);
log.sync(txid);
}
@ -390,9 +390,8 @@ public class TestWALEntryStream {
}
private void appendToLogPlus(int count) throws IOException {
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
getWALEdits(count), true);
final long txid = log.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
log.sync(txid);
}

View File

@ -58,7 +58,7 @@ public class FaultyFSLog extends FSHLog {
}
@Override
public long append(RegionInfo info, WALKey key,
public long append(RegionInfo info, WALKeyImpl key,
WALEdit edits, boolean inMemstore) throws IOException {
if (this.ft == FailureType.APPEND) {
throw new IOException("append");

View File

@ -166,9 +166,9 @@ public class TestFSHLogProvider {
* used by TestDefaultWALProviderWithHLogKey
* @param scopes
*/
WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long timestamp,
NavigableMap<byte[], Integer> scopes) {
return new WALKey(info, tableName, timestamp, mvcc, scopes);
return new WALKeyImpl(info, tableName, timestamp, mvcc, scopes);
}
/**

View File

@ -138,7 +138,7 @@ public class TestSecureWAL {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();

View File

@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -197,7 +197,7 @@ public class TestWALFactory {
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
WALKeyImpl walKey = new WALKeyImpl(infos[i].getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes);
log.append(infos[i], walKey, edit, true);
walKey.getWriteEntry();
@ -266,7 +266,7 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now call sync and try reading. Opening a Reader before you sync just
@ -285,7 +285,7 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
@ -307,7 +307,7 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
wal.append(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now I should have written out lots of blocks. Sync then read.
@ -390,7 +390,7 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes),
kvs, true);
}
@ -526,7 +526,7 @@ public class TestWALFactory {
final WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
new WALKeyImpl(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
mvcc, scopes),
cols, true);
log.sync(txid);
@ -589,7 +589,7 @@ public class TestWALFactory {
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
final long txid = log.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
new WALKeyImpl(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
mvcc, scopes),
cols, true);
log.sync(txid);
@ -646,7 +646,7 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')}));
log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true);
}
log.sync();
@ -656,7 +656,7 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')}));
log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
log.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true);
log.sync();
assertEquals(COL_COUNT, visitor.increments);
@ -677,8 +677,7 @@ public class TestWALFactory {
int increments = 0;
@Override
public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey,
WALEdit logEdit) {
public void visitLogEntryBeforeWrite(RegionInfo info, WALKey logKey, WALEdit logEdit) {
increments++;
}

View File

@ -183,7 +183,7 @@ public class TestWALMethods {
WALEdit edit = new WALEdit();
edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
WALKey key = new WALKey(TEST_REGION, TEST_TABLE, seq, now,
WALKeyImpl key = new WALKeyImpl(TEST_REGION, TEST_TABLE, seq, now,
HConstants.DEFAULT_CLUSTER_ID);
WAL.Entry entry = new WAL.Entry(key, edit);
return entry;

View File

@ -127,7 +127,7 @@ public class TestWALReaderOnSecureWAL {
} else {
kvs.add(kv);
}
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
wal.append(regioninfo, new WALKeyImpl(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -96,24 +96,30 @@ public class TestWALRootDir {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true);
long txid = log.append(regionInfo,
getWalKey(System.currentTimeMillis(), regionInfo, 0), edit, true);
log.sync(txid);
assertEquals("Expect 1 log have been created", 1, getWALFiles(walFs, walRootDir).size());
assertEquals("Expect 1 log have been created", 1,
getWALFiles(walFs, walRootDir).size());
log.rollWriter();
//Create 1 more WAL
assertEquals(2, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
assertEquals(2, getWALFiles(walFs, new Path(walRootDir,
HConstants.HREGION_LOGDIR_NAME)).size());
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1), edit, true);
txid = log.append(regionInfo, getWalKey(System.currentTimeMillis(), regionInfo, 1),
edit, true);
log.sync(txid);
log.rollWriter();
log.shutdown();
assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs, new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
assertEquals("Expect 3 logs in WALs dir", 3, getWALFiles(walFs,
new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME)).size());
}
protected WALKey getWalKey(final long time, HRegionInfo hri, final long startPoint) {
return new WALKey(hri.getEncodedNameAsBytes(), tableName, time, new MultiVersionConcurrencyControl(startPoint));
protected WALKeyImpl getWalKey(final long time, HRegionInfo hri, final long startPoint) {
return new WALKeyImpl(hri.getEncodedNameAsBytes(), tableName, time,
new MultiVersionConcurrencyControl(startPoint));
}
private List<FileStatus> getWALFiles(FileSystem fs, Path dir)

View File

@ -379,7 +379,7 @@ public class TestWALSplit {
fs.mkdirs(regiondir);
long now = System.currentTimeMillis();
Entry entry =
new Entry(new WALKey(encoded,
new Entry(new WALKeyImpl(encoded,
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
@ -401,7 +401,7 @@ public class TestWALSplit {
fs.mkdirs(regiondir);
long now = System.currentTimeMillis();
Entry entry =
new Entry(new WALKey(encoded,
new Entry(new WALKeyImpl(encoded,
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@ -1345,7 +1345,7 @@ public class TestWALSplit {
.addCompactionOutput(output);
WALEdit edit = WALEdit.createCompaction(hri, desc.build());
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
w.append(new Entry(key, edit));
w.sync();
@ -1362,7 +1362,7 @@ public class TestWALSplit {
final long time = EnvironmentEdgeManager.currentTime();
KeyValue kv = new KeyValue(region.getBytes(), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
time, regionOpenDesc.toByteArray());
final WALKey walKey = new WALKey(region.getBytes(), TABLE_NAME, 1, time,
final WALKeyImpl walKey = new WALKeyImpl(region.getBytes(), TABLE_NAME, 1, time,
HConstants.DEFAULT_CLUSTER_ID);
w.append(
new Entry(walKey, new WALEdit().add(kv)));
@ -1390,7 +1390,7 @@ public class TestWALSplit {
final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
WALEdit edit = new WALEdit();
edit.add(cell);
return new Entry(new WALKey(region, table, seq, time,
return new Entry(new WALKeyImpl(region, table, seq, time,
HConstants.DEFAULT_CLUSTER_ID), edit);
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -185,8 +185,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
RegionInfo hri = region.getRegionInfo();
final WALKey logkey =
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
final WALKeyImpl logkey =
new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
wal.append(hri, logkey, walEdit, true);
if (!this.noSync) {
if (++lastSync >= this.syncInterval) {
@ -430,7 +430,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
break;
}
count++;
long seqid = e.getKey().getLogSeqNum();
long seqid = e.getKey().getSequenceId();
if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
// sequenceIds should be increasing for every regions
if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {