HBASE-15205 Do not find the replication scope for every WAL#append() (Ram)

ramkrishna committed 2016-02-26 22:30:55 +05:30
parent 538815d82a
commit 8f2bd06019
42 changed files with 687 additions and 391 deletions
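
In short: before this patch every WAL#append() re-derived the per-family replication scopes from the HTableDescriptor; now HRegion computes the map once at construction and carries it in the WALKey. A minimal sketch of that precomputation, mirroring the HRegion constructor change below (the helper name is hypothetical, not part of the patch):

import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper: build the per-family replication scope map once, keeping
// only families whose scope is not REPLICATION_SCOPE_LOCAL, the way the
// HRegion constructor below now does.
static NavigableMap<byte[], Integer> nonDefaultScopes(HTableDescriptor htd) {
  NavigableMap<byte[], Integer> scopes =
      new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
  for (byte[] family : htd.getFamiliesKeys()) {
    int scope = htd.getFamily(family).getScope();
    if (scope != HConstants.REPLICATION_SCOPE_LOCAL) {
      scopes.put(Bytes.copy(family), scope); // copy before storing
    }
  }
  return scopes;
}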

View File

@ -134,7 +134,7 @@ public class ReplicationProtbufUtil {
keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
}
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getScopes();
NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
if (scopes != null && !scopes.isEmpty()) {
for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
scopeBuilder.setFamily(ByteStringer.wrap(scope.getKey()));

View File

@ -17,19 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.EOFException;
import java.io.FileNotFoundException;
@ -195,6 +183,20 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.TextFormat;
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@ -583,6 +585,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final MetricsRegionWrapperImpl metricsRegionWrapper;
private final Durability durability;
private final boolean regionStatsEnabled;
// Stores the replication scope of the various column families of the table
// that have non-default scope
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
/**
* HRegion constructor. This constructor should only be used for testing and
@ -661,6 +667,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true);
this.htableDescriptor = htd;
Set<byte[]> families = this.htableDescriptor.getFamiliesKeys();
for (byte[] family : families) {
if (!replicationScope.containsKey(family)) {
int scope = htd.getFamily(family).getScope();
// Only store those families that have NON-DEFAULT scope
if (scope != REPLICATION_SCOPE_LOCAL) {
// Do a copy before storing it here.
replicationScope.put(Bytes.copy(family), scope);
}
}
}
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
setHTableSpecificConf();
@ -971,7 +988,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionOpenDesc, mvcc);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
mvcc);
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@ -979,7 +997,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc, mvcc);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc);
// Store SeqId in HDFS when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
@ -2285,7 +2304,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
}
// Prepare flush (take a snapshot)
@ -2334,7 +2354,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false,
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" +
@ -2379,7 +2399,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
mvcc);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@ -2449,7 +2470,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
mvcc);
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@ -2462,7 +2484,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc);
WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@ -3139,13 +3161,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) {
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc);
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc,
this.getReplicationScope());
}
// TODO: Use the doAppend methods below... complicated by the replay stuff above.
try {
long txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
long txid = this.wal.append(this.getRegionInfo(), walKey,
walEdit, true);
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
} catch (IOException ioe) {
@ -3271,8 +3294,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
currentNonceGroup, currentNonce, mvcc);
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Complete the mvcc transaction started down in append else it will block others
this.mvcc.complete(walKey.getWriteEntry());
}
@ -5389,7 +5412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
this.getRegionInfo().getTable(),
ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(),
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
if (this.rsServices != null) {
@ -6319,6 +6342,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return r.openHRegion(reporter);
}
@VisibleForTesting
public NavigableMap<byte[], Integer> getReplicationScope() {
return this.replicationScope;
}
/**
* Useful when reopening a closed region (normally for unit tests)
@ -7069,10 +7096,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// here instead of WALKey directly to support legacy coprocessors.
WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc);
nonceGroup, nonce, mvcc, this.getReplicationScope());
try {
long txid =
this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true);
this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
// Call sync on our edit.
if (txid != 0) sync(txid, durability);
writeEntry = walKey.getWriteEntry();
@ -7362,7 +7389,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
47 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@ -7385,7 +7412,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
(2 * ClassSize.REENTRANT_LOCK) + // lock, updatesLock
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
+ ClassSize.TREEMAP // maxSeqIdInStores
+ 2 * ClassSize.TREEMAP // maxSeqIdInStores, replicationScopes
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ ClassSize.STORE_SERVICES // store services
;
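
With the map in hand, HRegion builds each WALKey with it and calls the slimmed-down WAL#append(HRegionInfo, WALKey, WALEdit, boolean). A hedged sketch of the new call shape, condensed from the hunks above ('appendWithScopes' is an illustrative helper, not in the patch; imports as in the earlier sketch):

// Illustrative helper: append one edit by handing the precomputed scope map to
// the WALKey; WAL#append no longer sees an HTableDescriptor at all.
static long appendWithScopes(WAL wal, HRegionInfo hri, WALEdit edit,
    MultiVersionConcurrencyControl mvcc,
    NavigableMap<byte[], Integer> replicationScope) throws IOException {
  WALKey key = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
      System.currentTimeMillis(), mvcc, replicationScope);
  return wal.append(hri, key, edit, true);
}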

View File

@ -1307,7 +1307,7 @@ public class HStore implements Store {
// Fix reaching into Region to get the maxWaitForSeqId.
// Does this method belong in Region altogether given it is making so many references up there?
// Could be Region#writeCompactionMarker(compactionDescriptor);
WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getTableDesc(),
WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
}

View File

@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.Bytes;
@ -1083,8 +1082,8 @@ public class FSHLog implements WAL {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
justification="Will never be null")
@Override
public long append(final HTableDescriptor htd, final HRegionInfo hri, final WALKey key,
final WALEdit edits, final boolean inMemstore) throws IOException {
public long append(final HRegionInfo hri,
final WALKey key, final WALEdit edits, final boolean inMemstore) throws IOException {
if (this.closed) throw new IOException("Cannot append; log is closed");
// Make a trace scope for the append. It is closed on other side of the ring buffer by the
// single consuming thread. Don't have to worry about it.
@ -1100,7 +1099,7 @@ public class FSHLog implements WAL {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id.
// TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, htd, hri, inMemstore);
entry = new FSWALEntry(sequence, key, edits, hri, inMemstore);
truck.loadPayload(entry, scope.detach());
} finally {
this.disruptor.getRingBuffer().publish(sequence);
@ -1878,14 +1877,12 @@ public class FSHLog implements WAL {
entry.getEdit())) {
if (entry.getEdit().isReplay()) {
// Set replication scope null so that this won't be replicated
entry.getKey().setScopes(null);
entry.getKey().serializeReplicationScope(false);
}
}
if (!listeners.isEmpty()) {
for (WALActionsListener i: listeners) {
// TODO: Why does listener take a table description and CPs take a regioninfo? Fix.
i.visitLogEntryBeforeWrite(entry.getHTableDescriptor(), entry.getKey(),
entry.getEdit());
i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit());
}
}
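
WALActionsListener#visitLogEntryBeforeWrite correspondingly loses its HTableDescriptor parameter. A minimal listener sketch against the new two-argument callback (class name is hypothetical):

// Illustrative listener: the table name is now read from the WALKey, since the
// HTableDescriptor parameter is gone.
class TableNameLoggingListener extends WALActionsListener.Base {
  private static final Log LOG = LogFactory.getLog(TableNameLoggingListener.class);
  @Override
  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
    LOG.trace("Appending to " + logKey.getTablename() + ", " + logEdit.size() + " cell(s)");
  }
}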

View File

@ -26,7 +26,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
@ -51,15 +50,13 @@ class FSWALEntry extends Entry {
// they are only in memory and held here while passing over the ring buffer.
private final transient long sequence;
private final transient boolean inMemstore;
private final transient HTableDescriptor htd;
private final transient HRegionInfo hri;
private final Set<byte[]> familyNames;
FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
final HTableDescriptor htd, final HRegionInfo hri, final boolean inMemstore) {
final HRegionInfo hri, final boolean inMemstore) {
super(key, edit);
this.inMemstore = inMemstore;
this.htd = htd;
this.hri = hri;
this.sequence = sequence;
if (inMemstore) {
@ -71,6 +68,7 @@ class FSWALEntry extends Entry {
Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
for (Cell cell : cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
// TODO: Avoid this clone?
familySet.add(CellUtil.cloneFamily(cell));
}
}
@ -89,10 +87,6 @@ class FSWALEntry extends Entry {
return this.inMemstore;
}
HTableDescriptor getHTableDescriptor() {
return this.htd;
}
HRegionInfo getHRegionInfo() {
return this.hri;
}

View File

@ -24,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.commons.logging.Log;
@ -67,7 +68,7 @@ public class HLogKey extends WALKey implements Writable {
}
public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
super(encodedRegionName, tablename);
super(encodedRegionName, tablename, null);
}
@VisibleForTesting
@ -75,11 +76,15 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, now);
}
public HLogKey(final byte[] encodedRegionName,
final TableName tablename,
final long now,
final MultiVersionConcurrencyControl mvcc) {
super(encodedRegionName, tablename, now, mvcc);
@VisibleForTesting
public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
super(encodedRegionName, tablename, now, replicationScope);
}
public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now,
final MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> scopes) {
super(encodedRegionName, tablename, now, mvcc, scopes);
}
/**
@ -107,6 +112,35 @@ public class HLogKey extends WALKey implements Writable {
super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change (used in Replication)
* @param nonceGroup the noncegroup
* @param nonce the nonce
* @param replicationScope the replication scope of the region's column families that have non-default scope
*/
public HLogKey(
final byte[] encodedRegionName,
final TableName tablename,
long logSeqNum,
final long now,
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> replicationScope) {
super(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
@ -192,7 +226,7 @@ public class HLogKey extends WALKey implements Writable {
// encodes the length of encodedRegionName.
// If < 0 we just read the version and the next vint is the length.
// @see Bytes#readByteArray(DataInput)
setScopes(null); // writable HLogKey does not contain scopes
serializeReplicationScope(false); // writable HLogKey does not contain scopes
int len = WritableUtils.readVInt(in);
byte[] tablenameBytes = null;
if (len < 0) {

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.wal.WALKey;
@ -85,7 +84,6 @@ public interface WALActionsListener {
);
/**
* @param htd
* @param logKey
* @param logEdit TODO: Retire this in favor of
* {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
@ -93,8 +91,7 @@ public interface WALActionsListener {
* <code>htd</code>.
* @throws IOException If failed to parse the WALEdit
*/
void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
throws IOException;
void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException;
/**
* For notification post append to the writer. Used by metrics system at least.
@ -135,8 +132,7 @@ public interface WALActionsListener {
public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
throws IOException {
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
}
@Override

View File

@ -20,11 +20,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -58,10 +58,11 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @param mvcc Used by WAL to get sequence Id for the waledit.
*/
public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc)
public static WALKey writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri, final CompactionDescriptor c,
MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc);
WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@ -73,11 +74,11 @@ public class WALUtil {
*
* <p>This write is for internal use only. Not for external client consumption.
*/
public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey =
doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
public static WALKey writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
HRegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = doFullAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@ -88,10 +89,12 @@ public class WALUtil {
* Write a region open marker indicating that the region is opened.
* This write is for internal use only. Not for external client consumption.
*/
public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
public static WALKey writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, HRegionInfo hri,
final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc);
WALKey walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@ -102,28 +105,30 @@ public class WALUtil {
* Write a log marker that a bulk load has succeeded and is about to be committed.
* This write is for internal use only. Not for external client consumption.
* @param wal The log to write into.
* @param htd A description of the table that we are bulk loading into.
* @param replicationScope The replication scope of the families in the HRegion
* @param hri A description of the region in the table that we are bulk loading into.
* @param desc A protocol buffers based description of the client's bulk loading request
* @return walKey with sequenceid filled out for this bulk load marker
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
public static WALKey writeBulkLoadMarkerAndSync(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
WALKey walKey = writeMarker(wal, replicationScope, hri, WALEdit.createBulkLoadEvent(hri, desc),
mvcc);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
return walKey;
}
private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd,
final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
private static WALKey writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true);
return doFullAppendTransaction(wal, replicationScope, hri, edit, mvcc, true);
}
/**
@ -134,16 +139,16 @@ public class WALUtil {
* <p>This write is for internal use only. Not for external client consumption.
* @return WALKey that was added to the WAL.
*/
public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd,
final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
final boolean sync)
public static WALKey doFullAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final HRegionInfo hri,
final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
throws IOException {
// TODO: Pass in current time to use?
WALKey walKey =
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
System.currentTimeMillis(), mvcc, replicationScope);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.append(htd, hri, walKey, edit, false);
trx = wal.append(hri, walKey, edit, false);
if (sync) {
wal.sync(trx);
}
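
WALUtil's marker writers (compaction, flush, region event, bulk load) now take the region's precomputed scope map in place of an HTableDescriptor. A hedged usage sketch for the compaction marker, assuming the caller holds the HRegion ('writeCompaction' is an illustrative wrapper, not in the patch):

// Illustrative wrapper: write a compaction marker with the new WALUtil
// signature, passing the precomputed scope map instead of an HTableDescriptor.
static WALKey writeCompaction(HRegion region, CompactionDescriptor cd) throws IOException {
  return WALUtil.writeCompactionMarker(region.getWAL(), region.getReplicationScope(),
      region.getRegionInfo(), cd, region.getMVCC());
}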

View File

@ -44,7 +44,7 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
NavigableMap<byte[], Integer> scopes = entry.getKey().getScopes();
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
if (scopes == null || scopes.isEmpty()) {
return null;
}

View File

@ -20,13 +20,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -43,7 +40,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
@ -61,7 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;
@ -257,72 +252,47 @@ public class Replication extends WALActionsListener.Base implements
}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
throws IOException {
scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits, or when the scope is local.
* @param htd Descriptor used to find the scope to use
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @param replicationManager Manager used to add bulk load events hfile references
* @throws IOException If failed to parse the WALEdit
*/
public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
public static void scopeWALEdits(WALKey logKey,
WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
throws IOException {
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
byte[] family;
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
for (StoreDescriptor s : bld.getStoresList()) {
family = s.getFamilyName().toByteArray();
addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s);
}
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
}
} else {
// Skip the flush/compaction/region events
continue;
}
} else {
family = CellUtil.cloneFamily(cell);
// Unexpected, has a tendency to happen in unit tests
assert htd.getFamily(family) != null;
if (!scopes.containsKey(family)) {
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL) {
scopes.put(family, scope);
}
}
foundOtherEdits = true;
}
}
if (!scopes.isEmpty() && !logEdit.isReplay()) {
logKey.setScopes(scopes);
}
}
private static void scopeBulkLoadEdits(HTableDescriptor htd,
ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
TableName tableName, Cell cell) throws IOException {
byte[] family;
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
for (StoreDescriptor s : bld.getStoresList()) {
family = s.getFamilyName().toByteArray();
if (!scopes.containsKey(family)) {
int scope = htd.getFamily(family).getScope();
if (scope != REPLICATION_SCOPE_LOCAL) {
scopes.put(family, scope);
addHFileRefsToQueue(replicationManager, tableName, family, s);
}
} else {
addHFileRefsToQueue(replicationManager, tableName, family, s);
}
}
} catch (IOException e) {
LOG.error("Failed to get bulk load events information from the wal file.", e);
throw e;
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
logKey.serializeReplicationScope(false);
}
}
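
Since the WALKey now arrives with the region's scope map already attached, Replication#scopeWALEdits no longer rebuilds it from an HTableDescriptor; it only decides whether the map should be serialized. A condensed sketch of that decision (bulk-load hfile-ref bookkeeping elided):

// Condensed sketch of the new decision in scopeWALEdits(): the key already
// carries the region's scope map, so the listener only decides whether that
// map should be serialized at all.
boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
  if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
    foundOtherEdits = true; // a normal data cell that may need replication
  }
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
  logKey.serializeReplicationScope(false); // nothing to replicate; drop the scopes
}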

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
// imports for things that haven't moved from regionserver.wal yet.
@ -154,8 +153,7 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
boolean inMemstore) {
public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) {
if (!this.listeners.isEmpty()) {
final long start = System.nanoTime();
long len = 0;

View File

@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
// imports we use from yet-to-be-moved regionsever.wal
@ -106,21 +105,17 @@ public interface WAL {
* completes BUT on return this edit must have its region edit/sequence id assigned
* else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
* have the region edit/sequence id filled in.
* @param info
* @param info the regioninfo associated with append
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param htd used to give scope for replication TODO refactor out in favor of table name and
* info
* @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
boolean inMemstore)
throws IOException;
long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException;
/**
* Sync what we have in the WAL.

View File

@ -193,7 +193,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
@InterfaceAudience.Private
protected List<UUID> clusterIds;
private NavigableMap<byte[], Integer> scopes;
private NavigableMap<byte[], Integer> replicationScope;
private long nonceGroup = HConstants.NO_NONCE;
private long nonce = HConstants.NO_NONCE;
@ -210,7 +210,12 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
public WALKey() {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
public WALKey(final NavigableMap<byte[], Integer> replicationScope) {
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
}
@VisibleForTesting
@ -220,15 +225,16 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
HConstants.NO_NONCE, HConstants.NO_NONCE, null);
HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
}
/**
* @deprecated Remove. Useless.
*/
@Deprecated // REMOVE
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
this(encodedRegionName, tablename, System.currentTimeMillis());
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final NavigableMap<byte[], Integer> replicationScope) {
this(encodedRegionName, tablename, System.currentTimeMillis(), replicationScope);
}
// TODO: Fix being able to pass in sequenceid.
@ -240,7 +246,20 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
null);
null, null);
}
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, null, replicationScope);
}
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc, replicationScope);
}
public WALKey(final byte[] encodedRegionName,
@ -254,7 +273,33 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
HConstants.NO_NONCE,
HConstants.NO_NONCE,
mvcc);
mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
* <p>Used by log splitting and snapshots.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change (used in Replication)
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc the mvcc associated with the WALKey
* @param replicationScope the non-default replication scope
* associated with the region's column families
*/
// TODO: Fix being able to pass in sequenceid.
public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
@ -279,7 +324,28 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
}
/**
* Create the log key for writing to somewhere.
* We maintain the tablename mainly for debugging purposes.
* A regionName is always a sub-table object.
*
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tablename the tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change (used in Replication)
* @param nonceGroup
* @param nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
*/
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
null);
}
/**
@ -292,14 +358,17 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
* @param tablename
* @param now Time at which this edit was written.
* @param clusterIds the clusters that have consumed the change (used in Replication)
* @param nonceGroup
* @param nonce
* @param nonceGroup the nonceGroup
* @param nonce the nonce
* @param mvcc mvcc control used to generate sequence numbers and control read/write points
* @param replicationScope the non-default replication scope of the column families
*/
public WALKey(final byte[] encodedRegionName, final TableName tablename,
final long now, List<UUID> clusterIds, long nonceGroup,
final long nonce, final MultiVersionConcurrencyControl mvcc) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc);
final long nonce, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
replicationScope);
}
/**
@ -328,7 +397,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
EMPTY_UUIDS,
nonceGroup,
nonce,
mvcc);
mvcc, null);
}
@InterfaceAudience.Private
@ -339,7 +408,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
List<UUID> clusterIds,
long nonceGroup,
long nonce,
MultiVersionConcurrencyControl mvcc) {
MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> replicationScope) {
this.sequenceId = logSeqNum;
this.writeTime = now;
this.clusterIds = clusterIds;
@ -351,6 +421,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (logSeqNum != NO_SEQUENCE_ID) {
setSequenceId(logSeqNum);
}
this.replicationScope = replicationScope;
}
// For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
@ -418,8 +489,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return this.writeTime;
}
public NavigableMap<byte[], Integer> getScopes() {
return scopes;
public NavigableMap<byte[], Integer> getReplicationScopes() {
return replicationScope;
}
/** @return The nonce group */
@ -432,8 +503,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
return nonce;
}
public void setScopes(NavigableMap<byte[], Integer> scopes) {
this.scopes = scopes;
private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
this.replicationScope = replicationScope;
}
public void serializeReplicationScope(boolean serialize) {
if (!serialize) {
setReplicationScope(null);
}
}
public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
@ -450,7 +527,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
if (scopes.size() > 0) {
this.scopes = scopes;
this.replicationScope = scopes;
}
}
}
@ -598,8 +675,8 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
if (scopes != null) {
for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
if (replicationScope != null) {
for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
: compressor.compress(e.getKey(), compressionContext.familyDict);
builder.addScopes(FamilyScope.newBuilder()
@ -638,13 +715,13 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
if (walKey.hasNonce()) {
this.nonce = walKey.getNonce();
}
this.scopes = null;
this.replicationScope = null;
if (walKey.getScopesCount() > 0) {
this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
this.replicationScope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (FamilyScope scope : walKey.getScopesList()) {
byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
this.scopes.put(family, scope.getScopeType().getNumber());
this.replicationScope.put(family, scope.getScopeType().getNumber());
}
}
setSequenceId(walKey.getLogSequenceNumber());
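
On WALKey itself, getScopes()/setScopes() give way to getReplicationScopes() and the narrower serializeReplicationScope(boolean). A short sketch of the renamed API, assuming hri, mvcc, edit and a precomputed replicationScope map are in scope:

// Sketch of the renamed WALKey API: the scope map is supplied at construction,
// read back via getReplicationScopes() (was getScopes()), and suppressed via
// serializeReplicationScope(false) (was setScopes(null)), e.g. for replay edits.
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
    System.currentTimeMillis(), mvcc, replicationScope);
NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
if (edit.isReplay()) {
  key.serializeReplicationScope(false);
}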

View File

@ -1778,7 +1778,7 @@ public class WALSplitter {
WALEdit edit = entry.getEdit();
TableName table = entry.getKey().getTablename();
// clear scopes which isn't needed for recovery
entry.getKey().setScopes(null);
entry.getKey().serializeReplicationScope(false);
String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
// skip edits of non-existent tables
if (nonExistentTables != null && nonExistentTables.contains(table)) {

View File

@ -281,7 +281,8 @@ public class TestIOFencing {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri,
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(),
WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
((HRegion)compactingRegion).getReplicationScope(),
oldHri, compactionDescriptor, compactingRegion.getMVCC());
// Wait till flush has happened, otherwise there won't be multiple store files

View File

@ -29,6 +29,8 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -184,7 +186,11 @@ public class TestWALObserver {
HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes
.toString(TEST_TABLE));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
@ -235,8 +241,8 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTime();
// we use HLogKey here instead of WALKey directly to support legacy coprocessors.
long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now),
edit, true);
long txid = log.append(hri,
new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, scopes), edit, true);
log.sync(txid);
// the edit shall have been change now by the coprocessor.
@ -296,10 +302,15 @@ public class TestWALObserver {
assertFalse(oldApi.isPostWALWriteDeprecatedCalled());
LOG.debug("writing to WAL with non-legacy keys.");
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor hcd : htd.getFamilies()) {
scopes.put(hcd.getName(), 0);
}
final int countPerFamily = 5;
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
LOG.debug("Verify that only the non-legacy CP saw edits.");
@ -323,7 +334,7 @@ public class TestWALObserver {
final WALEdit edit = new WALEdit();
final byte[] nonce = Bytes.toBytes("1772");
edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce));
final long txid = wal.append(htd, hri, legacyKey, edit, true);
final long txid = wal.append(hri, legacyKey, edit, true);
wal.sync(txid);
LOG.debug("Make sure legacy cps can see supported edits after having been skipped.");
@ -349,7 +360,11 @@ public class TestWALObserver {
final HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE));
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
WAL log = wals.getWAL(UNSPECIFIED_REGION, null);
try {
SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class);
@ -360,8 +375,8 @@ public class TestWALObserver {
assertFalse(cp.isPostWALWriteCalled());
final long now = EnvironmentEdgeManager.currentTime();
long txid = log.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc),
long txid = log.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes),
new WALEdit(), true);
log.sync(txid);
@ -400,14 +415,18 @@ public class TestWALObserver {
// Put p = creatPutWith2Families(TEST_ROW);
WALEdit edit = new WALEdit();
long now = EnvironmentEdgeManager.currentTime();
// addFamilyMapToWALEdit(p.getFamilyMap(), edit);
final int countPerFamily = 1000;
// for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor hcd : htd.getFamilies()) {
scopes.put(hcd.getName(), 0);
}
for (HColumnDescriptor hcd : htd.getFamilies()) {
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, htd, mvcc);
EnvironmentEdgeManager.getDelegate(), wal, scopes, mvcc);
}
wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// sync to fs.
wal.sync();
@ -527,7 +546,8 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
final NavigableMap<byte[], Integer> scopes, final MultiVersionConcurrencyControl mvcc)
throws IOException {
String familyStr = Bytes.toString(family);
long txid = -1;
for (int j = 0; j < count; j++) {
@ -537,7 +557,7 @@ public class TestWALObserver {
edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
// uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care
// about legacy coprocessors
txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
ee.currentTime(), mvcc), edit, true);
}
if (-1 != txid) {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.wal.WALKey;
@ -32,8 +35,8 @@ import org.junit.experimental.categories.Category;
public class TestHLogRecordReader extends TestWALRecordReader {
@Override
protected WALKey getWalKey(final long time) {
return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
return new HLogKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
@Override

View File

@ -34,6 +34,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -656,9 +657,9 @@ public class TestImportExport {
Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3);
// Register the wal listener for the import table
TableWALActionListener walListener = new TableWALActionListener(importTableName);
HRegionInfo region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
TableWALActionListener walListener = new TableWALActionListener(region);
WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
wal.registerWALActionsListener(walListener);
@ -678,7 +679,7 @@ public class TestImportExport {
region = UTIL.getHBaseCluster().getRegionServerThreads().get(0).getRegionServer()
.getOnlineRegions(importTable.getName()).get(0).getRegionInfo();
wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(region);
walListener = new TableWALActionListener(importTableName);
walListener = new TableWALActionListener(region);
wal.registerWALActionsListener(walListener);
args = new String[] { importTableName, FQ_OUTPUT_DIR };
assertTrue(runImport(args));
@ -695,16 +696,17 @@ public class TestImportExport {
*/
private static class TableWALActionListener extends WALActionsListener.Base {
private String tableName;
private HRegionInfo regionInfo;
private boolean isVisited = false;
public TableWALActionListener(String tableName) {
this.tableName = tableName;
public TableWALActionListener(HRegionInfo region) {
this.regionInfo = region;
}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
if (tableName.equalsIgnoreCase(htd.getNameAsString()) && (!logEdit.isMetaEdit())) {
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
if (logKey.getTablename().getNameAsString().equalsIgnoreCase(
this.regionInfo.getTable().getNameAsString()) && (!logEdit.isMetaEdit())) {
isVisited = true;
}
}

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -77,6 +79,8 @@ public class TestWALRecordReader {
private static HTableDescriptor htd;
private static Path logDir;
protected MultiVersionConcurrencyControl mvcc;
protected static NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
private static String getName() {
return "TestWALRecordReader";
@ -128,10 +132,10 @@ public class TestWALRecordReader {
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(htd, info, getWalKey(ts), edit, true);
log.append(info, getWalKey(ts, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(htd, info, getWalKey(ts+1), edit, true);
log.append(info, getWalKey(ts+1, scopes), edit, true);
log.sync();
LOG.info("Before 1st WAL roll " + log.toString());
log.rollWriter();
@ -142,10 +146,10 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(htd, info, getWalKey(ts1+1), edit, true);
log.append(info, getWalKey(ts1+1, scopes), edit, true);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(htd, info, getWalKey(ts1+2), edit, true);
log.append(info, getWalKey(ts1+2, scopes), edit, true);
log.sync();
log.shutdown();
walfactory.shutdown();
@ -187,7 +191,7 @@ public class TestWALRecordReader {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
long txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
@ -197,7 +201,7 @@ public class TestWALRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, true);
txid = log.append(info, getWalKey(System.currentTimeMillis(), scopes), edit, true);
log.sync(txid);
log.shutdown();
walfactory.shutdown();
@ -236,8 +240,8 @@ public class TestWALRecordReader {
testSplit(splits.get(1));
}
protected WALKey getWalKey(final long time) {
return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc);
protected WALKey getWalKey(final long time, NavigableMap<byte[], Integer> scopes) {
return new WALKey(info.getEncodedNameAsBytes(), tableName, time, mvcc, scopes);
}
protected WALRecordReader getReader() {

View File

@ -1301,9 +1301,8 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
wal.append(htd, curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName, System.currentTimeMillis()),
e, true);
wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), null), e, true);
}
wal.sync();
wal.shutdown();
@ -1397,7 +1396,7 @@ public class TestDistributedLogSplitting {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
wal.append(htd, curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
wal.append(curRegionInfo, new HLogKey(curRegionInfo.getEncodedNameAsBytes(),
tableName, System.currentTimeMillis()), e, true);
}
wal.sync();
@ -1609,7 +1608,7 @@ public class TestDistributedLogSplitting {
// key
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
log.append(htd, curRegionInfo,
log.append(curRegionInfo,
new HLogKey(curRegionInfo.getEncodedNameAsBytes(), fullTName,
System.currentTimeMillis()), e, true);
if (0 == i % syncEvery) {

View File

@ -87,6 +87,7 @@ public class TestBulkLoad {
private final byte[] randomBytes = new byte[100];
private final byte[] family1 = Bytes.toBytes("family1");
private final byte[] family2 = Bytes.toBytes("family2");
@Rule
public TestName name = new TestName();
@ -105,12 +106,12 @@ public class TestBulkLoad {
storeFileName = (new Path(storeFileName)).getName();
List<String> storeFileNames = new ArrayList<String>();
storeFileNames.add(storeFileName);
when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class), any(WALKey.class),
when(log.append(any(HRegionInfo.class), any(WALKey.class),
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
familyName, storeFileNames)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -132,11 +133,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -151,11 +152,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLog() throws IOException {
when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
@ -171,11 +172,11 @@ public class TestBulkLoad {
@Test
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
when(log.append(any(HTableDescriptor.class), any(HRegionInfo.class),
when(log.append(any(HRegionInfo.class),
any(WALKey.class), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
any(boolean.class))).thenAnswer(new Answer() {
public Object answer(InvocationOnMock invocation) {
WALKey walKey = invocation.getArgumentAt(2, WALKey.class);
WALKey walKey = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
if (mvcc != null) {
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();

View File

@ -896,7 +896,7 @@ public class TestHRegion {
storeFiles, Lists.newArrayList(newFile),
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getTableDesc(),
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
@ -4796,7 +4796,7 @@ public class TestHRegion {
//verify append called or not
verify(wal, expectAppend ? times(1) : never())
.append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
.append((HRegionInfo)any(), (WALKey)any(),
(WALEdit)any(), Mockito.anyBoolean());
// verify sync called or not
@ -5998,7 +5998,7 @@ public class TestHRegion {
region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
TEST_UTIL.getConfiguration(), rss, null);
verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@ -6111,7 +6111,7 @@ public class TestHRegion {
// verify that we have not appended region open event to WAL because this region is still
// recovering
verify(wal, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
verify(wal, times(0)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
// not put the region out of recovering state
@ -6119,7 +6119,7 @@ public class TestHRegion {
.prepare().process();
// now we should have put the entry
verify(wal, times(1)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any()
verify(wal, times(1)).append((HRegionInfo)any(), (WALKey)any()
, editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getValue();
@ -6163,12 +6163,12 @@ public class TestHRegion {
*/
private WAL mockWAL() throws IOException {
WAL wal = mock(WAL.class);
Mockito.when(wal.append((HTableDescriptor)Mockito.any(), (HRegionInfo)Mockito.any(),
Mockito.when(wal.append((HRegionInfo)Mockito.any(),
(WALKey)Mockito.any(), (WALEdit)Mockito.any(), Mockito.anyBoolean())).
thenAnswer(new Answer<Long>() {
@Override
public Long answer(InvocationOnMock invocation) throws Throwable {
WALKey key = invocation.getArgumentAt(2, WALKey.class);
WALKey key = invocation.getArgumentAt(1, WALKey.class);
MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin();
key.setWriteEntry(we);
return 1L;
@ -6206,7 +6206,7 @@ public class TestHRegion {
region.close(false);
// 2 times, one for region open, the other close region
verify(wal, times(2)).append((HTableDescriptor)any(), (HRegionInfo)any(), (WALKey)any(),
verify(wal, times(2)).append((HRegionInfo)any(), (WALKey)any(),
editCaptor.capture(), anyBoolean());
WALEdit edit = editCaptor.getAllValues().get(1);

View File

@ -1126,7 +1126,7 @@ public class TestHRegionReplayEvents {
// test for region open and close
secondaryRegion = HRegion.openHRegion(secondaryHri, htd, walSecondary, CONF, rss, null);
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
// test for replay prepare flush
@ -1140,11 +1140,11 @@ public class TestHRegionReplayEvents {
.setRegionName(ByteString.copyFrom(primaryRegion.getRegionInfo().getRegionName()))
.build());
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
secondaryRegion.close();
verify(walSecondary, times(0)).append((HTableDescriptor)any(), (HRegionInfo)any(),
verify(walSecondary, times(0)).append((HRegionInfo)any(),
(WALKey)any(), (WALEdit)any(), anyBoolean());
}

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -411,7 +412,7 @@ public class TestHRegionServerBulkLoad {
private boolean found = false;
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
for (Cell cell : logEdit.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
for (Map.Entry entry : kv.toStringMap().entrySet()) {

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -208,13 +210,17 @@ public class TestWALLockup {
HTableDescriptor htd = new HTableDescriptor(TableName.META_TABLE_NAME);
final HRegion region = initHRegion(tableName, null, null, dodgyWAL);
byte [] bytes = Bytes.toBytes(getName());
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
scopes.put(COLUMN_FAMILY_BYTES, 0);
try {
// First get something into memstore. Make a Put and then pull the Cell out of it. Will
// manage append and sync carefully in below to manufacture hang. We keep adding same
// edit. WAL subsystem doesn't care.
Put put = new Put(bytes);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("1"), bytes);
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName());
WALKey key = new WALKey(region.getRegionInfo().getEncodedNameAsBytes(), htd.getTableName(),
scopes);
WALEdit edit = new WALEdit();
CellScanner CellScanner = put.cellScanner();
assertTrue(CellScanner.advance());
@ -228,7 +234,7 @@ public class TestWALLockup {
LOG.info("SET throwing of exception on append");
dodgyWAL.throwException = true;
// This append provokes a WAL roll request
dodgyWAL.append(htd, region.getRegionInfo(), key, edit, true);
dodgyWAL.append(region.getRegionInfo(), key, edit, true);
boolean exception = false;
try {
dodgyWAL.sync();

View File

@ -28,7 +28,9 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.commons.lang.mutable.MutableBoolean;
@ -152,12 +154,9 @@ public class TestFSHLog {
}
}
protected void addEdits(WAL log,
HRegionInfo hri,
HTableDescriptor htd,
int times,
MultiVersionConcurrencyControl mvcc)
throws IOException {
protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times,
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
throws IOException {
final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
@ -165,8 +164,8 @@ public class TestFSHLog {
cols.add(new KeyValue(row, row, row, timestamp, row));
WALKey key = new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(),
WALKey.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
HConstants.NO_NONCE, mvcc);
log.append(htd, hri, key, cols, true);
HConstants.NO_NONCE, mvcc, scopes);
log.append(hri, key, cols, true);
}
log.sync();
}
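
The scope map that addEdits now threads into the WALKey is built the same way throughout this patch: one entry per column family, keyed with Bytes.BYTES_COMPARATOR and set to scope 0 (REPLICATION_SCOPE_LOCAL). A minimal sketch of that recurring setup, using only accessors and constants that already appear in this diff; the helper class and method names below are hypothetical and not part of the commit:

import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper, not part of this commit: builds the per-family
// replication-scope map that the scope-aware WALKey constructors accept.
final class ReplicationScopeSketch {
  private ReplicationScopeSketch() {
  }

  static NavigableMap<byte[], Integer> localScopes(HTableDescriptor htd) {
    NavigableMap<byte[], Integer> scopes =
        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    for (byte[] fam : htd.getFamiliesKeys()) {
      // The tests in this patch use 0, i.e. HConstants.REPLICATION_SCOPE_LOCAL.
      scopes.put(fam, HConstants.REPLICATION_SCOPE_LOCAL);
    }
    return scopes;
  }
}
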
@ -261,11 +260,21 @@ public class TestFSHLog {
new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// add edits and roll the wal
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : t1.getFamiliesKeys()) {
scopes1.put(fam, 0);
}
NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : t2.getFamiliesKeys()) {
scopes2.put(fam, 0);
}
try {
addEdits(wal, hri1, t1, 2, mvcc);
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// add some more edits and roll the wal. This would reach the log number threshold
addEdits(wal, hri1, t1, 2, mvcc);
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// with above rollWriter call, the max logs limit is reached.
assertTrue(wal.getNumRolledLogFiles() == 2);
@ -276,7 +285,7 @@ public class TestFSHLog {
assertEquals(1, regionsToFlush.length);
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
// insert edits in second region
addEdits(wal, hri2, t2, 2, mvcc);
addEdits(wal, hri2, t2, 2, mvcc, scopes2);
// get the regions to flush, it should still read region1.
regionsToFlush = wal.findRegionsToForceFlush();
assertEquals(regionsToFlush.length, 1);
@ -293,12 +302,12 @@ public class TestFSHLog {
// no wal should remain now.
assertEquals(0, wal.getNumRolledLogFiles());
// add edits both to region 1 and region 2, and roll.
addEdits(wal, hri1, t1, 2, mvcc);
addEdits(wal, hri2, t2, 2, mvcc);
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
addEdits(wal, hri2, t2, 2, mvcc, scopes2);
wal.rollWriter();
// add edits and roll the writer, to reach the max logs limit.
assertEquals(1, wal.getNumRolledLogFiles());
addEdits(wal, hri1, t1, 2, mvcc);
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
wal.rollWriter();
// it should return two regions to flush, as the oldest wal file has entries
// for both regions.
@ -310,7 +319,7 @@ public class TestFSHLog {
wal.rollWriter(true);
assertEquals(0, wal.getNumRolledLogFiles());
// Add an edit to region1, and roll the wal.
addEdits(wal, hri1, t1, 2, mvcc);
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
wal.rollWriter();
@ -360,6 +369,11 @@ public class TestFSHLog {
HBaseTestingUtility.closeRegionAndWAL(r);
final int countPerFamily = 10;
final MutableBoolean goslow = new MutableBoolean(false);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
// subclass and doctor a method.
FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
testName, conf) {
@ -403,9 +417,9 @@ public class TestFSHLog {
for (int i = 0; i < countPerFamily; i++) {
final HRegionInfo info = region.getRegionInfo();
final WALKey logkey = new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC());
wal.append(htd, info, logkey, edits, true);
}
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
wal.append(info, logkey, edits, true);
}
region.flush(true);
// FlushResult.flushSequenceId is not visible here so go get the current sequence id.
long currentSequenceId = region.getReadPoint(null);
@ -439,11 +453,16 @@ public class TestFSHLog {
syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
HRegionInfo hri =
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
for (int i = 0; i < 10; i++) {
addEdits(log, hri, htd, 1, mvcc);
addEdits(log, hri, htd, 1, mvcc, scopes);
}
} finally {
log.close();

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -199,8 +202,13 @@ public class TestLogRollAbort {
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
log.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), kvs, true);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
log.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -139,8 +141,13 @@ public class TestLogRollingNoCluster {
edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
final HTableDescriptor htd = TEST_UTIL.getMetaTableDescriptor();
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc), edit, true);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, now, mvcc, scopes), edit, true);
wal.sync(txid);
}
String msg = getName() + " finished";

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -98,9 +100,13 @@ public class TestWALActionsListener {
edit.add(kv);
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(SOME_BYTES));
htd.addFamily(new HColumnDescriptor(b));
final long txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.valueOf(b), 0), edit, true);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final long txid = wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(),
TableName.valueOf(b), 0, scopes), edit, true);
wal.sync(txid);
if (i == 10) {
wal.registerWALActionsListener(laterobserver);

View File

@ -37,7 +37,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -308,9 +310,14 @@ public class TestWALReplay {
// Add 1k to each family.
final int countPerFamily = 1000;
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
wal1, htd, mvcc);
wal1, htd, mvcc, scopes);
}
wal1.shutdown();
runWALSplit(this.conf);
@ -319,7 +326,7 @@ public class TestWALReplay {
// Add 1k to each family.
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
ee, wal2, htd, mvcc);
ee, wal2, htd, mvcc, scopes);
}
wal2.shutdown();
runWALSplit(this.conf);
@ -800,9 +807,14 @@ public class TestWALReplay {
// Add 1k to each family.
final int countPerFamily = 1000;
Set<byte[]> familyNames = new HashSet<byte[]>();
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
ee, wal, htd, mvcc);
ee, wal, htd, mvcc, scopes);
familyNames.add(hcd.getName());
}
@ -815,13 +827,15 @@ public class TestWALReplay {
long now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTime();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now, KeyValue.Type.DeleteFamily));
wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc), edit, true);
wal.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes), edit,
true);
// Sync.
wal.sync();
@ -1046,12 +1060,16 @@ public class TestWALReplay {
deleteDir(basedir);
final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
HBaseTestingUtility.closeRegionAndWAL(region);
final byte[] family = htd.getColumnFamilies()[0].getName();
final byte[] rowName = tableName.getName();
FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1);
FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2);
FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1, scopes);
FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2, scopes);
Path largeFile = new Path(logDir, "wal-1");
Path smallFile = new Path(logDir, "wal-2");
@ -1154,8 +1172,8 @@ public class TestWALReplay {
}
private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
final MultiVersionConcurrencyControl mvcc) {
return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc);
final MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes) {
return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc, scopes);
}
private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
@ -1169,19 +1187,20 @@ public class TestWALReplay {
private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
int index) throws IOException {
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
FSWALEntry entry =
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
rowName, family, ee, index), htd, hri, true);
new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes), createWALEdit(
rowName, family, ee, index), hri, true);
entry.stampRegionSequenceId();
return entry;
}
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc,
NavigableMap<byte[], Integer> scopes) throws IOException {
for (int j = 0; j < count; j++) {
wal.append(htd, hri, createWALKey(tableName, hri, mvcc),
wal.append(hri, createWALKey(tableName, hri, mvcc, scopes),
createWALEdit(rowName, family, ee, j), true);
}
wal.sync();

View File

@ -18,6 +18,9 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -63,6 +66,7 @@ public class TestReplicationBase {
protected static Table htable1;
protected static Table htable2;
protected static NavigableMap<byte[], Integer> scopes;
protected static HBaseTestingUtility utility1;
protected static HBaseTestingUtility utility2;
@ -140,6 +144,11 @@ public class TestReplicationBase {
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(HColumnDescriptor f : table.getColumnFamilies()) {
scopes.put(f.getName(), f.getScope());
}
Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
try (Admin admin1 = connection1.getAdmin()) {

View File

@ -26,6 +26,8 @@ import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -658,7 +660,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegionInfo hri = new HRegionInfo(htable1.getName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
Replication.scopeWALEdits(new WALKey(), edit,
htable1.getConfiguration(), null);
}
@ -767,7 +769,10 @@ public class TestReplicationSmallTests extends TestReplicationBase {
HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
HRegionInfo hri = region.getRegionInfo();
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htable1.getTableDescriptor().getFamiliesKeys()) {
scopes.put(fam, 1);
}
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
@ -778,8 +783,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
long now = EnvironmentEdgeManager.currentTime();
edit.add(new KeyValue(rowName, famName, qualifier,
now, value));
WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc);
wal.append(htable1.getTableDescriptor(), hri, walKey, edit, true);
WALKey walKey = new WALKey(hri.getEncodedNameAsBytes(), tableName, now, mvcc, scopes);
wal.append(hri, walKey, edit, true);
wal.sync();
Get get = new Get(rowName);

View File

@ -58,19 +58,19 @@ public class TestReplicationWALEntryFilters {
// meta
WALKey key1 = new WALKey( HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME);
TableName.META_TABLE_NAME, null);
Entry metaEntry = new Entry(key1, null);
assertNull(filter.filter(metaEntry));
// ns table
WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME);
WALKey key2 = new WALKey(new byte[] {}, TableName.NAMESPACE_TABLE_NAME, null);
Entry nsEntry = new Entry(key2, null);
assertNull(filter.filter(nsEntry));
// user table
WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo"));
WALKey key3 = new WALKey(new byte[] {}, TableName.valueOf("foo"), null);
Entry userEntry = new Entry(key3, null);
assertEquals(userEntry, filter.filter(userEntry));
@ -80,33 +80,30 @@ public class TestReplicationWALEntryFilters {
public void testScopeWALEntryFilter() {
ScopeWALEntryFilter filter = new ScopeWALEntryFilter();
Entry userEntry = createEntry(a, b);
Entry userEntryA = createEntry(a);
Entry userEntryB = createEntry(b);
Entry userEntryEmpty = createEntry();
Entry userEntry = createEntry(null, a, b);
Entry userEntryA = createEntry(null, a);
Entry userEntryB = createEntry(null, b);
Entry userEntryEmpty = createEntry(null);
// no scopes
assertEquals(null, filter.filter(userEntry));
// empty scopes
TreeMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
assertEquals(null, filter.filter(userEntry));
// different scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(c, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
// all kvs should be filtered
assertEquals(userEntryEmpty, filter.filter(userEntry));
// local scope
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
assertEquals(userEntryEmpty, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryEmpty, filter.filter(userEntry));
@ -114,8 +111,7 @@ public class TestReplicationWALEntryFilters {
// only scope a
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(a, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
assertEquals(userEntryA, filter.filter(userEntry));
scopes.put(b, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryA, filter.filter(userEntry));
@ -123,8 +119,7 @@ public class TestReplicationWALEntryFilters {
// only scope b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
@ -132,8 +127,7 @@ public class TestReplicationWALEntryFilters {
// scope a and b
scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
userEntry = createEntry(a, b);
userEntry.getKey().setScopes(scopes);
userEntry = createEntry(scopes, a, b);
assertEquals(userEntryB, filter.filter(userEntry));
scopes.put(a, HConstants.REPLICATION_SCOPE_LOCAL);
assertEquals(userEntryB, filter.filter(userEntry));
@ -155,16 +149,16 @@ public class TestReplicationWALEntryFilters {
@Test
public void testChainWALEntryFilter() {
Entry userEntry = createEntry(a, b, c);
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter = new ChainWALEntryFilter(passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(passFilter, passFilter, passFilter);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter = new ChainWALEntryFilter(nullFilter);
assertEquals(null, filter.filter(userEntry));
@ -189,7 +183,7 @@ public class TestReplicationWALEntryFilters {
new ChainWALEntryFilter(passFilter),
new ChainWALEntryFilter(passFilter)),
new ChainWALEntryFilter(passFilter));
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
filter =
@ -206,19 +200,19 @@ public class TestReplicationWALEntryFilters {
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getTableCFs()).thenReturn(null);
Entry userEntry = createEntry(a, b, c);
Entry userEntry = createEntry(null, a, b, c);
TableCfWALEntryFilter filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a,b,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
// empty map
userEntry = createEntry(a, b, c);
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(null, filter.filter(userEntry));
// table bar
userEntry = createEntry(a, b, c);
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
@ -226,24 +220,24 @@ public class TestReplicationWALEntryFilters {
assertEquals(null, filter.filter(userEntry));
// table foo:a
userEntry = createEntry(a, b, c);
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a), filter.filter(userEntry));
assertEquals(createEntry(null, a), filter.filter(userEntry));
// table foo:a,c
userEntry = createEntry(a, b, c, d);
userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
filter = new TableCfWALEntryFilter(peer);
assertEquals(createEntry(a,c), filter.filter(userEntry));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
}
private Entry createEntry(byte[]... kvs) {
WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"));
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
WALKey key1 = new WALKey(new byte[] {}, TableName.valueOf("foo"), scopes);
WALEdit edit1 = new WALEdit();
for (byte[] kv : kvs) {

View File

@ -32,6 +32,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@ -131,6 +132,7 @@ public class TestReplicationSourceManager {
private static CountDownLatch latch;
private static List<String> files = new ArrayList<String>();
private static NavigableMap<byte[], Integer> scopes;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -177,6 +179,11 @@ public class TestReplicationSourceManager {
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
@ -214,15 +221,20 @@ public class TestReplicationSourceManager {
manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
htd.addFamily(new HColumnDescriptor(f1));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
// Testing normal log rolling every 20
for(long i = 1; i < 101; i++) {
if(i > 1 && i % 20 == 0) {
wal.rollWriter();
}
LOG.info(i);
final long txid = wal.append(htd,
final long txid = wal.append(
hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync(txid);
@ -236,8 +248,8 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
wal.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
wal.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
}
@ -254,8 +266,8 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
wal.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
wal.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
edit,
true);
wal.sync();
@ -427,33 +439,35 @@ public class TestReplicationSourceManager {
@Test
public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
// 1. Create wal key
WALKey logKey = new WALKey();
// 2. Get the bulk load wal edit event
WALEdit logEdit = getBulkLoadWALEdit();
NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
// 1. Get the bulk load wal edit event
WALEdit logEdit = getBulkLoadWALEdit(scope);
// 2. Create wal key
WALKey logKey = new WALKey(scope);
// 3. Get the scopes for the key
Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
Replication.scopeWALEdits(logKey, logEdit, conf, manager);
// 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
assertNull("No bulk load entries scope should be added if bulk load replication is diabled.",
logKey.getScopes());
assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
logKey.getReplicationScopes());
}
@Test
public void testBulkLoadWALEdits() throws Exception {
// 1. Create wal key
WALKey logKey = new WALKey();
// 2. Get the bulk load wal edit event
WALEdit logEdit = getBulkLoadWALEdit();
// 1. Get the bulk load wal edit event
NavigableMap<byte[], Integer> scope = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
WALEdit logEdit = getBulkLoadWALEdit(scope);
// 2. Create wal key
WALKey logKey = new WALKey(scope);
// 3. Enable bulk load hfile replication
Configuration bulkLoadConf = HBaseConfiguration.create(conf);
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// 4. Get the scopes for the key
Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
Replication.scopeWALEdits(logKey, logEdit, bulkLoadConf, manager);
NavigableMap<byte[], Integer> scopes = logKey.getScopes();
NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
// Assert family with replication scope global is present in the key scopes
assertTrue("This family scope is set to global, should be part of replication key scopes.",
scopes.containsKey(f1));
@ -462,17 +476,16 @@ public class TestReplicationSourceManager {
scopes.containsKey(f2));
}
private WALEdit getBulkLoadWALEdit() {
private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
List<Path> p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f1)));
storeFiles.put(f1, p);
scope.put(f1, 1);
p = new ArrayList<>(1);
p.add(new Path(Bytes.toString(f2)));
storeFiles.put(f2, p);
// 2. Create bulk load descriptor
BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);

View File

@ -28,6 +28,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@ -75,6 +77,7 @@ public class TestReplicationWALReaderManager {
private static final HRegionInfo info = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final HTableDescriptor htd = new HTableDescriptor(tableName);
private static NavigableMap<byte[], Integer> scopes;
private WAL log;
private ReplicationWALReaderManager logManager;
@ -123,6 +126,11 @@ public class TestReplicationWALReaderManager {
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
}
@AfterClass
@ -204,9 +212,8 @@ public class TestReplicationWALReaderManager {
}
private void appendToLogPlus(int count) throws IOException {
final long txid = log.append(htd, info,
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc),
getWALEdits(count), true);
final long txid = log.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count), true);
log.sync(txid);
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
// imports for things that haven't moved yet
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
@ -60,12 +59,12 @@ public class FaultyFSLog extends FSHLog {
}
@Override
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
boolean inMemstore) throws IOException {
public long append(HRegionInfo info, WALKey key,
WALEdit edits, boolean inMemstore) throws IOException {
if (this.ft == FailureType.APPEND) {
throw new IOException("append");
}
return super.append(htd, info, key, edits, inMemstore);
return super.append(info, key, edits, inMemstore);
}
}
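
With the HTableDescriptor parameter dropped from WAL#append, as the override above shows, a caller supplies the replication scopes through the WALKey itself. A hedged sketch of such a call site, using only constructors and methods visible elsewhere in this diff; the class and method names are illustrative only:

import java.io.IOException;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

// Sketch only, not part of this commit: one append through the trimmed
// WAL#append(HRegionInfo, WALKey, WALEdit, boolean) signature. The scopes
// travel inside the WALKey instead of being looked up from an HTableDescriptor.
final class AppendSketch {
  private AppendSketch() {
  }

  static long appendOnce(WAL wal, HRegionInfo hri, TableName tableName,
      MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes,
      WALEdit edit) throws IOException {
    WALKey key = new WALKey(hri.getEncodedNameAsBytes(), tableName,
        System.currentTimeMillis(), mvcc, scopes);
    long txid = wal.append(hri, key, edit, true);
    wal.sync(txid);
    return txid;
  }
}
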

View File

@ -25,8 +25,10 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashSet;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
@ -151,23 +153,25 @@ public class TestDefaultWALProvider {
protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
int times) throws IOException {
int times, NavigableMap<byte[], Integer> scopes) throws IOException {
final byte[] row = Bytes.toBytes("row");
for (int i = 0; i < times; i++) {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
cols, true);
log.append(hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp, scopes),
cols, true);
}
log.sync();
}
/**
* used by TestDefaultWALProviderWithHLogKey
* @param scopes
*/
WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
return new WALKey(info, tableName, timestamp, mvcc);
WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
NavigableMap<byte[], Integer> scopes) {
return new WALKey(info, tableName, timestamp, mvcc, scopes);
}
/**
@ -191,6 +195,16 @@ public class TestDefaultWALProvider {
final HTableDescriptor htd2 =
new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
.addFamily(new HColumnDescriptor("row"));
NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes1.put(fam, 0);
}
NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd2.getFamiliesKeys()) {
scopes2.put(fam, 0);
}
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
@ -205,26 +219,26 @@ public class TestDefaultWALProvider {
// Add a single edit and make sure that rolling won't remove the file
// Before HBASE-3198 it used to delete it
addEdits(log, hri, htd, 1);
addEdits(log, hri, htd, 1, scopes1);
log.rollWriter();
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
// See if there's anything wrong with more than 1 edit
addEdits(log, hri, htd, 2);
addEdits(log, hri, htd, 2, scopes1);
log.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
// Now mix edits from 2 regions, still no flushing
addEdits(log, hri, htd, 1);
addEdits(log, hri2, htd2, 1);
addEdits(log, hri, htd, 1);
addEdits(log, hri2, htd2, 1);
addEdits(log, hri, htd, 1, scopes1);
addEdits(log, hri2, htd2, 1, scopes2);
addEdits(log, hri, htd, 1, scopes1);
addEdits(log, hri2, htd2, 1, scopes2);
log.rollWriter();
assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
// Flush the first region, we expect to see the first two files getting
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, htd2, 1);
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter();
@ -233,7 +247,7 @@ public class TestDefaultWALProvider {
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
addEdits(log, hri2, htd2, 1);
addEdits(log, hri2, htd2, 1, scopes2);
log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.rollWriter();
@ -264,6 +278,16 @@ public class TestDefaultWALProvider {
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
HTableDescriptor table2 =
new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
NavigableMap<byte[], Integer> scopes1 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : table1.getFamiliesKeys()) {
scopes1.put(fam, 0);
}
NavigableMap<byte[], Integer> scopes2 = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : table2.getFamiliesKeys()) {
scopes2.put(fam, 0);
}
final Configuration localConf = new Configuration(conf);
localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
@ -281,31 +305,31 @@ public class TestDefaultWALProvider {
hri2.setSplit(false);
// variables to mock region sequenceIds.
// start with the testing logic: insert a waledit, and roll writer
addEdits(wal, hri1, table1, 1);
addEdits(wal, hri1, table1, 1, scopes1);
wal.rollWriter();
// assert that the wal is rolled
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits in the second wal file, and roll writer.
addEdits(wal, hri1, table1, 1);
addEdits(wal, hri1, table1, 1, scopes1);
wal.rollWriter();
// assert that the wal is rolled
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add a waledit to table1, and flush the region.
addEdits(wal, hri1, table1, 3);
addEdits(wal, hri1, table1, 3, scopes1);
flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
// roll log; all old logs should be archived.
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
// add an edit to table2, and roll writer
addEdits(wal, hri2, table2, 1);
addEdits(wal, hri2, table2, 1, scopes2);
wal.rollWriter();
assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits for table1, and roll writer
addEdits(wal, hri1, table1, 2);
addEdits(wal, hri1, table1, 2, scopes1);
wal.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// add edits for table2, and flush hri1.
addEdits(wal, hri2, table2, 2);
addEdits(wal, hri2, table2, 2, scopes2);
flushRegion(wal, hri1.getEncodedNameAsBytes(), table2.getFamiliesKeys());
// the log : region-sequenceId map is
// log1: region2 (unflushed)
@ -315,7 +339,7 @@ public class TestDefaultWALProvider {
wal.rollWriter();
assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
// flush region2, and all logs should be archived.
addEdits(wal, hri2, table2, 2);
addEdits(wal, hri2, table2, 2, scopes2);
flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
wal.rollWriter();
assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.wal;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -28,7 +30,8 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@Category({RegionServerTests.class, LargeTests.class})
public class TestDefaultWALProviderWithHLogKey extends TestDefaultWALProvider {
@Override
WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp) {
return new HLogKey(info, tableName, timestamp, mvcc);
WALKey getWalKey(final byte[] info, final TableName tableName, final long timestamp,
final NavigableMap<byte[], Integer> scopes) {
return new HLogKey(info, tableName, timestamp, mvcc, scopes);
}
}

View File

@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
@ -79,6 +81,11 @@ public class TestSecureWAL {
TableName tableName = TableName.valueOf("TestSecureWAL");
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
final int total = 10;
@ -95,8 +102,8 @@ public class TestSecureWAL {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis()), kvs, true);
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), scopes), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);

View File

@ -30,6 +30,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -181,6 +183,11 @@ public class TestWALFactory {
}
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
// Add edits for three regions.
for (int ii = 0; ii < howmany; ii++) {
@ -196,8 +203,8 @@ public class TestWALFactory {
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
WALKey walKey = new WALKey(infos[i].getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc);
log.append(htd, infos[i], walKey, edit, true);
System.currentTimeMillis(), mvcc, scopes);
log.append(infos[i], walKey, edit, true);
walKey.getWriteEntry();
}
log.sync();
@ -249,13 +256,18 @@ public class TestWALFactory {
null,null, false);
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final WAL wal = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), kvs, true);
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE.
@ -273,8 +285,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), kvs, true);
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
reader = wals.createReader(fs, walPath);
@ -295,8 +307,8 @@ public class TestWALFactory {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
wal.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), kvs, true);
wal.append(info, new WALKey(info.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
@ -370,12 +382,17 @@ public class TestWALFactory {
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis()), kvs, true);
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), scopes), kvs, true);
}
// Now call sync to send the data to HDFS datanodes
wal.sync();
@ -485,6 +502,11 @@ public class TestWALFactory {
final HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
"column"));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
try {
@ -503,9 +525,9 @@ public class TestWALFactory {
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
final WAL log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
final long txid = log.append(htd, info,
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
mvcc),
mvcc, scopes),
cols, true);
log.sync(txid);
log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
@ -545,6 +567,11 @@ public class TestWALFactory {
final HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
"column"));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
final byte [] row = Bytes.toBytes("row");
WAL.Reader reader = null;
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
@ -561,9 +588,9 @@ public class TestWALFactory {
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
final long txid = log.append(htd, hri,
final long txid = log.append(hri,
new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis(),
mvcc),
mvcc, scopes),
cols, true);
log.sync(txid);
log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
@ -607,7 +634,11 @@ public class TestWALFactory {
long timestamp = System.currentTimeMillis();
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final WAL log = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
@ -617,8 +648,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')}));
log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), cols, true);
log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true);
}
log.sync();
assertEquals(COL_COUNT, visitor.increments);
@ -627,8 +658,8 @@ public class TestWALFactory {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')}));
log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), cols, true);
log.append(hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), cols, true);
log.sync();
assertEquals(COL_COUNT, visitor.increments);
}
@ -722,8 +753,9 @@ public class TestWALFactory {
}
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {
//To change body of implemented methods use File | Settings | File Templates.
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
// To change body of implemented methods use File | Settings | File
// Templates.
increments++;
}
}
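
Since visitLogEntryBeforeWrite no longer receives the HTableDescriptor, a listener that cares about replication scope reads it from the key instead, via the getReplicationScopes() accessor used elsewhere in this diff. A small hypothetical listener sketch; extending WALActionsListener.Base (the no-op base the test listeners build on) is an assumption here:

import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;

// Hypothetical listener, not part of this commit. The replication scopes are
// read off the WALKey rather than passed in alongside an HTableDescriptor.
class ScopeLoggingListener extends WALActionsListener.Base {
  private static final Log LOG = LogFactory.getLog(ScopeLoggingListener.class);

  @Override
  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
    if (scopes == null || scopes.isEmpty()) {
      LOG.debug("Entry has no families with non-default replication scope");
    } else {
      LOG.debug("Entry carries replication scopes for " + scopes.size() + " families");
    }
  }
}
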

View File

@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.LogFactory;
@ -96,6 +98,11 @@ public class TestWALReaderOnSecureWAL {
TableName tableName = TableName.valueOf(tblName);
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(tableName.getName()));
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
final int total = 10;
@ -109,8 +116,8 @@ public class TestWALReaderOnSecureWAL {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
wal.append(htd, regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc), kvs, true);
wal.append(regioninfo, new WALKey(regioninfo.getEncodedNameAsBytes(), tableName,
System.currentTimeMillis(), mvcc, scopes), kvs, true);
}
wal.sync();
final Path walPath = DefaultWALProvider.getCurrentFileName(wal);

View File

@ -23,8 +23,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@ -128,6 +130,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
private final int syncInterval;
private final HTableDescriptor htd;
private final Sampler loopSampler;
private final NavigableMap<byte[], Integer> scopes;
WALPutBenchmark(final HRegion region, final HTableDescriptor htd,
final long numIterations, final boolean noSync, final int syncInterval,
@ -138,6 +141,11 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
this.numFamilies = htd.getColumnFamilies().length;
this.region = region;
this.htd = htd;
scopes = new TreeMap<byte[], Integer>(
Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
scopes.put(fam, 0);
}
String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes");
if (spanReceivers == null || spanReceivers.isEmpty()) {
loopSampler = Sampler.NEVER;
@ -180,8 +188,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
HRegionInfo hri = region.getRegionInfo();
final WALKey logkey =
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc);
wal.append(htd, hri, logkey, walEdit, true);
new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now, mvcc, scopes);
wal.append(hri, logkey, walEdit, true);
if (!this.noSync) {
if (++lastSync >= this.syncInterval) {
wal.sync();
@ -498,8 +506,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
private int appends = 0;
@Override
public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
WALEdit logEdit) {
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
this.appends++;
if (this.appends % whenToRoll == 0) {
LOG.info("Rolling after " + appends + " edits");