HBASE-23221 Polish the WAL interface after HBASE-23181 (#774)
Removes the closeRegion flag added by HBASE-23181 and instead relies on reading meta WALEdit content. Modified how qualifier is written when the meta WALEdit is for a RegionEventDescriptor so the 'type' is added to the qualifer so can figure type w/o having to deserialize protobuf value content: e.g. HBASE::REGION_EVENT::REGION_CLOSE Added doc on WALEdit and tried to formalize the 'meta' WALEdit type and how it works. Needs complete redo in part as suggested by HBASE-8457. Meantime, some doc and cleanup. Also changed the LogRoller constructor to remove redundant param. Because of constructor change, need to change also TestFailedAppendAndSync, TestWALLockup, TestAsyncFSWAL & WALPerformanceEvaluation.java Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Lijin Bin <binlijin@apache.org>
This commit is contained in:
parent
9ab0489eab
commit
471538ca9e
|
@ -756,7 +756,7 @@ public final class CellUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
|
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
|
||||||
* {@link #matchingRows(Cell, byte[]))}
|
* {@link #matchingRows(Cell, byte[])}
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static boolean matchingRow(final Cell left, final byte[] buf) {
|
public static boolean matchingRow(final Cell left, final byte[] buf) {
|
||||||
|
@ -894,8 +894,15 @@ public final class CellUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
|
public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
|
||||||
if (!matchingFamily(left, fam)) return false;
|
return matchingFamily(left, fam) && matchingQualifier(left, qual);
|
||||||
return matchingQualifier(left, qual);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return True if matching column family and the qualifier starts with <code>qual</code>
|
||||||
|
*/
|
||||||
|
public static boolean matchingColumnFamilyAndQualifierPrefix(final Cell left, final byte[] fam,
|
||||||
|
final byte[] qual) {
|
||||||
|
return matchingFamily(left, fam) && PrivateCellUtil.qualifierStartsWith(left, qual);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -199,11 +199,10 @@ public class WALPlayer extends Configured implements Tool {
|
||||||
Delete del = null;
|
Delete del = null;
|
||||||
Cell lastCell = null;
|
Cell lastCell = null;
|
||||||
for (Cell cell : value.getCells()) {
|
for (Cell cell : value.getCells()) {
|
||||||
// filtering WAL meta entries
|
// Filtering WAL meta marker entries.
|
||||||
if (WALEdit.isMetaEditFamily(cell)) {
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow a subclass filter out this cell.
|
// Allow a subclass filter out this cell.
|
||||||
if (filter(context, cell)) {
|
if (filter(context, cell)) {
|
||||||
// A WALEdit may contain multiple operations (HBASE-3584) and/or
|
// A WALEdit may contain multiple operations (HBASE-3584) and/or
|
||||||
|
|
|
@ -3566,7 +3566,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
if (cpHost != null) {
|
if (cpHost != null) {
|
||||||
cpHost.preListReplicationPeers(regex);
|
cpHost.preListReplicationPeers(regex);
|
||||||
}
|
}
|
||||||
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
|
LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
|
||||||
Pattern pattern = regex == null ? null : Pattern.compile(regex);
|
Pattern pattern = regex == null ? null : Pattern.compile(regex);
|
||||||
List<ReplicationPeerDescription> peers =
|
List<ReplicationPeerDescription> peers =
|
||||||
this.replicationPeerManager.listPeers(pattern);
|
this.replicationPeerManager.listPeers(pattern);
|
||||||
|
|
|
@ -1725,7 +1725,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
status.setStatus("Writing region close event to WAL");
|
status.setStatus("Writing region close event to WAL");
|
||||||
// Always write close marker to wal even for read only table. This is not a big problem as we
|
// Always write close marker to wal even for read only table. This is not a big problem as we
|
||||||
// do not write any data into the region.
|
// do not write any data into the region; it is just a meta edit in the WAL file.
|
||||||
if (!abort && wal != null && getRegionServerServices() != null &&
|
if (!abort && wal != null && getRegionServerServices() != null &&
|
||||||
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||||
writeRegionCloseMarker(wal);
|
writeRegionCloseMarker(wal);
|
||||||
|
@ -2691,7 +2691,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
|
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
|
||||||
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
|
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
|
||||||
|
storesToFlush.size() + "/" + stores.size() + " column families," +
|
||||||
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
|
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
|
||||||
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
|
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
|
||||||
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
|
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
|
||||||
|
@ -4817,7 +4818,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
for (Cell cell: val.getCells()) {
|
for (Cell cell: val.getCells()) {
|
||||||
// Check this edit is for me. Also, guard against writing the special
|
// Check this edit is for me. Also, guard against writing the special
|
||||||
// METACOLUMN info such as HBASE::CACHEFLUSH entries
|
// METACOLUMN info such as HBASE::CACHEFLUSH entries
|
||||||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
// if region names don't match, skipp replaying compaction marker
|
// if region names don't match, skipp replaying compaction marker
|
||||||
if (!checkRowWithinBoundary) {
|
if (!checkRowWithinBoundary) {
|
||||||
//this is a special edit, we should handle it
|
//this is a special edit, we should handle it
|
||||||
|
|
|
@ -1926,7 +1926,7 @@ public class HRegionServer extends HasThread implements
|
||||||
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
|
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
this.walRoller = new LogRoller(this, this);
|
this.walRoller = new LogRoller(this);
|
||||||
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
|
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
|
||||||
this.procedureResultReporter = new RemoteProcedureResultReporter(this);
|
this.procedureResultReporter = new RemoteProcedureResultReporter(this);
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -28,7 +28,6 @@ import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
@ -56,7 +55,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
public class LogRoller extends HasThread implements Closeable {
|
public class LogRoller extends HasThread implements Closeable {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
|
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
|
||||||
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
|
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
|
||||||
private final Server server;
|
|
||||||
protected final RegionServerServices services;
|
protected final RegionServerServices services;
|
||||||
private volatile long lastRollTime = System.currentTimeMillis();
|
private volatile long lastRollTime = System.currentTimeMillis();
|
||||||
// Period to roll log.
|
// Period to roll log.
|
||||||
|
@ -99,16 +97,14 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @param server */
|
public LogRoller(RegionServerServices services) {
|
||||||
public LogRoller(final Server server, final RegionServerServices services) {
|
|
||||||
super("LogRoller");
|
super("LogRoller");
|
||||||
this.server = server;
|
|
||||||
this.services = services;
|
this.services = services;
|
||||||
this.rollPeriod = this.server.getConfiguration().
|
this.rollPeriod = this.services.getConfiguration().
|
||||||
getLong("hbase.regionserver.logroll.period", 3600000);
|
getLong("hbase.regionserver.logroll.period", 3600000);
|
||||||
this.threadWakeFrequency = this.server.getConfiguration().
|
this.threadWakeFrequency = this.services.getConfiguration().
|
||||||
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||||
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
|
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
|
||||||
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,7 +140,7 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
LOG.warn("Failed to shutdown wal", e);
|
LOG.warn("Failed to shutdown wal", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
server.abort(reason, cause);
|
this.services.abort(reason, cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -156,7 +152,7 @@ public class LogRoller extends HasThread implements Closeable {
|
||||||
periodic = (now - this.lastRollTime) > this.rollPeriod;
|
periodic = (now - this.lastRollTime) > this.rollPeriod;
|
||||||
if (periodic) {
|
if (periodic) {
|
||||||
// Time for periodic roll, fall through
|
// Time for periodic roll, fall through
|
||||||
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
|
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
|
||||||
} else {
|
} else {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
|
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
|
||||||
|
|
|
@ -261,8 +261,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
private static final class WalProps {
|
private static final class WalProps {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map the encoded region name to the highest sequence id. Contain all the regions it has
|
* Map the encoded region name to the highest sequence id.
|
||||||
* entries of
|
* <p/>Contains all the regions it has an entry for.
|
||||||
*/
|
*/
|
||||||
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
||||||
|
|
||||||
|
@ -610,9 +610,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the number of un-archived WAL files is greater than maximum allowed, check the first
|
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
|
||||||
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
|
* check the first (oldest) WAL, and return those regions which should be flushed so that
|
||||||
* archived.
|
* it can be let-go/'archived'.
|
||||||
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
|
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
|
||||||
*/
|
*/
|
||||||
byte[][] findRegionsToForceFlush() throws IOException {
|
byte[][] findRegionsToForceFlush() throws IOException {
|
||||||
|
@ -888,10 +888,6 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
/**
|
/**
|
||||||
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
* updates the sequence number of a specific store. depending on the flag: replaces current seq
|
||||||
* number if the given seq id is bigger, or even if it is lower than existing one
|
* number if the given seq id is bigger, or even if it is lower than existing one
|
||||||
* @param encodedRegionName
|
|
||||||
* @param familyName
|
|
||||||
* @param sequenceid
|
|
||||||
* @param onlyIfGreater
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
|
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
|
||||||
|
@ -1015,7 +1011,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
|
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
|
||||||
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
|
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (this.closed) {
|
if (this.closed) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
|
@ -1029,7 +1025,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
|
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
|
||||||
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
|
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
|
||||||
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
|
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
|
||||||
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
|
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
|
||||||
entry.stampRegionSequenceId(we);
|
entry.stampRegionSequenceId(we);
|
||||||
ringBuffer.get(txid).load(entry);
|
ringBuffer.get(txid).load(entry);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -1067,13 +1063,13 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
||||||
return append(info, key, edits, true, false);
|
return append(info, key, edits, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return append(info, key, edits, false, closeRegion);
|
return append(info, key, edits, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1097,17 +1093,17 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
||||||
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
||||||
* sequence id that is after all currently appended edits.
|
* sequence id that is after all currently appended edits.
|
||||||
* @param inMemstore Always true except for case where we are writing a region event marker, for
|
* @param inMemstore Always true except for case where we are writing a region event meta
|
||||||
* example, a compaction completion record into the WAL; in this case the entry is just
|
* marker edit, for example, a compaction completion record into the WAL or noting a
|
||||||
* so we can finish an unfinished compaction -- it is not an edit for memstore.
|
* Region Open event. In these cases the entry is just so we can finish an unfinished
|
||||||
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
|
* compaction after a crash when the new Server reads the WAL on recovery, etc. These
|
||||||
* region on this region server. The WAL implementation should remove all the related
|
* transition event 'Markers' do not go via the memstore. When memstore is false,
|
||||||
* stuff, for example, the sequence id accounting.
|
* we presume a Marker event edit.
|
||||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||||
* in it.
|
* in it.
|
||||||
*/
|
*/
|
||||||
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||||
boolean closeRegion) throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
|
protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
|
||||||
|
|
||||||
|
|
|
@ -564,9 +564,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||||
boolean closeRegion) throws IOException {
|
throws IOException {
|
||||||
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
|
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
|
||||||
waitingConsumePayloads);
|
waitingConsumePayloads);
|
||||||
if (shouldScheduleConsumer()) {
|
if (shouldScheduleConsumer()) {
|
||||||
consumeExecutor.execute(consumer);
|
consumeExecutor.execute(consumer);
|
||||||
|
|
|
@ -436,8 +436,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
|
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
|
||||||
final boolean inMemstore, boolean closeRegion) throws IOException {
|
final boolean inMemstore) throws IOException {
|
||||||
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
|
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
|
||||||
disruptor.getRingBuffer());
|
disruptor.getRingBuffer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
|
||||||
/**
|
/**
|
||||||
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
|
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
|
||||||
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
|
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
|
||||||
* region sequence id (we want to use this later, just before we write the WAL to ensure region
|
* region sequenceid (we want to use this later, just before we write the WAL to ensure region
|
||||||
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
|
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
|
||||||
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
|
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
|
||||||
* the assign of the region sequence id. See #stampRegionSequenceId().
|
* the assign of the region sequence id. See #stampRegionSequenceId().
|
||||||
|
@ -50,17 +50,32 @@ class FSWALEntry extends Entry {
|
||||||
// The below data members are denoted 'transient' just to highlight these are not persisted;
|
// The below data members are denoted 'transient' just to highlight these are not persisted;
|
||||||
// they are only in memory and held here while passing over the ring buffer.
|
// they are only in memory and held here while passing over the ring buffer.
|
||||||
private final transient long txid;
|
private final transient long txid;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If false, means this is a meta edit written by the hbase system itself. It was not in
|
||||||
|
* memstore. HBase uses these edit types to note in the log operational transitions such
|
||||||
|
* as compactions, flushes, or region open/closes.
|
||||||
|
*/
|
||||||
private final transient boolean inMemstore;
|
private final transient boolean inMemstore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set if this is a meta edit and it is of close region type.
|
||||||
|
*/
|
||||||
private final transient boolean closeRegion;
|
private final transient boolean closeRegion;
|
||||||
|
|
||||||
private final transient RegionInfo regionInfo;
|
private final transient RegionInfo regionInfo;
|
||||||
private final transient Set<byte[]> familyNames;
|
private final transient Set<byte[]> familyNames;
|
||||||
private final transient ServerCall<?> rpcCall;
|
private final transient ServerCall<?> rpcCall;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param inMemstore If true, then this is a data edit, one that came from client. If false, it
|
||||||
|
* is a meta edit made by the hbase system itself and is for the WAL only.
|
||||||
|
*/
|
||||||
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
|
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
|
||||||
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
|
final boolean inMemstore, ServerCall<?> rpcCall) {
|
||||||
super(key, edit);
|
super(key, edit);
|
||||||
this.inMemstore = inMemstore;
|
this.inMemstore = inMemstore;
|
||||||
this.closeRegion = closeRegion;
|
this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
|
||||||
this.regionInfo = regionInfo;
|
this.regionInfo = regionInfo;
|
||||||
this.txid = txid;
|
this.txid = txid;
|
||||||
if (inMemstore) {
|
if (inMemstore) {
|
||||||
|
@ -68,7 +83,7 @@ class FSWALEntry extends Entry {
|
||||||
Set<byte[]> families = edit.getFamilies();
|
Set<byte[]> families = edit.getFamilies();
|
||||||
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
|
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
|
||||||
} else {
|
} else {
|
||||||
this.familyNames = Collections.<byte[]> emptySet();
|
this.familyNames = Collections.emptySet();
|
||||||
}
|
}
|
||||||
this.rpcCall = rpcCall;
|
this.rpcCall = rpcCall;
|
||||||
if (rpcCall != null) {
|
if (rpcCall != null) {
|
||||||
|
@ -83,7 +98,7 @@ class FSWALEntry extends Entry {
|
||||||
} else {
|
} else {
|
||||||
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||||
for (Cell cell: cells) {
|
for (Cell cell: cells) {
|
||||||
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
|
if (!WALEdit.isMetaEditFamily(cell)) {
|
||||||
set.add(CellUtil.cloneFamily(cell));
|
set.add(CellUtil.cloneFamily(cell));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -94,7 +109,7 @@ class FSWALEntry extends Entry {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "sequence=" + this.txid + ", " + super.toString();
|
return "sequence=" + this.txid + ", " + super.toString();
|
||||||
};
|
}
|
||||||
|
|
||||||
boolean isInMemStore() {
|
boolean isInMemStore() {
|
||||||
return this.inMemstore;
|
return this.inMemstore;
|
||||||
|
|
|
@ -38,13 +38,11 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* Accounting of sequence ids per region and then by column family. So we can keep our accounting
|
||||||
* Accounting of sequence ids per region and then by column family. So we can our accounting
|
|
||||||
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
|
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
|
||||||
* keep abreast of the state of sequence id persistence. Also call update per append.
|
* keep abreast of the state of sequence id persistence. Also call update per append.
|
||||||
* </p>
|
|
||||||
* <p>
|
* <p>
|
||||||
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
|
* For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by
|
||||||
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
|
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
|
||||||
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
|
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
|
||||||
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
|
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
|
||||||
|
@ -53,8 +51,8 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class SequenceIdAccounting {
|
class SequenceIdAccounting {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);
|
private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
|
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
|
||||||
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
|
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
|
||||||
|
@ -110,7 +108,6 @@ class SequenceIdAccounting {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the lowest unflushed sequence id for the region.
|
* Returns the lowest unflushed sequence id for the region.
|
||||||
* @param encodedRegionName
|
|
||||||
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
|
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
|
||||||
* return {@link HConstants#NO_SEQNUM} when none.
|
* return {@link HConstants#NO_SEQNUM} when none.
|
||||||
*/
|
*/
|
||||||
|
@ -125,8 +122,6 @@ class SequenceIdAccounting {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param encodedRegionName
|
|
||||||
* @param familyName
|
|
||||||
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
|
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
|
||||||
* <code>familyName</code>. Returned sequenceid may be for an edit currently being
|
* <code>familyName</code>. Returned sequenceid may be for an edit currently being
|
||||||
* flushed.
|
* flushed.
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -68,7 +67,7 @@ public class WALUtil {
|
||||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
|
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
|
||||||
MultiVersionConcurrencyControl mvcc) throws IOException {
|
MultiVersionConcurrencyControl mvcc) throws IOException {
|
||||||
WALKeyImpl walKey =
|
WALKeyImpl walKey =
|
||||||
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), false, mvcc, null);
|
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
|
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
|
||||||
}
|
}
|
||||||
|
@ -84,7 +83,7 @@ public class WALUtil {
|
||||||
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
|
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
|
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
|
||||||
WALEdit.createFlushWALEdit(hri, f), false, mvcc, null, sync);
|
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
|
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
|
||||||
}
|
}
|
||||||
|
@ -96,11 +95,11 @@ public class WALUtil {
|
||||||
* only. Not for external client consumption.
|
* only. Not for external client consumption.
|
||||||
*/
|
*/
|
||||||
public static WALKeyImpl writeRegionEventMarker(WAL wal,
|
public static WALKeyImpl writeRegionEventMarker(WAL wal,
|
||||||
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final RegionEventDescriptor r,
|
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
|
||||||
final MultiVersionConcurrencyControl mvcc) throws IOException {
|
MultiVersionConcurrencyControl mvcc)
|
||||||
WALKeyImpl walKey =
|
throws IOException {
|
||||||
writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r),
|
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
|
||||||
r.getEventType() == RegionEventDescriptor.EventType.REGION_CLOSE, mvcc, null);
|
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
|
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
|
||||||
}
|
}
|
||||||
|
@ -122,7 +121,7 @@ public class WALUtil {
|
||||||
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
|
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
|
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
|
||||||
WALEdit.createBulkLoadEvent(hri, desc), false, mvcc, null);
|
WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
|
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
|
||||||
}
|
}
|
||||||
|
@ -130,11 +129,11 @@ public class WALUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static WALKeyImpl writeMarker(final WAL wal,
|
private static WALKeyImpl writeMarker(final WAL wal,
|
||||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
|
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, WALEdit edit, MultiVersionConcurrencyControl mvcc,
|
||||||
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
|
Map<String, byte[]> extendedAttributes)
|
||||||
final Map<String, byte[]> extendedAttributes) throws IOException {
|
throws IOException {
|
||||||
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
|
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
|
||||||
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, closeRegion, mvcc,
|
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc,
|
||||||
extendedAttributes, true);
|
extendedAttributes, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,16 +145,16 @@ public class WALUtil {
|
||||||
* This write is for internal use only. Not for external client consumption.
|
* This write is for internal use only. Not for external client consumption.
|
||||||
* @return WALKeyImpl that was added to the WAL.
|
* @return WALKeyImpl that was added to the WAL.
|
||||||
*/
|
*/
|
||||||
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
|
private static WALKeyImpl doFullMarkerAppendTransaction(WAL wal,
|
||||||
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
|
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final WALEdit edit,
|
||||||
boolean closeRegion, final MultiVersionConcurrencyControl mvcc,
|
MultiVersionConcurrencyControl mvcc, Map<String, byte[]> extendedAttributes, boolean sync)
|
||||||
final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
|
throws IOException {
|
||||||
// TODO: Pass in current time to use?
|
// TODO: Pass in current time to use?
|
||||||
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
|
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
|
||||||
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
|
System.currentTimeMillis(), mvcc, replicationScope, extendedAttributes);
|
||||||
long trx = MultiVersionConcurrencyControl.NONE;
|
long trx = MultiVersionConcurrencyControl.NONE;
|
||||||
try {
|
try {
|
||||||
trx = wal.appendMarker(hri, walKey, edit, closeRegion);
|
trx = wal.appendMarker(hri, walKey, edit);
|
||||||
if (sync) {
|
if (sync) {
|
||||||
wal.sync(trx);
|
wal.sync(trx);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
@ -74,7 +73,7 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
|
||||||
}
|
}
|
||||||
// For replay, or if all the cells are markers, do not need to store replication scope.
|
// For replay, or if all the cells are markers, do not need to store replication scope.
|
||||||
if (logEdit.isReplay() ||
|
if (logEdit.isReplay() ||
|
||||||
logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
|
logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c))) {
|
||||||
((WALKeyImpl) logKey).clearReplicationScope();
|
((WALKeyImpl) logKey).clearReplicationScope();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -162,17 +162,17 @@ class DisabledWALProvider implements WALProvider {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
|
||||||
return append(info, key, edits, true, false);
|
return append(info, key, edits, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return append(info, key, edits, false, closeRegion);
|
return append(info, key, edits, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
private long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||||
boolean closeRegion) throws IOException {
|
throws IOException {
|
||||||
WriteEntry writeEntry = key.getMvcc().begin();
|
WriteEntry writeEntry = key.getMvcc().begin();
|
||||||
if (!edits.isReplay()) {
|
if (!edits.isReplay()) {
|
||||||
for (Cell cell : edits.getCells()) {
|
for (Cell cell : edits.getCells()) {
|
||||||
|
|
|
@ -362,7 +362,7 @@ public class LogRecoveredEditsOutputSink extends OutputSink {
|
||||||
// We make the assumption that most cells will be kept.
|
// We make the assumption that most cells will be kept.
|
||||||
ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
|
ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
|
||||||
for (Cell cell : logEntry.getEdit().getCells()) {
|
for (Cell cell : logEntry.getEdit().getCells()) {
|
||||||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
keptCells.add(cell);
|
keptCells.add(cell);
|
||||||
} else {
|
} else {
|
||||||
byte[] family = CellUtil.cloneFamily(cell);
|
byte[] family = CellUtil.cloneFamily(cell);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -81,7 +81,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
* can clean logs. Returns null if nothing to flush. Names are actual
|
* can clean logs. Returns null if nothing to flush. Names are actual
|
||||||
* region names as returned by {@link RegionInfo#getEncodedName()}
|
* region names as returned by {@link RegionInfo#getEncodedName()}
|
||||||
*/
|
*/
|
||||||
byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException;
|
byte[][] rollWriter(boolean force) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
|
* Stop accepting new writes. If we have unsynced writes still in buffer, sync them.
|
||||||
|
@ -99,7 +99,7 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append a set of data edits to the WAL. 'Data' here means that the content in the edits will
|
* Append a set of data edits to the WAL. 'Data' here means that the content in the edits will
|
||||||
* also be added to memstore.
|
* also have transitioned through the memstore.
|
||||||
* <p/>
|
* <p/>
|
||||||
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
||||||
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
||||||
|
@ -110,13 +110,16 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
* sequence id that is after all currently appended edits.
|
* sequence id that is after all currently appended edits.
|
||||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||||
* in it.
|
* in it.
|
||||||
* @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit, boolean)
|
* @see #appendMarker(RegionInfo, WALKeyImpl, WALEdit)
|
||||||
*/
|
*/
|
||||||
long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException;
|
long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append a marker edit to the WAL. A marker could be a FlushDescriptor, a compaction marker, or
|
* Append an operational 'meta' event marker edit to the WAL. A marker meta edit could
|
||||||
* region event marker. The difference here is that, a marker will not be added to memstore.
|
* be a FlushDescriptor, a compaction marker, or a region event marker; e.g. region open
|
||||||
|
* or region close. The difference between a 'marker' append and a 'data' append as in
|
||||||
|
* {@link #appendData(RegionInfo, WALKeyImpl, WALEdit)}is that a marker will not have
|
||||||
|
* transitioned through the memstore.
|
||||||
* <p/>
|
* <p/>
|
||||||
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
* The WAL is not flushed/sync'd after this transaction completes BUT on return this edit must
|
||||||
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
* have its region edit/sequence id assigned else it messes up our unification of mvcc and
|
||||||
|
@ -125,15 +128,11 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
||||||
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
* @param key Modified by this call; we add to it this edits region edit/sequence id.
|
||||||
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
|
||||||
* sequence id that is after all currently appended edits.
|
* sequence id that is after all currently appended edits.
|
||||||
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
|
|
||||||
* region on this region server. The WAL implementation should remove all the related
|
|
||||||
* stuff, for example, the sequence id accounting.
|
|
||||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||||
* in it.
|
* in it.
|
||||||
* @see #appendData(RegionInfo, WALKeyImpl, WALEdit)
|
* @see #appendData(RegionInfo, WALKeyImpl, WALEdit)
|
||||||
*/
|
*/
|
||||||
long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
|
long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException;
|
||||||
throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* updates the seuence number of a specific store.
|
* updates the seuence number of a specific store.
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
/**
|
/*
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -37,8 +36,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
|
||||||
|
@ -52,45 +49,103 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
|
||||||
* single record, in PB format, followed (optionally) by Cells written via the WALCellEncoder.
|
* single record, in PB format, followed (optionally) by Cells written via the WALCellEncoder.
|
||||||
* <p>This class is LimitedPrivate for CPs to read-only. The {@link #add} methods are
|
* <p>This class is LimitedPrivate for CPs to read-only. The {@link #add} methods are
|
||||||
* classified as private methods, not for use by CPs.</p>
|
* classified as private methods, not for use by CPs.</p>
|
||||||
|
*
|
||||||
|
* <p>A particular WALEdit 'type' is the 'meta' type used to mark key operational
|
||||||
|
* events in the WAL such as compaction, flush, or region open. These meta types do not traverse
|
||||||
|
* hbase memstores. They are edits made by the hbase system rather than edit data submitted by
|
||||||
|
* clients. They only show in the WAL. These 'Meta' types have not been formally specified
|
||||||
|
* (or made into an explicit class type). They evolved organically. HBASE-8457 suggests codifying
|
||||||
|
* a WALEdit 'type' by adding a type field to WALEdit that gets serialized into the WAL. TODO.
|
||||||
|
* Would have to work on the consumption-side. Reading WALs on replay we seem to consume
|
||||||
|
* a Cell-at-a-time rather than by WALEdit. We are already in the below going out of our
|
||||||
|
* way to figure particular types -- e.g. if a compaction, replay, or close meta Marker -- during
|
||||||
|
* normal processing so would make sense to do this. Current system is an awkward marking of Cell
|
||||||
|
* columnfamily as {@link #METAFAMILY} and then setting qualifier based off meta edit type. For
|
||||||
|
* replay-time where we read Cell-at-a-time, there are utility methods below for figuring
|
||||||
|
* meta type. See also
|
||||||
|
* {@link #createBulkLoadEvent(RegionInfo, WALProtos.BulkLoadDescriptor)}, etc., for where we
|
||||||
|
* create meta WALEdit instances.</p>
|
||||||
|
*
|
||||||
* <p>WALEdit will accumulate a Set of all column family names referenced by the Cells
|
* <p>WALEdit will accumulate a Set of all column family names referenced by the Cells
|
||||||
* {@link #add(Cell)}'d. This is an optimization. Usually when loading a WALEdit, we have the
|
* {@link #add(Cell)}'d. This is an optimization. Usually when loading a WALEdit, we have the
|
||||||
* column family name to-hand.. just shove it into the WALEdit if available. Doing this, we can
|
* column family name to-hand.. just shove it into the WALEdit if available. Doing this, we can
|
||||||
* save on a parse of each Cell to figure column family down the line when we go to add the
|
* save on a parse of each Cell to figure column family down the line when we go to add the
|
||||||
* WALEdit to the WAL file. See the hand-off in FSWALEntry Constructor.
|
* WALEdit to the WAL file. See the hand-off in FSWALEntry Constructor.
|
||||||
|
* @see WALKey
|
||||||
*/
|
*/
|
||||||
// TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle.
|
// TODO: Do not expose this class to Coprocessors. It has set methods. A CP might meddle.
|
||||||
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
|
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
|
||||||
HBaseInterfaceAudience.COPROC })
|
HBaseInterfaceAudience.COPROC })
|
||||||
public class WALEdit implements HeapSize {
|
public class WALEdit implements HeapSize {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(WALEdit.class);
|
// Below defines are for writing WALEdit 'meta' Cells..
|
||||||
|
// TODO: Get rid of this system of special 'meta' Cells. See HBASE-8457. It suggests
|
||||||
// TODO: Get rid of this; see HBASE-8457
|
// adding a type to WALEdit itself for use denoting meta Edits and their types.
|
||||||
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
|
public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
|
||||||
@VisibleForTesting
|
|
||||||
|
/**
|
||||||
|
* @deprecated Since 2.3.0. Not used.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public static final byte [] METAROW = Bytes.toBytes("METAROW");
|
public static final byte [] METAROW = Bytes.toBytes("METAROW");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Since 2.3.0. Make it protected, internal-use only. Use
|
||||||
|
* {@link #isCompactionMarker(Cell)}
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
|
public static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Since 2.3.0. Make it protected, internal-use only.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
|
public static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
|
||||||
@VisibleForTesting
|
|
||||||
public static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
|
/**
|
||||||
|
* Qualifier for region event meta 'Marker' WALEdits start with the
|
||||||
|
* {@link #REGION_EVENT_PREFIX} prefix ('HBASE::REGION_EVENT::'). After the prefix,
|
||||||
|
* we note the type of the event which we get from the RegionEventDescriptor protobuf
|
||||||
|
* instance type (A RegionEventDescriptor protobuf instance is written as the meta Marker
|
||||||
|
* Cell value). Adding a type suffix means we do not have to deserialize the protobuf to
|
||||||
|
* figure out what type of event this is.. .just read the qualifier suffix. For example,
|
||||||
|
* a close region event descriptor will have a qualifier of HBASE::REGION_EVENT::REGION_CLOSE.
|
||||||
|
* See WAL.proto and the EventType in RegionEventDescriptor protos for all possible
|
||||||
|
* event types.
|
||||||
|
*/
|
||||||
|
private static final String REGION_EVENT_STR = "HBASE::REGION_EVENT";
|
||||||
|
private static final String REGION_EVENT_PREFIX_STR = REGION_EVENT_STR + "::";
|
||||||
|
private static final byte [] REGION_EVENT_PREFIX = Bytes.toBytes(REGION_EVENT_PREFIX_STR);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Since 2.3.0. Remove. Not for external use. Not used.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public static final byte [] REGION_EVENT = Bytes.toBytes(REGION_EVENT_STR);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We use this define figuring if we are carrying a close event.
|
||||||
|
*/
|
||||||
|
private static final byte [] REGION_EVENT_CLOSE =
|
||||||
|
createRegionEventDescriptorQualifier(RegionEventDescriptor.EventType.REGION_CLOSE);
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
|
public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
|
||||||
|
|
||||||
private final boolean replay;
|
private final transient boolean replay;
|
||||||
|
|
||||||
private ArrayList<Cell> cells = null;
|
private ArrayList<Cell> cells;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* All the Cell families in <code>cells</code>. Updated by {@link #add(Cell)} and
|
* All the Cell families in <code>cells</code>. Updated by {@link #add(Cell)} and
|
||||||
* {@link #add(Map)}. This Set is passed to the FSWALEntry so it does not have
|
* {@link #add(Map)}. This Set is passed to the FSWALEntry so it does not have
|
||||||
* to recalculate the Set of families in a transaction; makes for a bunch of CPU savings.
|
* to recalculate the Set of families in a transaction; makes for a bunch of CPU savings.
|
||||||
* An optimization that saves on CPU-expensive Cell-parsing.
|
|
||||||
*/
|
*/
|
||||||
private Set<byte []> families = null;
|
private Set<byte []> families = null;
|
||||||
|
|
||||||
public WALEdit() {
|
public WALEdit() {
|
||||||
this(false);
|
this(1, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -100,8 +155,8 @@ public class WALEdit implements HeapSize {
|
||||||
* @see <a href="https://issues.apache.org/jira/browse/HBASE-20781">HBASE-20781</a>
|
* @see <a href="https://issues.apache.org/jira/browse/HBASE-20781">HBASE-20781</a>
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public WALEdit(boolean isReplay) {
|
public WALEdit(boolean replay) {
|
||||||
this(1, isReplay);
|
this(1, replay);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -125,7 +180,7 @@ public class WALEdit implements HeapSize {
|
||||||
|
|
||||||
private Set<byte[]> getOrCreateFamilies() {
|
private Set<byte[]> getOrCreateFamilies() {
|
||||||
if (this.families == null) {
|
if (this.families == null) {
|
||||||
this.families = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
|
this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
|
||||||
}
|
}
|
||||||
return this.families;
|
return this.families;
|
||||||
}
|
}
|
||||||
|
@ -140,22 +195,26 @@ public class WALEdit implements HeapSize {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True is <code>f</code> is {@link #METAFAMILY}
|
* @return True is <code>f</code> is {@link #METAFAMILY}
|
||||||
|
* @deprecated Since 2.3.0. Do not expose. Make protected.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public static boolean isMetaEditFamily(final byte [] f) {
|
public static boolean isMetaEditFamily(final byte [] f) {
|
||||||
return Bytes.equals(METAFAMILY, f);
|
return Bytes.equals(METAFAMILY, f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replaying WALs can read Cell-at-a-time so need this method in those cases.
|
||||||
|
*/
|
||||||
public static boolean isMetaEditFamily(Cell cell) {
|
public static boolean isMetaEditFamily(Cell cell) {
|
||||||
return CellUtil.matchingFamily(cell, METAFAMILY);
|
return CellUtil.matchingFamily(cell, METAFAMILY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return True if this is a meta edit; has one edit only and its columnfamily
|
||||||
|
* is {@link #METAFAMILY}.
|
||||||
|
*/
|
||||||
public boolean isMetaEdit() {
|
public boolean isMetaEdit() {
|
||||||
for (Cell cell: cells) {
|
return this.families != null && this.families.size() == 1 && this.families.contains(METAFAMILY);
|
||||||
if (!isMetaEditFamily(cell)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -215,7 +274,7 @@ public class WALEdit implements HeapSize {
|
||||||
cells.clear();
|
cells.clear();
|
||||||
cells.ensureCapacity(expectedCount);
|
cells.ensureCapacity(expectedCount);
|
||||||
while (cells.size() < expectedCount && cellDecoder.advance()) {
|
while (cells.size() < expectedCount && cellDecoder.advance()) {
|
||||||
cells.add(cellDecoder.current());
|
add(cellDecoder.current());
|
||||||
}
|
}
|
||||||
return cells.size();
|
return cells.size();
|
||||||
}
|
}
|
||||||
|
@ -241,7 +300,7 @@ public class WALEdit implements HeapSize {
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
|
|
||||||
sb.append("[#edits: " + cells.size() + " = <");
|
sb.append("[#edits: ").append(cells.size()).append(" = <");
|
||||||
for (Cell cell : cells) {
|
for (Cell cell : cells) {
|
||||||
sb.append(cell);
|
sb.append(cell);
|
||||||
sb.append("; ");
|
sb.append("; ");
|
||||||
|
@ -257,30 +316,61 @@ public class WALEdit implements HeapSize {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
|
public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
|
||||||
if (CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)) {
|
return CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)?
|
||||||
return FlushDescriptor.parseFrom(CellUtil.cloneValue(cell));
|
FlushDescriptor.parseFrom(CellUtil.cloneValue(cell)): null;
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return A meta Marker WALEdit that has a single Cell whose value is the passed in
|
||||||
|
* <code>regionEventDesc</code> serialized and whose row is this region,
|
||||||
|
* columnfamily is {@link #METAFAMILY} and qualifier is
|
||||||
|
* {@link #REGION_EVENT_PREFIX} + {@link RegionEventDescriptor#getEventType()};
|
||||||
|
* for example HBASE::REGION_EVENT::REGION_CLOSE.
|
||||||
|
*/
|
||||||
public static WALEdit createRegionEventWALEdit(RegionInfo hri,
|
public static WALEdit createRegionEventWALEdit(RegionInfo hri,
|
||||||
RegionEventDescriptor regionEventDesc) {
|
RegionEventDescriptor regionEventDesc) {
|
||||||
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, REGION_EVENT,
|
return createRegionEventWALEdit(getRowForRegion(hri), regionEventDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static WALEdit createRegionEventWALEdit(byte [] rowForRegion,
|
||||||
|
RegionEventDescriptor regionEventDesc) {
|
||||||
|
KeyValue kv = new KeyValue(rowForRegion, METAFAMILY,
|
||||||
|
createRegionEventDescriptorQualifier(regionEventDesc.getEventType()),
|
||||||
EnvironmentEdgeManager.currentTime(), regionEventDesc.toByteArray());
|
EnvironmentEdgeManager.currentTime(), regionEventDesc.toByteArray());
|
||||||
return new WALEdit().add(kv, METAFAMILY);
|
return new WALEdit().add(kv, METAFAMILY);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException {
|
/**
|
||||||
if (CellUtil.matchingColumn(cell, METAFAMILY, REGION_EVENT)) {
|
* @return Cell qualifier for the passed in RegionEventDescriptor Type; e.g. we'll
|
||||||
return RegionEventDescriptor.parseFrom(CellUtil.cloneValue(cell));
|
* return something like a byte array with HBASE::REGION_EVENT::REGION_OPEN in it.
|
||||||
}
|
*/
|
||||||
return null;
|
@VisibleForTesting
|
||||||
|
public static byte [] createRegionEventDescriptorQualifier(RegionEventDescriptor.EventType t) {
|
||||||
|
return Bytes.toBytes(REGION_EVENT_PREFIX_STR + t.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a compaction WALEdit
|
* Public so can be accessed from regionserver.wal package.
|
||||||
* @param c
|
* @return True if this is a Marker Edit and it is a RegionClose type.
|
||||||
* @return A WALEdit that has <code>c</code> serialized as its value
|
*/
|
||||||
|
public boolean isRegionCloseMarker() {
|
||||||
|
return isMetaEdit() && PrivateCellUtil.matchingQualifier(this.cells.get(0),
|
||||||
|
REGION_EVENT_CLOSE, 0, REGION_EVENT_CLOSE.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Returns a RegionEventDescriptor made by deserializing the content of the
|
||||||
|
* passed in <code>cell</code>, IFF the <code>cell</code> is a RegionEventDescriptor
|
||||||
|
* type WALEdit.
|
||||||
|
*/
|
||||||
|
public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException {
|
||||||
|
return CellUtil.matchingColumnFamilyAndQualifierPrefix(cell, METAFAMILY, REGION_EVENT_PREFIX)?
|
||||||
|
RegionEventDescriptor.parseFrom(CellUtil.cloneValue(cell)): null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return A Marker WALEdit that has <code>c</code> serialized as its value
|
||||||
*/
|
*/
|
||||||
public static WALEdit createCompaction(final RegionInfo hri, final CompactionDescriptor c) {
|
public static WALEdit createCompaction(final RegionInfo hri, final CompactionDescriptor c) {
|
||||||
byte [] pbbytes = c.toByteArray();
|
byte [] pbbytes = c.toByteArray();
|
||||||
|
@ -305,10 +395,7 @@ public class WALEdit implements HeapSize {
|
||||||
* @return deserialized CompactionDescriptor or null.
|
* @return deserialized CompactionDescriptor or null.
|
||||||
*/
|
*/
|
||||||
public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
|
public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
|
||||||
if (isCompactionMarker(kv)) {
|
return isCompactionMarker(kv)? CompactionDescriptor.parseFrom(CellUtil.cloneValue(kv)): null;
|
||||||
return CompactionDescriptor.parseFrom(CellUtil.cloneValue(kv));
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -329,11 +416,8 @@ public class WALEdit implements HeapSize {
|
||||||
*/
|
*/
|
||||||
public static WALEdit createBulkLoadEvent(RegionInfo hri,
|
public static WALEdit createBulkLoadEvent(RegionInfo hri,
|
||||||
WALProtos.BulkLoadDescriptor bulkLoadDescriptor) {
|
WALProtos.BulkLoadDescriptor bulkLoadDescriptor) {
|
||||||
KeyValue kv = new KeyValue(getRowForRegion(hri),
|
KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, BULK_LOAD,
|
||||||
METAFAMILY,
|
EnvironmentEdgeManager.currentTime(), bulkLoadDescriptor.toByteArray());
|
||||||
BULK_LOAD,
|
|
||||||
EnvironmentEdgeManager.currentTime(),
|
|
||||||
bulkLoadDescriptor.toByteArray());
|
|
||||||
return new WALEdit().add(kv, METAFAMILY);
|
return new WALEdit().add(kv, METAFAMILY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,17 +427,16 @@ public class WALEdit implements HeapSize {
|
||||||
* @return deserialized BulkLoadDescriptor or null.
|
* @return deserialized BulkLoadDescriptor or null.
|
||||||
*/
|
*/
|
||||||
public static WALProtos.BulkLoadDescriptor getBulkLoadDescriptor(Cell cell) throws IOException {
|
public static WALProtos.BulkLoadDescriptor getBulkLoadDescriptor(Cell cell) throws IOException {
|
||||||
if (CellUtil.matchingColumn(cell, METAFAMILY, BULK_LOAD)) {
|
return CellUtil.matchingColumn(cell, METAFAMILY, BULK_LOAD)?
|
||||||
return WALProtos.BulkLoadDescriptor.parseFrom(CellUtil.cloneValue(cell));
|
WALProtos.BulkLoadDescriptor.parseFrom(CellUtil.cloneValue(cell)): null;
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append the given map of family->edits to a WALEdit data structure.
|
* Append the given map of family->edits to a WALEdit data structure.
|
||||||
* This does not write to the WAL itself.
|
* This does not write to the WAL itself.
|
||||||
* Note that as an optimization, we will stamp the Set of column families into the WALEdit
|
* Note that as an optimization, we will stamp the Set of column families into the WALEdit
|
||||||
* to save on our having to calculate it subsequently way down in the actual WAL writing.
|
* to save on our having to calculate column families subsequently down in the actual WAL
|
||||||
|
* writing.
|
||||||
*
|
*
|
||||||
* @param familyMap map of family->edits
|
* @param familyMap map of family->edits
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -113,10 +112,9 @@ public class TestBulkLoad {
|
||||||
storeFileName = (new Path(storeFileName)).getName();
|
storeFileName = (new Path(storeFileName)).getName();
|
||||||
List<String> storeFileNames = new ArrayList<>();
|
List<String> storeFileNames = new ArrayList<>();
|
||||||
storeFileNames.add(storeFileName);
|
storeFileNames.add(storeFileName);
|
||||||
when(log.appendMarker(any(), any(),
|
when(log.appendMarker(any(), any(), argThat(
|
||||||
argThat(bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(),
|
bulkLogWalEdit(WALEdit.BULK_LOAD, tableName.toBytes(), familyName, storeFileNames)))).
|
||||||
familyName, storeFileNames)),
|
thenAnswer(new Answer() {
|
||||||
anyBoolean())).thenAnswer(new Answer() {
|
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
WALKeyImpl walKey = invocation.getArgument(1);
|
WALKeyImpl walKey = invocation.getArgument(1);
|
||||||
|
@ -141,8 +139,7 @@ public class TestBulkLoad {
|
||||||
@Test
|
@Test
|
||||||
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
|
public void shouldBulkLoadSingleFamilyHLog() throws IOException {
|
||||||
when(log.appendMarker(any(),
|
when(log.appendMarker(any(),
|
||||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
|
||||||
anyBoolean())).thenAnswer(new Answer() {
|
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
WALKeyImpl walKey = invocation.getArgument(1);
|
WALKeyImpl walKey = invocation.getArgument(1);
|
||||||
|
@ -161,8 +158,7 @@ public class TestBulkLoad {
|
||||||
@Test
|
@Test
|
||||||
public void shouldBulkLoadManyFamilyHLog() throws IOException {
|
public void shouldBulkLoadManyFamilyHLog() throws IOException {
|
||||||
when(log.appendMarker(any(),
|
when(log.appendMarker(any(),
|
||||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
|
||||||
anyBoolean())).thenAnswer(new Answer() {
|
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
WALKeyImpl walKey = invocation.getArgument(1);
|
WALKeyImpl walKey = invocation.getArgument(1);
|
||||||
|
@ -182,8 +178,7 @@ public class TestBulkLoad {
|
||||||
@Test
|
@Test
|
||||||
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
|
public void shouldBulkLoadManyFamilyHLogEvenWhenTableNameNamespaceSpecified() throws IOException {
|
||||||
when(log.appendMarker(any(),
|
when(log.appendMarker(any(),
|
||||||
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)),
|
any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))).thenAnswer(new Answer() {
|
||||||
anyBoolean())).thenAnswer(new Answer() {
|
|
||||||
@Override
|
@Override
|
||||||
public Object answer(InvocationOnMock invocation) {
|
public Object answer(InvocationOnMock invocation) {
|
||||||
WALKeyImpl walKey = invocation.getArgument(1);
|
WALKeyImpl walKey = invocation.getArgument(1);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.Server;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
@ -162,18 +161,17 @@ public class TestFailedAppendAndSync {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make up mocked server and services.
|
// Make up mocked server and services.
|
||||||
Server server = mock(Server.class);
|
|
||||||
when(server.getConfiguration()).thenReturn(CONF);
|
|
||||||
when(server.isStopped()).thenReturn(false);
|
|
||||||
when(server.isAborted()).thenReturn(false);
|
|
||||||
RegionServerServices services = mock(RegionServerServices.class);
|
RegionServerServices services = mock(RegionServerServices.class);
|
||||||
|
when(services.getConfiguration()).thenReturn(CONF);
|
||||||
|
when(services.isStopped()).thenReturn(false);
|
||||||
|
when(services.isAborted()).thenReturn(false);
|
||||||
// OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with
|
// OK. Now I have my mocked up Server and RegionServerServices and my dodgy WAL, go ahead with
|
||||||
// the test.
|
// the test.
|
||||||
FileSystem fs = FileSystem.get(CONF);
|
FileSystem fs = FileSystem.get(CONF);
|
||||||
Path rootDir = new Path(dir + getName());
|
Path rootDir = new Path(dir + getName());
|
||||||
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
|
DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
|
||||||
dodgyWAL.init();
|
dodgyWAL.init();
|
||||||
LogRoller logRoller = new LogRoller(server, services);
|
LogRoller logRoller = new LogRoller(services);
|
||||||
logRoller.addWAL(dodgyWAL);
|
logRoller.addWAL(dodgyWAL);
|
||||||
logRoller.start();
|
logRoller.start();
|
||||||
|
|
||||||
|
@ -224,7 +222,7 @@ public class TestFailedAppendAndSync {
|
||||||
// to just continue.
|
// to just continue.
|
||||||
|
|
||||||
// So, should be no abort at this stage. Verify.
|
// So, should be no abort at this stage. Verify.
|
||||||
Mockito.verify(server, Mockito.atLeast(0)).
|
Mockito.verify(services, Mockito.atLeast(0)).
|
||||||
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
|
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
|
||||||
try {
|
try {
|
||||||
dodgyWAL.throwAppendException = false;
|
dodgyWAL.throwAppendException = false;
|
||||||
|
@ -240,7 +238,7 @@ public class TestFailedAppendAndSync {
|
||||||
// happens. If it don't we'll timeout the whole test. That is fine.
|
// happens. If it don't we'll timeout the whole test. That is fine.
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
Mockito.verify(server, Mockito.atLeast(1)).
|
Mockito.verify(services, Mockito.atLeast(1)).
|
||||||
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
|
abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
|
||||||
break;
|
break;
|
||||||
} catch (WantedButNotInvoked t) {
|
} catch (WantedButNotInvoked t) {
|
||||||
|
@ -249,7 +247,7 @@ public class TestFailedAppendAndSync {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// To stop logRoller, its server has to say it is stopped.
|
// To stop logRoller, its server has to say it is stopped.
|
||||||
Mockito.when(server.isStopped()).thenReturn(true);
|
Mockito.when(services.isStopped()).thenReturn(true);
|
||||||
if (logRoller != null) logRoller.close();
|
if (logRoller != null) logRoller.close();
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -29,7 +29,6 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -5639,7 +5638,7 @@ public class TestHRegion {
|
||||||
TEST_UTIL.getConfiguration(), rss, null);
|
TEST_UTIL.getConfiguration(), rss, null);
|
||||||
|
|
||||||
verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
|
verify(wal, times(1)).appendMarker(any(RegionInfo.class), any(WALKeyImpl.class),
|
||||||
editCaptor.capture(), anyBoolean());
|
editCaptor.capture());
|
||||||
|
|
||||||
WALEdit edit = editCaptor.getValue();
|
WALEdit edit = editCaptor.getValue();
|
||||||
assertNotNull(edit);
|
assertNotNull(edit);
|
||||||
|
@ -5721,8 +5720,8 @@ public class TestHRegion {
|
||||||
return 1L;
|
return 1L;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class),
|
when(wal.appendMarker(any(RegionInfo.class), any(WALKeyImpl.class), any(WALEdit.class))).
|
||||||
anyBoolean())).thenAnswer(new Answer<Long>() {
|
thenAnswer(new Answer<Long>() {
|
||||||
@Override
|
@Override
|
||||||
public Long answer(InvocationOnMock invocation) throws Throwable {
|
public Long answer(InvocationOnMock invocation) throws Throwable {
|
||||||
WALKeyImpl key = invocation.getArgument(1);
|
WALKeyImpl key = invocation.getArgument(1);
|
||||||
|
@ -5763,8 +5762,8 @@ public class TestHRegion {
|
||||||
region.close(false);
|
region.close(false);
|
||||||
|
|
||||||
// 2 times, one for region open, the other close region
|
// 2 times, one for region open, the other close region
|
||||||
verify(wal, times(2)).appendMarker(any(RegionInfo.class), (WALKeyImpl) any(WALKeyImpl.class),
|
verify(wal, times(2)).appendMarker(any(RegionInfo.class),
|
||||||
editCaptor.capture(), anyBoolean());
|
(WALKeyImpl) any(WALKeyImpl.class), editCaptor.capture());
|
||||||
|
|
||||||
WALEdit edit = editCaptor.getAllValues().get(1);
|
WALEdit edit = editCaptor.getAllValues().get(1);
|
||||||
assertNotNull(edit);
|
assertNotNull(edit);
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -195,15 +194,18 @@ public class TestRecoveredEdits {
|
||||||
WALEdit val = entry.getEdit();
|
WALEdit val = entry.getEdit();
|
||||||
count++;
|
count++;
|
||||||
// Check this edit is for this region.
|
// Check this edit is for this region.
|
||||||
if (!Bytes
|
if (!Bytes.equals(key.getEncodedRegionName(),
|
||||||
.equals(key.getEncodedRegionName(), region.getRegionInfo().getEncodedNameAsBytes())) {
|
region.getRegionInfo().getEncodedNameAsBytes())) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Cell previous = null;
|
Cell previous = null;
|
||||||
for (Cell cell : val.getCells()) {
|
for (Cell cell : val.getCells()) {
|
||||||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
|
if (WALEdit.isMetaEditFamily(cell)) {
|
||||||
if (previous != null && CellComparatorImpl.COMPARATOR.compareRows(previous, cell) == 0)
|
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
if (previous != null && CellComparatorImpl.COMPARATOR.compareRows(previous, cell) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
previous = cell;
|
previous = cell;
|
||||||
walCells.add(cell);
|
walCells.add(cell);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -204,11 +204,10 @@ public class TestWALLockup {
|
||||||
@Test
|
@Test
|
||||||
public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
|
public void testLockupWhenSyncInMiddleOfZigZagSetup() throws IOException {
|
||||||
// Mocked up server and regionserver services. Needed below.
|
// Mocked up server and regionserver services. Needed below.
|
||||||
Server server = Mockito.mock(Server.class);
|
|
||||||
Mockito.when(server.getConfiguration()).thenReturn(CONF);
|
|
||||||
Mockito.when(server.isStopped()).thenReturn(false);
|
|
||||||
Mockito.when(server.isAborted()).thenReturn(false);
|
|
||||||
RegionServerServices services = Mockito.mock(RegionServerServices.class);
|
RegionServerServices services = Mockito.mock(RegionServerServices.class);
|
||||||
|
Mockito.when(services.getConfiguration()).thenReturn(CONF);
|
||||||
|
Mockito.when(services.isStopped()).thenReturn(false);
|
||||||
|
Mockito.when(services.isAborted()).thenReturn(false);
|
||||||
|
|
||||||
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test.
|
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test.
|
||||||
FileSystem fs = FileSystem.get(CONF);
|
FileSystem fs = FileSystem.get(CONF);
|
||||||
|
@ -217,7 +216,7 @@ public class TestWALLockup {
|
||||||
dodgyWAL.init();
|
dodgyWAL.init();
|
||||||
Path originalWAL = dodgyWAL.getCurrentFileName();
|
Path originalWAL = dodgyWAL.getCurrentFileName();
|
||||||
// I need a log roller running.
|
// I need a log roller running.
|
||||||
LogRoller logRoller = new LogRoller(server, services);
|
LogRoller logRoller = new LogRoller(services);
|
||||||
logRoller.addWAL(dodgyWAL);
|
logRoller.addWAL(dodgyWAL);
|
||||||
// There is no 'stop' once a logRoller is running.. it just dies.
|
// There is no 'stop' once a logRoller is running.. it just dies.
|
||||||
logRoller.start();
|
logRoller.start();
|
||||||
|
@ -294,7 +293,7 @@ public class TestWALLockup {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// To stop logRoller, its server has to say it is stopped.
|
// To stop logRoller, its server has to say it is stopped.
|
||||||
Mockito.when(server.isStopped()).thenReturn(true);
|
Mockito.when(services.isStopped()).thenReturn(true);
|
||||||
Closeables.close(logRoller, true);
|
Closeables.close(logRoller, true);
|
||||||
try {
|
try {
|
||||||
if (region != null) {
|
if (region != null) {
|
||||||
|
@ -380,11 +379,10 @@ public class TestWALLockup {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mocked up server and regionserver services. Needed below.
|
// Mocked up server and regionserver services. Needed below.
|
||||||
final Server server = Mockito.mock(Server.class);
|
|
||||||
Mockito.when(server.getConfiguration()).thenReturn(CONF);
|
|
||||||
Mockito.when(server.isStopped()).thenReturn(false);
|
|
||||||
Mockito.when(server.isAborted()).thenReturn(false);
|
|
||||||
RegionServerServices services = Mockito.mock(RegionServerServices.class);
|
RegionServerServices services = Mockito.mock(RegionServerServices.class);
|
||||||
|
Mockito.when(services.getConfiguration()).thenReturn(CONF);
|
||||||
|
Mockito.when(services.isStopped()).thenReturn(false);
|
||||||
|
Mockito.when(services.isAborted()).thenReturn(false);
|
||||||
|
|
||||||
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test.
|
// OK. Now I have my mocked up Server & RegionServerServices and dodgy WAL, go ahead with test.
|
||||||
FileSystem fs = FileSystem.get(CONF);
|
FileSystem fs = FileSystem.get(CONF);
|
||||||
|
@ -392,7 +390,7 @@ public class TestWALLockup {
|
||||||
final DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
|
final DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF);
|
||||||
dodgyWAL.init();
|
dodgyWAL.init();
|
||||||
// I need a log roller running.
|
// I need a log roller running.
|
||||||
LogRoller logRoller = new LogRoller(server, services);
|
LogRoller logRoller = new LogRoller(services);
|
||||||
logRoller.addWAL(dodgyWAL);
|
logRoller.addWAL(dodgyWAL);
|
||||||
// There is no 'stop' once a logRoller is running.. it just dies.
|
// There is no 'stop' once a logRoller is running.. it just dies.
|
||||||
logRoller.start();
|
logRoller.start();
|
||||||
|
@ -433,7 +431,7 @@ public class TestWALLockup {
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
// To stop logRoller, its server has to say it is stopped.
|
// To stop logRoller, its server has to say it is stopped.
|
||||||
Mockito.when(server.isStopped()).thenReturn(true);
|
Mockito.when(services.isStopped()).thenReturn(true);
|
||||||
if (logRoller != null) {
|
if (logRoller != null) {
|
||||||
logRoller.close();
|
logRoller.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -429,7 +429,7 @@ public abstract class AbstractTestFSWAL {
|
||||||
final RegionInfo info = region.getRegionInfo();
|
final RegionInfo info = region.getRegionInfo();
|
||||||
final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
final WALKeyImpl logkey = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||||
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
|
System.currentTimeMillis(), clusterIds, -1, -1, region.getMVCC(), scopes);
|
||||||
wal.append(info, logkey, edits, true, false);
|
wal.append(info, logkey, edits, true);
|
||||||
region.getMVCC().completeAndWait(logkey.getWriteEntry());
|
region.getMVCC().completeAndWait(logkey.getWriteEntry());
|
||||||
}
|
}
|
||||||
region.flush(true);
|
region.flush(true);
|
||||||
|
@ -479,7 +479,7 @@ public abstract class AbstractTestFSWAL {
|
||||||
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
|
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
|
||||||
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
|
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
|
||||||
try {
|
try {
|
||||||
wal.append(ri, key, cols, true, false);
|
wal.append(ri, key, cols, true);
|
||||||
fail("Should fail since the wal has already been closed");
|
fail("Should fail since the wal has already been closed");
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// expected
|
// expected
|
||||||
|
|
|
@ -1157,7 +1157,7 @@ public abstract class AbstractTestWALReplay {
|
||||||
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
|
byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
|
||||||
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
|
int index, NavigableMap<byte[], Integer> scopes) throws IOException {
|
||||||
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
|
FSWALEntry entry = new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc, scopes),
|
||||||
createWALEdit(rowName, family, ee, index), hri, true, false, null);
|
createWALEdit(rowName, family, ee, index), hri, true, null);
|
||||||
entry.stampRegionSequenceId(mvcc.begin());
|
entry.stampRegionSequenceId(mvcc.begin());
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -35,7 +35,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.Server;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
@ -124,9 +123,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBrokenWriter() throws Exception {
|
public void testBrokenWriter() throws Exception {
|
||||||
Server server = mock(Server.class);
|
|
||||||
when(server.getConfiguration()).thenReturn(CONF);
|
|
||||||
RegionServerServices services = mock(RegionServerServices.class);
|
RegionServerServices services = mock(RegionServerServices.class);
|
||||||
|
when(services.getConfiguration()).thenReturn(CONF);
|
||||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
|
||||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||||
RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
|
RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
|
||||||
|
@ -138,7 +136,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
long timestamp = System.currentTimeMillis();
|
long timestamp = System.currentTimeMillis();
|
||||||
String testName = currentTest.getMethodName();
|
String testName = currentTest.getMethodName();
|
||||||
AtomicInteger failedCount = new AtomicInteger(0);
|
AtomicInteger failedCount = new AtomicInteger(0);
|
||||||
try (LogRoller roller = new LogRoller(server, services);
|
try (LogRoller roller = new LogRoller(services);
|
||||||
AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
AsyncFSWAL wal = new AsyncFSWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
|
||||||
testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
|
testName, CONF, null, true, null, null, GROUP, CHANNEL_CLASS) {
|
||||||
|
|
||||||
|
@ -196,7 +194,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
|
||||||
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
|
SequenceId.NO_SEQUENCE_ID, timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE,
|
||||||
HConstants.NO_NONCE, mvcc, scopes);
|
HConstants.NO_NONCE, mvcc, scopes);
|
||||||
try {
|
try {
|
||||||
wal.append(ri, key, cols, true, false);
|
wal.append(ri, key, cols, true);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// should not happen
|
// should not happen
|
||||||
throw new UncheckedIOException(e);
|
throw new UncheckedIOException(e);
|
||||||
|
|
|
@ -63,12 +63,12 @@ public class FaultyFSLog extends FSHLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
|
protected long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
|
||||||
boolean closeRegion) throws IOException {
|
throws IOException {
|
||||||
if (this.ft == FailureType.APPEND) {
|
if (this.ft == FailureType.APPEND) {
|
||||||
throw new IOException("append");
|
throw new IOException("append");
|
||||||
}
|
}
|
||||||
return super.append(info, key, edits, inMemstore, closeRegion);
|
return super.append(info, key, edits, inMemstore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1376,12 +1376,10 @@ public class TestWALSplit {
|
||||||
1,
|
1,
|
||||||
ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
|
ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
|
||||||
final long time = EnvironmentEdgeManager.currentTime();
|
final long time = EnvironmentEdgeManager.currentTime();
|
||||||
KeyValue kv = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
|
|
||||||
time, regionOpenDesc.toByteArray());
|
|
||||||
final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
|
final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
|
||||||
HConstants.DEFAULT_CLUSTER_ID);
|
HConstants.DEFAULT_CLUSTER_ID);
|
||||||
w.append(
|
WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
|
||||||
new Entry(walKey, new WALEdit().add(kv)));
|
w.append(new Entry(walKey, we));
|
||||||
w.sync(false);
|
w.sync(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -321,7 +321,7 @@ public final class WALPerformanceEvaluation extends Configured implements Tool {
|
||||||
final HRegion[] regions = new HRegion[numRegions];
|
final HRegion[] regions = new HRegion[numRegions];
|
||||||
final Runnable[] benchmarks = new Runnable[numRegions];
|
final Runnable[] benchmarks = new Runnable[numRegions];
|
||||||
final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
|
final MockRegionServerServices mockServices = new MockRegionServerServices(getConf());
|
||||||
final LogRoller roller = new LogRoller(mockServices, mockServices);
|
final LogRoller roller = new LogRoller(mockServices);
|
||||||
Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller");
|
Threads.setDaemonThreadRunning(roller.getThread(), "WALPerfEval.logRoller");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue