HBASE-8701: distributedLogReplay need to apply wal edits in the receiving order of those edits

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1552828 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jeffreyz 2013-12-20 22:21:01 +00:00
parent d9e510d284
commit 7d411abdfb
17 changed files with 428 additions and 66 deletions

View File

@ -2053,11 +2053,39 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
return compare;
}
// Negate following comparisons so later edits show up first
// compare log replay tag value if there is any
// when either keyvalue tagged with log replay sequence number, we need to compare them:
// 1) when both keyvalues have the tag, then use the tag values for comparison
// 2) when one has and the other doesn't have, the one without the log replay tag wins because
// it means the edit isn't from recovery but new one coming from clients during recovery
// 3) when both doesn't have, then skip to the next mvcc comparison
long leftChangeSeqNum = getReplaySeqNum(left);
long RightChangeSeqNum = getReplaySeqNum(right);
if (leftChangeSeqNum != Long.MAX_VALUE || RightChangeSeqNum != Long.MAX_VALUE) {
return Longs.compare(RightChangeSeqNum, leftChangeSeqNum);
}
// compare Mvcc Version
// Negate this comparison so later edits show up first
return Longs.compare(right.getMvccVersion(), left.getMvccVersion());
}
/**
* Return replay log sequence number for the cell
* @param c
* @return Long.MAX_VALUE if there is no LOG_REPLAY_TAG
*/
private long getReplaySeqNum(final Cell c) {
Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
TagType.LOG_REPLAY_TAG_TYPE);
if(tag != null) {
return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
}
return Long.MAX_VALUE;
}
public int compareTimestamps(final KeyValue left, final KeyValue right) {
// Compare timestamps
long ltimestamp = left.getTimestamp(left.getKeyLength());
@ -2736,6 +2764,27 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
return new KeyValue(bytes, 0, length);
}
/**
* Create a new KeyValue by copying existing cell and adding new tags
* @param c
* @param newTags
* @return a new KeyValue instance with new tags
*/
public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
List<Tag> existingTags = null;
if(c.getTagsLength() > 0) {
existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
existingTags.addAll(newTags);
} else {
existingTags = newTags;
}
return new KeyValue(c.getRowArray(), c.getRowOffset(), (int)c.getRowLength(),
c.getFamilyArray(), c.getFamilyOffset(), (int)c.getFamilyLength(),
c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
c.getValueLength(), existingTags);
}
/**
* Create a KeyValue reading from the raw InputStream.
* Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}

View File

@ -156,6 +156,26 @@ public class Tag {
return tags;
}
/**
* Retrieve the first tag from the tags byte array matching the passed in tag type
* @param b
* @param offset
* @param length
* @param type
* @return null if there is no tag of the passed in tag type
*/
public static Tag getTag(byte[] b, int offset, int length, byte type) {
int pos = offset;
while (pos < offset + length) {
short tagLen = Bytes.toShort(b, pos);
if(b[pos + TAG_LENGTH_SIZE] == type) {
return new Tag(b, pos, (short) (tagLen + TAG_LENGTH_SIZE));
}
pos += TAG_LENGTH_SIZE + tagLen;
}
return null;
}
/**
* Returns the total length of the entire tag entity
*/

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class TagType {
// Please declare new Tag Types here to avoid step on pre-existing tag types.
public static final byte ACL_TAG_TYPE = (byte) 1;
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
}

View File

@ -856,6 +856,15 @@ MasterServices, Server {
// may also host user regions
}
Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
// need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
// instead of previouslyFailedMetaRSs alone to address the following two situations:
// 1) the chained failure situation(recovery failed multiple times in a row).
// 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
// same server still has non-meta wals to be replayed so that
// removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
// Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
// there is no op for the server.
previouslyFailedMetaRSs.addAll(previouslyFailedServers);
this.initializationBeforeMetaAssignment = true;
@ -866,26 +875,11 @@ MasterServices, Server {
// Make sure meta assigned before proceeding.
status.setStatus("Assigning Meta Region");
assignMeta(status);
assignMeta(status, previouslyFailedMetaRSs);
// check if master is shutting down because above assignMeta could return even hbase:meta isn't
// assigned when master is shutting down
if(this.stopped) return;
if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
// replay WAL edits mode need new hbase:meta RS is assigned firstly
status.setStatus("replaying log for Meta Region");
// need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
// instead of oldMetaServerLocation to address the following two situations:
// 1) the chained failure situation(recovery failed multiple times in a row).
// 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
// same server still has non-meta wals to be replayed so that
// removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
// Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
// there is no op for the server.
previouslyFailedMetaRSs.addAll(previouslyFailedServers);
this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
}
status.setStatus("Submitting log splitting work for previously failed region servers");
// Master has recovered hbase:meta region server and we put
// other failed region servers in a queue to be handled later by SSH
@ -982,17 +976,17 @@ MasterServices, Server {
/**
* Check <code>hbase:meta</code> is assigned. If not, assign it.
* @param status MonitoredTask
* @param previouslyFailedMetaRSs
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
*/
void assignMeta(MonitoredTask status)
void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs)
throws InterruptedException, IOException, KeeperException {
// Work on meta region
int assigned = 0;
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
status.setStatus("Assigning hbase:meta region");
ServerName logReplayFailedMetaServer = null;
RegionStates regionStates = assignmentManager.getRegionStates();
regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
@ -1010,12 +1004,10 @@ MasterServices, Server {
LOG.info("Forcing expire of " + currentMetaServer);
serverManager.expireServer(currentMetaServer);
splitMetaLogBeforeAssignment(currentMetaServer);
if (this.distributedLogReplay) {
logReplayFailedMetaServer = currentMetaServer;
previouslyFailedMetaRSs.add(currentMetaServer);
}
}
// Make sure assignment manager knows where the meta is,
// so that meta sever shutdown handler kicks in.
// Make sure following meta assignment happens
assignmentManager.getRegionStates().clearLastAssignment(HRegionInfo.FIRST_META_REGIONINFO);
assignmentManager.assignMeta();
}
} else {
@ -1028,19 +1020,18 @@ MasterServices, Server {
enableMeta(TableName.META_TABLE_NAME);
if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
// replay WAL edits mode need new hbase:meta RS is assigned firstly
status.setStatus("replaying log for Meta Region");
this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
}
// Make sure a hbase:meta location is set. We need to enable SSH here since
// if the meta region server is died at this time, we need it to be re-assigned
// by SSH so that system tables can be assigned.
// No need to wait for meta is assigned = 0 when meta is just verified.
enableServerShutdownHandler(assigned != 0);
// logReplayFailedMetaServer is set only if log replay is
// enabled and the current meta server is expired
if (logReplayFailedMetaServer != null) {
// In Replay WAL Mode, we need the new hbase:meta server online
this.fileSystemManager.splitMetaLog(logReplayFailedMetaServer);
}
LOG.info("hbase:meta assigned=" + assigned + ", rit=" + rit +
", location=" + catalogTracker.getMetaLocation());
status.setStatus("META assigned.");

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -120,8 +121,7 @@ public class MasterFileSystem {
FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
// make sure the fs has the same conf
fs.setConf(conf);
this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
// setup the filesystem variable
// set up the archived logs path
this.oldLogDir = createInitialFileSystemLayout();

View File

@ -426,6 +426,10 @@ public class RegionStates {
* Log split is done for a given region, so it is assignable now.
*/
public synchronized void logSplit(final HRegionInfo region) {
clearLastAssignment(region);
}
public synchronized void clearLastAssignment(final HRegionInfo region) {
lastAssignments.remove(region.getEncodedName());
}

View File

@ -222,8 +222,7 @@ public class SplitLogManager extends ZooKeeperListener {
this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
this.unassignedTimeout =
conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
", distributedLogReplay=" + this.distributedLogReplay);

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -84,9 +85,7 @@ public class ServerShutdownHandler extends EventHandler {
LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
}
this.shouldSplitHlog = shouldSplitHlog;
this.distributedLogReplay = server.getConfiguration().getBoolean(
HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
}

View File

@ -210,7 +210,7 @@ public class HRegion implements HeapSize { // , Writable{
*/
final AtomicBoolean closing = new AtomicBoolean(false);
protected long completeSequenceId = -1L;
protected volatile long completeSequenceId = -1L;
/**
* Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
@ -399,6 +399,8 @@ public class HRegion implements HeapSize { // , Writable{
private RegionServerAccounting rsAccounting;
private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
private long flushCheckInterval;
// flushPerChanges is to prevent too many changes in memstore
private long flushPerChanges;
private long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard closes
@ -493,6 +495,12 @@ public class HRegion implements HeapSize { // , Writable{
.addWritableMap(htd.getValues());
this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
DEFAULT_CACHE_FLUSH_INTERVAL);
this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
+ MAX_FLUSH_PER_CHANGES);
}
this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
DEFAULT_ROWLOCK_WAIT_DURATION);
@ -651,6 +659,12 @@ public class HRegion implements HeapSize { // , Writable{
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
if (this.isRecovering) {
// In distributedLogReplay mode, we don't know the last change sequence number because region
// is opened before recovery completes. So we add a safety bumper to avoid new sequence number
// overlaps used sequence numbers
nextSeqid += this.flushPerChanges + 10000000; // add another extra 10million
}
LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
"; next sequenceid=" + nextSeqid);
@ -658,6 +672,7 @@ public class HRegion implements HeapSize { // , Writable{
this.closing.set(false);
this.closed.set(false);
this.completeSequenceId = nextSeqid;
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
@ -965,6 +980,16 @@ public class HRegion implements HeapSize { // , Writable{
/** Default interval for the memstore flush */
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
/** Conf key to force a flush if there are already enough changes for one region in memstore */
public static final String MEMSTORE_FLUSH_PER_CHANGES =
"hbase.regionserver.flush.per.changes";
public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 millions
/**
* The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes
* overhead. Therefore, even 1G empty KVs occupy at least 20GB memstore size for a single region
*/
public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
/**
* Close down this HRegion. Flush the cache unless abort parameter is true,
* Shut down each HStore, don't service any more calls.
@ -1465,6 +1490,9 @@ public class HRegion implements HeapSize { // , Writable{
* Should the memstore be flushed now
*/
boolean shouldFlush() {
if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
return true;
}
if (flushCheckInterval <= 0) { //disabled
return false;
}
@ -1675,9 +1703,7 @@ public class HRegion implements HeapSize { // , Writable{
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Update the last flushed sequence id for region
if (this.rsServices != null) {
completeSequenceId = flushSeqId;
}
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@ -5250,7 +5276,7 @@ public class HRegion implements HeapSize { // , Writable{
ClassSize.OBJECT +
ClassSize.ARRAY +
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(11 * Bytes.SIZEOF_LONG) +
(12 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@ -623,13 +624,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
};
this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
this.rsInfo = RegionServerInfo.newBuilder();
// Put up the webui. Webui may come up on port other than configured if
// that port is occupied. Adjust serverInfo if this is the case.
this.rsInfo.setInfoPort(putUpWebUI());
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
}
/**
@ -3935,6 +3934,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
List<HLogSplitter.MutationReplay> mutations = new ArrayList<HLogSplitter.MutationReplay>();
// when tag is enabled, we need tag replay edits with log sequence number
boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
for (WALEntry entry : entries) {
if (nonceManager != null) {
long nonceGroup = entry.getKey().hasNonceGroup()
@ -3944,8 +3945,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
}
Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
new Pair<HLogKey, WALEdit>();
List<HLogSplitter.MutationReplay> edits =
HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry);
List<HLogSplitter.MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
cells, walEntry, needAddReplayTag);
if (coprocessorHost != null) {
// Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
// KeyValue.

View File

@ -174,9 +174,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
try {
LOG.info("SplitLogWorker " + this.serverName + " starting");
this.watcher.registerListener(this);
boolean distributedLogReplay = this.conf.getBoolean(
HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
if (distributedLogReplay) {
// initialize a new connection for splitlogworker configuration
HConnectionManager.getConnection(conf);

View File

@ -65,6 +65,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
@ -73,6 +76,7 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -178,8 +182,7 @@ public class HLogSplitter {
// a larger minBatchSize may slow down recovery because replay writer has to wait for
// enough edits before replaying them
this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && this.distributedLogReplay) {
@ -191,6 +194,7 @@ public class HLogSplitter {
this.distributedLogReplay = false;
outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
}
}
/**
@ -1483,6 +1487,7 @@ public class HLogSplitter {
if (!skippedKVs.isEmpty()) {
kvs.removeAll(skippedKVs);
}
synchronized (serverToBufferQueueMap) {
locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
@ -1851,6 +1856,33 @@ public class HLogSplitter {
public final long nonce;
}
/**
* Tag original sequence number for each edit to be replayed
* @param entry
* @param cell
* @return
*/
private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
// Tag puts with original sequence number if there is no LOG_REPLAY_TAG yet
boolean needAddRecoveryTag = true;
if (cell.getTagsLength() > 0) {
Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
TagType.LOG_REPLAY_TAG_TYPE);
if (tmpTag != null) {
// found an existing log replay tag so reuse it
needAddRecoveryTag = false;
}
}
if (needAddRecoveryTag) {
List<Tag> newTags = new ArrayList<Tag>();
Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
.getLogSequenceNumber()));
newTags.add(replayTag);
return KeyValue.cloneAndAddTags(cell, newTags);
}
return cell;
}
/**
* This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
* WALEdit from the passed in WALEntry
@ -1858,11 +1890,12 @@ public class HLogSplitter {
* @param cells
* @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
* extracted from the passed in WALEntry.
* @param addLogReplayTag
* @return list of Pair<MutationType, Mutation> to be replayed
* @throws IOException
*/
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry,
CellScanner cells, Pair<HLogKey, WALEdit> logEntry) throws IOException {
public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
if (entry == null) {
// return an empty array
@ -1907,7 +1940,11 @@ public class HLogSplitter {
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
} else {
((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
Cell tmpNewCell = cell;
if (addLogReplayTag) {
tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
}
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
}
previousCell = cell;
}
@ -1928,4 +1965,14 @@ public class HLogSplitter {
return mutations;
}
/**
* Returns if distributed log replay is turned on or not
* @param conf
* @return true when distributed log replay is turned on
*/
public static boolean isDistributedLogReplay(Configuration conf) {
return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
}
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@ -103,7 +104,7 @@ public class AccessControlLists {
public static final String ACL_LIST_FAMILY_STR = "l";
public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
/** KV tag to store per cell access control lists */
public static final byte ACL_TAG_TYPE = (byte) 1;
public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
public static final char NAMESPACE_PREFIX = '@';

View File

@ -23,6 +23,7 @@ import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
@ -43,7 +44,7 @@ public class VisibilityUtils {
public static final String VISIBILITY_LABEL_GENERATOR_CLASS =
"hbase.regionserver.scan.visibility.label.generator.class";
public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
public static final byte VISIBILITY_TAG_TYPE = TagType.VISIBILITY_TAG_TYPE;
public static final String SYSTEM_LABEL = "system";
/**

View File

@ -962,11 +962,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
hbaseAdmin = null;
}
if (zooKeeperWatcher != null) {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}
// unset the configuration for MIN and MAX RS to start
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
@ -976,6 +971,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
this.hbaseCluster.waitUntilShutDown();
this.hbaseCluster = null;
}
if (zooKeeperWatcher != null) {
zooKeeperWatcher.close();
zooKeeperWatcher = null;
}
}
/**

View File

@ -70,17 +70,20 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@ -1159,6 +1162,198 @@ public class TestDistributedLogSplitting {
zkw.close();
}
@Test(timeout = 300000)
public void testSameVersionUpdatesRecovery() throws Exception {
LOG.info("testSameVersionUpdatesRecovery");
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
final int NUM_LOG_LINES = 1000;
// turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits
master.balanceSwitch(false);
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
List<HRegionInfo> regions = null;
HRegionServer hrs = null;
for (int i = 0; i < NUM_RS; i++) {
boolean isCarryingMeta = false;
hrs = rsts.get(i).getRegionServer();
regions = ProtobufUtil.getOnlineRegions(hrs);
for (HRegionInfo region : regions) {
if (region.isMetaRegion()) {
isCarryingMeta = true;
break;
}
}
if (isCarryingMeta) {
continue;
}
break;
}
LOG.info("#regions = " + regions.size());
Iterator<HRegionInfo> it = regions.iterator();
while (it.hasNext()) {
HRegionInfo region = it.next();
if (region.isMetaTable()
|| region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
it.remove();
}
}
if (regions.size() == 0) return;
HRegionInfo curRegionInfo = regions.get(0);
byte[] startRow = curRegionInfo.getStartKey();
if (startRow == null || startRow.length == 0) {
startRow = new byte[] { 0, 0, 0, 0, 1 };
}
byte[] row = Bytes.incrementBytes(startRow, 1);
// use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
row = Arrays.copyOfRange(row, 3, 8);
long value = 0;
byte[] tableName = Bytes.toBytes("table");
byte[] family = Bytes.toBytes("family");
byte[] qualifier = Bytes.toBytes("c1");
long timeStamp = System.currentTimeMillis();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
htd.addFamily(new HColumnDescriptor(family));
for (int i = 0; i < NUM_LOG_LINES; i += 1) {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
System.currentTimeMillis(), htd, sequenceId);
}
hrs.getWAL().sync();
hrs.getWAL().close();
// wait for abort completes
this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
// verify we got the last value
LOG.info("Verification Starts...");
Get g = new Get(row);
Result r = ht.get(g);
long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
assertEquals(value, theStoredVal);
// after flush
LOG.info("Verification after flush...");
TEST_UTIL.getHBaseAdmin().flush(tableName);
r = ht.get(g);
theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
assertEquals(value, theStoredVal);
ht.close();
}
@Test(timeout = 300000)
public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
LOG.info("testSameVersionUpdatesRecoveryWithWrites");
conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
conf.setInt("hbase.hstore.compactionThreshold", 3);
conf.setInt("hfile.format.version", 3);
startCluster(NUM_RS);
final AtomicLong sequenceId = new AtomicLong(100);
final int NUM_REGIONS_TO_CREATE = 40;
final int NUM_LOG_LINES = 1000;
// turn off load balancing to prevent regions from moving around otherwise
// they will consume recovered.edits
master.balanceSwitch(false);
List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
List<HRegionInfo> regions = null;
HRegionServer hrs = null;
for (int i = 0; i < NUM_RS; i++) {
boolean isCarryingMeta = false;
hrs = rsts.get(i).getRegionServer();
regions = ProtobufUtil.getOnlineRegions(hrs);
for (HRegionInfo region : regions) {
if (region.isMetaRegion()) {
isCarryingMeta = true;
break;
}
}
if (isCarryingMeta) {
continue;
}
break;
}
LOG.info("#regions = " + regions.size());
Iterator<HRegionInfo> it = regions.iterator();
while (it.hasNext()) {
HRegionInfo region = it.next();
if (region.isMetaTable()
|| region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
it.remove();
}
}
if (regions.size() == 0) return;
HRegionInfo curRegionInfo = regions.get(0);
byte[] startRow = curRegionInfo.getStartKey();
if (startRow == null || startRow.length == 0) {
startRow = new byte[] { 0, 0, 0, 0, 1 };
}
byte[] row = Bytes.incrementBytes(startRow, 1);
// use last 5 bytes because HBaseTestingUtility.createMultiRegions use 5 bytes key
row = Arrays.copyOfRange(row, 3, 8);
long value = 0;
final byte[] tableName = Bytes.toBytes("table");
byte[] family = Bytes.toBytes("family");
byte[] qualifier = Bytes.toBytes("c1");
long timeStamp = System.currentTimeMillis();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
htd.addFamily(new HColumnDescriptor(family));
for (int i = 0; i < NUM_LOG_LINES; i += 1) {
WALEdit e = new WALEdit();
value++;
e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
System.currentTimeMillis(), htd, sequenceId);
}
hrs.getWAL().sync();
hrs.getWAL().close();
// wait for abort completes
this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
// verify we got the last value
LOG.info("Verification Starts...");
Get g = new Get(row);
Result r = ht.get(g);
long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
assertEquals(value, theStoredVal);
// after flush & compaction
LOG.info("Verification after flush...");
TEST_UTIL.getHBaseAdmin().flush(tableName);
TEST_UTIL.getHBaseAdmin().compact(tableName);
// wait for compaction completes
TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
}
});
r = ht.get(g);
theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
assertEquals(value, theStoredVal);
ht.close();
}
HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
return installTable(zkw, tname, fname, nrs, 0);
}

View File

@ -27,6 +27,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@ -363,7 +364,7 @@ public class TestMasterNoCluster {
HMaster master = new HMaster(conf) {
@Override
void assignMeta(MonitoredTask status) {
void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMeatRSs) {
}
@Override