HBASE-23326 Implement a ProcedureStore which stores procedures in a HRegion (#941)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Duo Zhang 2019-12-25 12:02:12 +08:00
parent 976563e094
commit 5cae75e124
45 changed files with 2277 additions and 880 deletions

View File

@ -1182,12 +1182,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
}
private byte[] toEncodeRegionName(byte[] regionName) {
try {
return RegionInfo.isEncodedRegionName(regionName) ? regionName
: Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
} catch (IOException e) {
return regionName;
}
return RegionInfo.isEncodedRegionName(regionName) ? regionName :
Bytes.toBytes(RegionInfo.encodeRegionName(regionName));
}
private void checkAndGetTableName(byte[] encodeRegionName, AtomicReference<TableName> tableName,

View File

@ -351,7 +351,7 @@ public interface RegionInfo {
* @return True if <code>regionName</code> represents an encoded name.
*/
@InterfaceAudience.Private // For use by internals only.
public static boolean isEncodedRegionName(byte[] regionName) throws IOException {
public static boolean isEncodedRegionName(byte[] regionName) {
// If not parseable as region name, presume encoded. TODO: add stringency; e.g. if hex.
return parseRegionNameOrReturnNull(regionName) == null && regionName.length <= MD5_HEX_LENGTH;
}

View File

@ -134,5 +134,8 @@ class CompletedProcedureCleaner<TEnvironment> extends ProcedureInMemoryChore<TEn
if (batchCount > 0) {
store.delete(batchIds, 0, batchCount);
}
// let the store do some cleanup works, i.e, delete the place marker for preserving the max
// procedure id.
store.cleanup();
}
}

View File

@ -370,4 +370,17 @@ public final class ProcedureUtil {
.setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
return new RetryCounter(retryConfig);
}
public static boolean isFinished(ProcedureProtos.Procedure proc) {
if (!proc.hasParentId()) {
switch (proc.getState()) {
case ROLLEDBACK:
case SUCCESS:
return true;
default:
break;
}
}
return false;
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A procedure iterator which holds all the procedure protos in memory. For fast access.
*/
@InterfaceAudience.Private
public class InMemoryProcedureIterator implements ProcedureIterator {
private final List<ProtoAndProcedure> procs;
private Iterator<ProtoAndProcedure> iter;
private ProtoAndProcedure current;
public InMemoryProcedureIterator(List<ProtoAndProcedure> procs) {
this.procs = procs;
reset();
}
@Override
public void reset() {
iter = procs.iterator();
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public boolean hasNext() {
return current != null;
}
private void checkNext() {
if (!hasNext()) {
throw new NoSuchElementException();
}
}
@Override
public boolean isNextFinished() {
checkNext();
return ProcedureUtil.isFinished(current.getProto());
}
private void moveToNext() {
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public void skipNext() {
checkNext();
moveToNext();
}
@Override
public Procedure<?> next() throws IOException {
checkNext();
Procedure<?> proc = current.getProcedure();
moveToNext();
return proc;
}
}

View File

@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface LeaseRecovery {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
}

View File

@ -24,9 +24,18 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* The ProcedureStore is used by the executor to persist the state of each procedure execution.
* This allows to resume the execution of pending/in-progress procedures in case
* of machine failure or service shutdown.
* The ProcedureStore is used by the executor to persist the state of each procedure execution. This
* allows to resume the execution of pending/in-progress procedures in case of machine failure or
* service shutdown.
* <p/>
* Notice that, the implementation must guarantee that the maxProcId when loading is the maximum one
* in the whole history, not only the current live procedures. This is very important as for
* executing remote procedures, we have some nonce checks at region server side to prevent executing
* non-idempotent operations more than once. If the procedure id could go back, then we may
* accidentally ignore some important operations such as region assign or unassign.<br/>
* This may lead to some garbages so we provide a {@link #cleanup()} method, the framework will call
* this method periodically and the store implementation could do some clean up works in this
* method.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -240,4 +249,13 @@ public interface ProcedureStore {
* @param count the number of IDs to delete
*/
void delete(long[] procIds, int offset, int count);
/**
* Will be called by the framework to give the store a chance to do some clean up works.
* <p/>
* Notice that this is for periodical clean up work, not for the clean up after close, if you want
* to close the store just call the {@link #stop(boolean)} method above.
*/
default void cleanup() {
}
}

View File

@ -15,18 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
@ -50,9 +47,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* we will also consider them as corrupted. Please see the code in {@link #checkOrphan(Map)} method.
*/
@InterfaceAudience.Private
public final class WALProcedureTree {
public final class ProcedureTree {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureTree.class);
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTree.class);
private static final class Entry {
@ -78,50 +75,18 @@ public final class WALProcedureTree {
}
}
// when loading we will iterator the procedures twice, so use this class to cache the deserialized
// result to prevent deserializing multiple times.
private static final class ProtoAndProc {
private final ProcedureProtos.Procedure proto;
private final List<ProtoAndProcedure> validProcs = new ArrayList<>();
private Procedure<?> proc;
private final List<ProtoAndProcedure> corruptedProcs = new ArrayList<>();
public ProtoAndProc(ProcedureProtos.Procedure proto) {
this.proto = proto;
}
public Procedure<?> getProc() throws IOException {
if (proc == null) {
proc = ProcedureUtil.convertToProcedure(proto);
}
return proc;
}
}
private final List<ProtoAndProc> validProcs = new ArrayList<>();
private final List<ProtoAndProc> corruptedProcs = new ArrayList<>();
private static boolean isFinished(ProcedureProtos.Procedure proc) {
if (!proc.hasParentId()) {
switch (proc.getState()) {
case ROLLEDBACK:
case SUCCESS:
return true;
default:
break;
}
}
return false;
}
private WALProcedureTree(Map<Long, Entry> procMap) {
private ProcedureTree(Map<Long, Entry> procMap) {
List<Entry> rootEntries = buildTree(procMap);
for (Entry rootEntry : rootEntries) {
checkReady(rootEntry, procMap);
}
checkOrphan(procMap);
Comparator<ProtoAndProc> cmp =
(p1, p2) -> Long.compare(p1.proto.getProcId(), p2.proto.getProcId());
Comparator<ProtoAndProcedure> cmp =
(p1, p2) -> Long.compare(p1.getProto().getProcId(), p2.getProto().getProcId());
Collections.sort(validProcs, cmp);
Collections.sort(corruptedProcs, cmp);
}
@ -160,7 +125,7 @@ public final class WALProcedureTree {
private void addAllToCorruptedAndRemoveFromProcMap(Entry entry,
Map<Long, Entry> remainingProcMap) {
corruptedProcs.add(new ProtoAndProc(entry.proc));
corruptedProcs.add(new ProtoAndProcedure(entry.proc));
remainingProcMap.remove(entry.proc.getProcId());
for (Entry e : entry.subProcs) {
addAllToCorruptedAndRemoveFromProcMap(e, remainingProcMap);
@ -168,7 +133,7 @@ public final class WALProcedureTree {
}
private void addAllToValidAndRemoveFromProcMap(Entry entry, Map<Long, Entry> remainingProcMap) {
validProcs.add(new ProtoAndProc(entry.proc));
validProcs.add(new ProtoAndProcedure(entry.proc));
remainingProcMap.remove(entry.proc.getProcId());
for (Entry e : entry.subProcs) {
addAllToValidAndRemoveFromProcMap(e, remainingProcMap);
@ -180,7 +145,7 @@ public final class WALProcedureTree {
// remainingProcMap, so at last, if there are still procedures in the map, we know that there are
// orphan procedures.
private void checkReady(Entry rootEntry, Map<Long, Entry> remainingProcMap) {
if (isFinished(rootEntry.proc)) {
if (ProcedureUtil.isFinished(rootEntry.proc)) {
if (!rootEntry.subProcs.isEmpty()) {
LOG.error("unexpected active children for root-procedure: {}", rootEntry);
rootEntry.subProcs.forEach(e -> LOG.error("unexpected active children: {}", e));
@ -217,86 +182,23 @@ public final class WALProcedureTree {
private void checkOrphan(Map<Long, Entry> procMap) {
procMap.values().forEach(entry -> {
LOG.error("Orphan procedure: {}", entry);
corruptedProcs.add(new ProtoAndProc(entry.proc));
corruptedProcs.add(new ProtoAndProcedure(entry.proc));
});
}
private static final class Iter implements ProcedureIterator {
private final List<ProtoAndProc> procs;
private Iterator<ProtoAndProc> iter;
private ProtoAndProc current;
public Iter(List<ProtoAndProc> procs) {
this.procs = procs;
reset();
}
@Override
public void reset() {
iter = procs.iterator();
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public boolean hasNext() {
return current != null;
}
private void checkNext() {
if (!hasNext()) {
throw new NoSuchElementException();
}
}
@Override
public boolean isNextFinished() {
checkNext();
return isFinished(current.proto);
}
private void moveToNext() {
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public void skipNext() {
checkNext();
moveToNext();
}
@Override
public Procedure<?> next() throws IOException {
checkNext();
Procedure<?> proc = current.getProc();
moveToNext();
return proc;
}
}
public ProcedureIterator getValidProcs() {
return new Iter(validProcs);
return new InMemoryProcedureIterator(validProcs);
}
public ProcedureIterator getCorruptedProcs() {
return new Iter(corruptedProcs);
return new InMemoryProcedureIterator(corruptedProcs);
}
public static WALProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
public static ProcedureTree build(Collection<ProcedureProtos.Procedure> procedures) {
Map<Long, Entry> procMap = new HashMap<>();
for (ProcedureProtos.Procedure proc : procedures) {
procMap.put(proc.getProcId(), new Entry(proc));
}
return new WALProcedureTree(procMap);
return new ProcedureTree(procMap);
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
import java.io.IOException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* when loading we will iterator the procedures twice, so use this class to cache the deserialized
* result to prevent deserializing multiple times.
*/
@InterfaceAudience.Private
public class ProtoAndProcedure {
private final ProcedureProtos.Procedure proto;
private Procedure<?> proc;
public ProtoAndProcedure(ProcedureProtos.Procedure proto) {
this.proto = proto;
}
public Procedure<?> getProcedure() throws IOException {
if (proc == null) {
proc = ProcedureUtil.convertToProcedure(proto);
}
return proc;
}
public ProcedureProtos.Procedure getProto() {
return proto;
}
}

View File

@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@ -51,7 +51,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
* partial one, the initial modified value is 0 and the initial deleted value is also 0. In
* {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
class BitSetNode {
private static final long WORD_MASK = 0xffffffffffffffffL;

View File

@ -22,7 +22,10 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrown when a procedure WAL is corrupted
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public class CorruptedWALProcedureStoreException extends HBaseIOException {

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import java.util.Arrays;
@ -36,9 +36,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
*
* It can be used by the ProcedureStore to identify which procedures are already
* deleted/completed to avoid the deserialization step on restart
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public class ProcedureStoreTracker {
class ProcedureStoreTracker {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
// Key is procedure id corresponding to first bit of the bitmap.

View File

@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,9 +32,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Describes a WAL File
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
class ProcedureWALFile implements Comparable<ProcedureWALFile> {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFile.class);
private ProcedureWALHeader header;

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.yetus.audience.InterfaceAudience;
@ -40,9 +39,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Helper class that contains the WAL serialization utils.
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public final class ProcedureWALFormat {
final class ProcedureWALFormat {
static final byte LOG_TYPE_STREAM = 0;
static final byte LOG_TYPE_COMPACTED = 1;

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.procedure2.store.wal;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -31,9 +31,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* Helper class that loads the procedures stored in a WAL.
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public class ProcedureWALFormatReader {
class ProcedureWALFormatReader {
private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
/**
@ -44,8 +47,8 @@ public class ProcedureWALFormatReader {
* See the comments of {@link WALProcedureMap} for more details.
* <p/>
* After reading all the proc wal files, we will use the procedures in the procedureMap to build a
* {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
* {@link WALProcedureTree} and the code in {@link #finish()} for more details.
* {@link ProcedureTree}, and then give the result to the upper layer. See the comments of
* {@link ProcedureTree} and the code in {@link #finish()} for more details.
*/
private final WALProcedureMap localProcedureMap = new WALProcedureMap();
private final WALProcedureMap procedureMap = new WALProcedureMap();
@ -144,7 +147,7 @@ public class ProcedureWALFormatReader {
// build the procedure execution tree. When building we will verify that whether a procedure is
// valid.
WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures());
loader.load(tree.getValidProcs());
loader.handleCorrupted(tree.getCorruptedProcs());
}

View File

@ -48,7 +48,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
/**
* ProcedureWALPrettyPrinter prints the contents of a given ProcedureWAL file
* @see WALProcedureStore#main(String[]) if you want to check parse of a directory of WALs.
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceStability.Evolving
public class ProcedureWALPrettyPrinter extends Configured implements Tool {

View File

@ -46,7 +46,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
* {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this
* method, for the same procedure, the one comes earlier will win, as we read the proc wal files
* from new to old(the reverse order).
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
class WALProcedureMap {

View File

@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@ -106,7 +106,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
* deleted.
* @see ProcedureWALPrettyPrinter for printing content of a single WAL.
* @see #main(String[]) to parse a directory of MasterWALProcs.
* @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
* use the new region based procedure store.
*/
@Deprecated
@InterfaceAudience.Private
public class WALProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(WALProcedureStore.class);
@ -115,10 +118,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
public interface LeaseRecovery {
void recoverFileLease(FileSystem fs, Path path) throws IOException;
}
public static final String WAL_COUNT_WARN_THRESHOLD_CONF_KEY =
"hbase.procedure.store.wal.warn.threshold";
private static final int DEFAULT_WAL_COUNT_WARN_THRESHOLD = 10;
@ -233,10 +232,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
}
}
public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
throws IOException {
this(conf,
new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
public WALProcedureStore(Configuration conf, LeaseRecovery leaseRecovery) throws IOException {
this(conf, new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
new Path(CommonFSUtils.getWALRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME),
leaseRecovery);
}
@ -1411,7 +1408,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
System.exit(-1);
}
WALProcedureStore store = new WALProcedureStore(conf, new Path(args[0]), null,
new WALProcedureStore.LeaseRecovery() {
new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
@ -58,7 +59,7 @@ public final class ProcedureTestingUtility {
public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
throws IOException {
return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
return new WALProcedureStore(conf, dir, null, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op
@ -66,13 +67,13 @@ public final class ProcedureTestingUtility {
});
}
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
boolean abort, boolean startWorkers) throws Exception {
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort,
boolean startWorkers) throws Exception {
restart(procExecutor, false, true, null, null, null, abort, startWorkers);
}
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
boolean abort) throws Exception {
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, boolean abort)
throws Exception {
restart(procExecutor, false, true, null, null, null, abort, true);
}
@ -145,7 +146,12 @@ public final class ProcedureTestingUtility {
public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
throws Exception {
procStore.stop(false);
storeRestart(procStore, false, loader);
}
public static void storeRestart(ProcedureStore procStore, boolean abort,
ProcedureStore.ProcedureLoader loader) throws Exception {
procStore.stop(abort);
procStore.start(procStore.getNumThreads());
procStore.recoverLease();
procStore.load(loader);
@ -216,16 +222,16 @@ public final class ProcedureTestingUtility {
assertSingleExecutorForKillTests(procExecutor);
}
private static <TEnv> void assertSingleExecutorForKillTests(
final ProcedureExecutor<TEnv> procExecutor) {
private static <TEnv> void
assertSingleExecutorForKillTests(final ProcedureExecutor<TEnv> procExecutor) {
if (procExecutor.testing == null) {
return;
}
if (procExecutor.testing.killBeforeStoreUpdate ||
procExecutor.testing.toggleKillBeforeStoreUpdate) {
assertEquals("expected only one executor running during test with kill/restart",
1, procExecutor.getCorePoolSize());
assertEquals("expected only one executor running during test with kill/restart", 1,
procExecutor.getCorePoolSize());
}
}
@ -306,8 +312,7 @@ public final class ProcedureTestingUtility {
assertEquals(null, procExecutor.getResult(procId));
}
public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor,
long procId) {
public static <TEnv> void assertProcNotFailed(ProcedureExecutor<TEnv> procExecutor, long procId) {
Procedure<?> result = procExecutor.getResult(procId);
assertTrue("expected procedure result", result != null);
assertProcNotFailed(result);
@ -332,7 +337,8 @@ public final class ProcedureTestingUtility {
public static void assertIsAbortException(final Procedure<?> result) {
Throwable cause = assertProcFailed(result);
assertTrue("expected abort exception, got "+ cause, cause instanceof ProcedureAbortedException);
assertTrue("expected abort exception, got " + cause,
cause instanceof ProcedureAbortedException);
}
public static void assertIsTimeoutException(final Procedure<?> result) {
@ -353,17 +359,17 @@ public final class ProcedureTestingUtility {
}
/**
* Run through all procedure flow states TWICE while also restarting
* procedure executor at each step; i.e force a reread of procedure store.
*
*<p>It does
* <ol><li>Execute step N - kill the executor before store update
* Run through all procedure flow states TWICE while also restarting procedure executor at each
* step; i.e force a reread of procedure store.
* <p>
* It does
* <ol>
* <li>Execute step N - kill the executor before store update
* <li>Restart executor/store
* <li>Execute step N - and then save to store
* </ol>
*
*<p>This is a good test for finding state that needs persisting and steps that are not
* idempotent.
* <p>
* This is a good test for finding state that needs persisting and steps that are not idempotent.
*/
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
final long procId) throws Exception {
@ -376,8 +382,7 @@ public final class ProcedureTestingUtility {
}
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
final long procId, final boolean expectFailure, final Runnable customRestart)
throws Exception {
final long procId, final boolean expectFailure, final Runnable customRestart) throws Exception {
Procedure proc = procExec.getProcedure(procId);
waitProcedure(procExec, procId);
assertEquals(false, procExec.isRunning());
@ -401,7 +406,8 @@ public final class ProcedureTestingUtility {
}
public static class NoopProcedure<TEnv> extends Procedure<TEnv> {
public NoopProcedure() {}
public NoopProcedure() {
}
@Override
protected Procedure<TEnv>[] execute(TEnv env)
@ -419,13 +425,11 @@ public final class ProcedureTestingUtility {
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
}
}
@ -472,7 +476,8 @@ public final class ProcedureTestingUtility {
public static class TestProcedure extends NoopProcedure<Void> {
private byte[] data = null;
public TestProcedure() {}
public TestProcedure() {
}
public TestProcedure(long procId) {
this(procId, 0);
@ -510,16 +515,14 @@ public final class ProcedureTestingUtility {
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
ByteString dataString = ByteString.copyFrom((data == null) ? new byte[0] : data);
BytesValue.Builder builder = BytesValue.newBuilder().setValue(dataString);
serializer.serialize(builder.build());
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
BytesValue bytesValue = serializer.deserialize(BytesValue.class);
ByteString dataString = bytesValue.getValue();
@ -603,7 +606,7 @@ public final class ProcedureTestingUtility {
}
public boolean isRunnable(final long procId) {
for (Procedure proc: runnable) {
for (Procedure proc : runnable) {
if (proc.getProcId() == procId) {
return true;
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.wal;
package org.apache.hadoop.hbase.procedure2.store;
import static org.junit.Assert.assertEquals;
@ -41,11 +41,11 @@ import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@Category({ MasterTests.class, SmallTests.class })
public class TestWALProcedureTree {
public class TestProcedureTree {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALProcedureTree.class);
HBaseClassTestRule.forClass(TestProcedureTree.class);
public static final class TestProcedure extends Procedure<Void> {
@ -123,7 +123,7 @@ public class TestWALProcedureTree {
proc1.addStackIndex(1);
TestProcedure proc2 = createProc(3, 2);
proc2.addStackIndex(3);
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(0, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@ -141,7 +141,7 @@ public class TestWALProcedureTree {
proc1.addStackIndex(1);
TestProcedure proc2 = createProc(3, 2);
proc2.addStackIndex(1);
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2));
ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(0, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());
@ -161,7 +161,7 @@ public class TestWALProcedureTree {
proc2.addStackIndex(0);
TestProcedure proc3 = createProc(5, 4);
proc3.addStackIndex(1);
WALProcedureTree tree = WALProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
ProcedureTree tree = ProcedureTree.build(toProtos(proc0, proc1, proc2, proc3));
List<TestProcedure> validProcs = getProcs(tree.getValidProcs());
assertEquals(3, validProcs.size());
List<TestProcedure> corruptedProcs = getProcs(tree.getCorruptedProcs());

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
@ -246,7 +247,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
private static class NoSyncWalProcedureStore extends WALProcedureStore {
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
super(conf, logDir, null, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -23,7 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureStoreTracker.DeleteState;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store;
package org.apache.hadoop.hbase.procedure2.store.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

View File

@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.SequentialProcedure;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.io.IOUtils;
@ -627,7 +627,7 @@ public class TestWALProcedureStore {
// simulate another active master removing the wals
procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
new WALProcedureStore.LeaseRecovery() {
new LeaseRecovery() {
private int count = 0;
@Override
@ -795,7 +795,7 @@ public class TestWALProcedureStore {
}
private WALProcedureStore createWALProcedureStore(Configuration conf) throws IOException {
return new WALProcedureStore(conf, new WALProcedureStore.LeaseRecovery() {
return new WALProcedureStore(conf, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
// no-op

View File

@ -161,8 +161,9 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
@ -331,6 +332,10 @@ public class HMaster extends HRegionServer implements MasterServices {
"hbase.master.wait.on.service.seconds";
public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;
public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";
public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;
// Metrics for the HMaster
final MetricsMaster metricsMaster;
// file system manager for the master FS operations
@ -425,7 +430,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private SnapshotQuotaObserverChore snapshotQuotaChore;
private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
private WALProcedureStore procedureStore;
private ProcedureStore procedureStore;
// handle table states
private TableStateManager tableStateManager;
@ -909,10 +914,8 @@ public class HMaster extends HRegionServer implements MasterServices {
this.masterActiveTime = System.currentTimeMillis();
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
// Only initialize the MemStoreLAB when master carry table
if (LoadBalancer.isTablesOnMaster(conf)) {
// always initialize the MemStoreLAB as we use a region to store procedure now.
initializeMemStoreChunkCreator();
}
this.fileSystemManager = new MasterFileSystem(conf);
this.walManager = new MasterWalManager(this);
@ -1433,7 +1436,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
int cleanerInterval =
conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
getChoreService().scheduleChore(logCleaner);
@ -1537,7 +1541,7 @@ public class HMaster extends HRegionServer implements MasterServices {
private void createProcedureExecutor() throws IOException {
MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore =
new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
new RegionProcedureStore(this, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
procedureStore.registerListener(new ProcedureStoreListener() {
@Override
@ -2707,10 +2711,10 @@ public class HMaster extends HRegionServer implements MasterServices {
}
public int getNumWALFiles() {
return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
return 0;
}
public WALProcedureStore getWalProcedureStore() {
public ProcedureStore getProcedureStore() {
return procedureStore;
}

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.CancelableProgressable;
@ -47,10 +47,10 @@ public class MasterProcedureEnv implements ConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureEnv.class);
@InterfaceAudience.Private
public static class WALStoreLeaseRecovery implements WALProcedureStore.LeaseRecovery {
public static class FsUtilsLeaseRecovery implements LeaseRecovery {
private final MasterServices master;
public WALStoreLeaseRecovery(final MasterServices master) {
public FsUtilsLeaseRecovery(final MasterServices master) {
this.master = master;
}

View File

@ -146,8 +146,14 @@ public final class MasterProcedureUtil {
/**
* Pattern used to validate a Procedure WAL file name see
* {@link #validateProcedureWALFilename(String)} for description.
* @deprecated Since 2.3.0, will be removed in 4.0.0. We do not use this style of procedure wal
* file name any more.
*/
private static final Pattern pattern = Pattern.compile(".*pv2-\\d{20}.log");
@Deprecated
private static final Pattern PATTERN = Pattern.compile(".*pv2-\\d{20}.log");
// Use the character $ to let the log cleaner know that this is not the normal wal file.
public static final String ARCHIVED_PROC_WAL_SUFFIX = "$masterproc$";
/**
* A Procedure WAL file name is of the format: pv-&lt;wal-id&gt;.log where wal-id is 20 digits.
@ -155,7 +161,7 @@ public final class MasterProcedureUtil {
* @return <tt>true</tt> if the filename matches a Procedure WAL, <tt>false</tt> otherwise
*/
public static boolean validateProcedureWALFilename(String filename) {
return pattern.matcher(filename).matches();
return PATTERN.matcher(filename).matches() || filename.endsWith(ARCHIVED_PROC_WAL_SUFFIX);
}
/**

View File

@ -0,0 +1,239 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* As long as there is no RegionServerServices for the procedure store region, we need implement the
* flush and compaction logic by our own.
* <p/>
* The flush logic is very simple, every time after calling a modification method in
* {@link RegionProcedureStore}, we will call the {@link #onUpdate()} method below, and in this
* method, we will check the memstore size and if it is above the flush size, we will call
* {@link HRegion#flush(boolean)} to force flush all stores.
* <p/>
* And for compaction, the logic is also very simple. After flush, we will check the store file
* count, if it is above the compactMin, we will do a major compaction.
*/
@InterfaceAudience.Private
class RegionFlusherAndCompactor implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RegionFlusherAndCompactor.class);
static final String FLUSH_SIZE_KEY = "hbase.procedure.store.region.flush.size";
private static final long DEFAULT_FLUSH_SIZE = 16L * 1024 * 1024;
static final String FLUSH_PER_CHANGES_KEY = "hbase.procedure.store.region.flush.per.changes";
private static final long DEFAULT_FLUSH_PER_CHANGES = 1_000_000;
static final String FLUSH_INTERVAL_MS_KEY = "hbase.procedure.store.region.flush.interval.ms";
// default to flush every 15 minutes, for safety
private static final long DEFAULT_FLUSH_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);
static final String COMPACT_MIN_KEY = "hbase.procedure.store.region.compact.min";
private static final int DEFAULT_COMPACT_MIN = 4;
private final Abortable abortable;
private final HRegion region;
// as we can only count this outside the region's write/flush process so it is not accurate, but
// it is enough.
private final AtomicLong changesAfterLastFlush = new AtomicLong(0);
private final long flushSize;
private final long flushPerChanges;
private final long flushIntervalMs;
private final int compactMin;
private final Thread flushThread;
private final Lock flushLock = new ReentrantLock();
private final Condition flushCond = flushLock.newCondition();
private boolean flushRequest = false;
private long lastFlushTime;
private final ExecutorService compactExecutor;
private final Lock compactLock = new ReentrantLock();
private boolean compactRequest = false;
private volatile boolean closed = false;
RegionFlusherAndCompactor(Configuration conf, Abortable abortable, HRegion region) {
this.abortable = abortable;
this.region = region;
flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
compactMin = conf.getInt(COMPACT_MIN_KEY, DEFAULT_COMPACT_MIN);
flushThread = new Thread(this::flushLoop, "Procedure-Region-Store-Flusher");
flushThread.setDaemon(true);
flushThread.start();
compactExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("Procedure-Region-Store-Compactor").setDaemon(true).build());
}
// inject our flush related configurations
static void setupConf(Configuration conf) {
long flushSize = conf.getLong(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSize);
long flushPerChanges = conf.getLong(FLUSH_PER_CHANGES_KEY, DEFAULT_FLUSH_PER_CHANGES);
conf.setLong(HRegion.MEMSTORE_FLUSH_PER_CHANGES, flushPerChanges);
long flushIntervalMs = conf.getLong(FLUSH_INTERVAL_MS_KEY, DEFAULT_FLUSH_INTERVAL_MS);
conf.setLong(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, flushIntervalMs);
}
private void compact() {
try {
region.compact(true);
} catch (IOException e) {
LOG.error("Failed to compact procedure store region", e);
}
compactLock.lock();
try {
if (needCompaction()) {
compactExecutor.execute(this::compact);
} else {
compactRequest = false;
}
} finally {
compactLock.unlock();
}
}
private boolean needCompaction() {
return Iterables.getOnlyElement(region.getStores()).getStorefilesCount() >= compactMin;
}
private void flushLoop() {
lastFlushTime = EnvironmentEdgeManager.currentTime();
while (!closed) {
flushLock.lock();
try {
while (!flushRequest) {
long waitTimeMs = lastFlushTime + flushIntervalMs - EnvironmentEdgeManager.currentTime();
if (waitTimeMs <= 0) {
flushRequest = true;
break;
}
flushCond.await(waitTimeMs, TimeUnit.MILLISECONDS);
if (closed) {
return;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
continue;
} finally {
flushLock.unlock();
}
assert flushRequest;
changesAfterLastFlush.set(0);
try {
region.flush(true);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to flush procedure store region, aborting...", e);
abortable.abort("Failed to flush procedure store region", e);
return;
}
compactLock.lock();
try {
if (!compactRequest && needCompaction()) {
compactRequest = true;
compactExecutor.execute(this::compact);
}
} finally {
compactLock.unlock();
}
flushLock.lock();
try {
// reset the flushRequest flag
if (!shouldFlush(changesAfterLastFlush.get())) {
flushRequest = false;
}
} finally {
flushLock.unlock();
}
}
}
private boolean shouldFlush(long changes) {
return region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize() >= flushSize ||
changes > flushPerChanges;
}
void onUpdate() {
long changes = changesAfterLastFlush.incrementAndGet();
if (shouldFlush(changes)) {
requestFlush();
}
}
void requestFlush() {
flushLock.lock();
try {
if (flushRequest) {
return;
}
flushRequest = true;
flushCond.signalAll();
} finally {
flushLock.unlock();
}
}
@Override
public void close() {
closed = true;
flushThread.interrupt();
compactExecutor.shutdown();
}
}

View File

@ -0,0 +1,584 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* A procedure store which uses a region to store all the procedures.
* <p/>
* FileSystem layout:
*
* <pre>
* hbase
* |
* --MasterProcs
* |
* --data
* | |
* | --/master/procedure/&lt;encoded-region-name&gt; <---- The region data
* | |
* | --replay <---- The edits to replay
* |
* --WALs
* |
* --&lt;master-server-name&gt; <---- The WAL dir for active master
* |
* --&lt;master-server-name&gt;-dead <---- The WAL dir dead master
* </pre>
*
* We use p:d column to store the serialized protobuf format procedure, and when deleting we
* will first fill the info:proc column with an empty byte array, and then actually delete them in
* the {@link #cleanup()} method. This is because that we need to retain the max procedure id, so we
* can not directly delete a procedure row as we do not know if it is the one with the max procedure
* id.
*/
@InterfaceAudience.Private
public class RegionProcedureStore extends ProcedureStoreBase {
private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
static final String MAX_WALS_KEY = "hbase.procedure.store.region.maxwals";
private static final int DEFAULT_MAX_WALS = 10;
static final String USE_HSYNC_KEY = "hbase.procedure.store.region.wal.hsync";
static final String MASTER_PROCEDURE_DIR = "MasterProcs";
static final String LOGCLEANER_PLUGINS = "hbase.procedure.store.region.logcleaner.plugins";
private static final String DATA_DIR = "data";
private static final String REPLAY_EDITS_DIR = "replay";
private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
private static final TableName TABLE_NAME = TableName.valueOf("master:procedure");
private static final byte[] FAMILY = Bytes.toBytes("p");
private static final byte[] PROC_QUALIFIER = Bytes.toBytes("d");
private static final int REGION_ID = 1;
private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
private final Server server;
private final LeaseRecovery leaseRecovery;
private WALFactory walFactory;
@VisibleForTesting
HRegion region;
private RegionFlusherAndCompactor flusherAndCompactor;
@VisibleForTesting
RegionProcedureStoreWALRoller walRoller;
private int numThreads;
public RegionProcedureStore(Server server, LeaseRecovery leaseRecovery) {
this.server = server;
this.leaseRecovery = leaseRecovery;
}
@Override
public void start(int numThreads) throws IOException {
if (!setRunning(true)) {
return;
}
LOG.info("Starting the Region Procedure Store...");
this.numThreads = numThreads;
}
private void shutdownWAL() {
if (walFactory != null) {
try {
walFactory.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shutdown WAL", e);
}
}
}
private void closeRegion(boolean abort) {
if (region != null) {
try {
region.close(abort);
} catch (IOException e) {
LOG.warn("Failed to close region", e);
}
}
}
@Override
public void stop(boolean abort) {
if (!setRunning(false)) {
return;
}
LOG.info("Stopping the Region Procedure Store, isAbort={}", abort);
if (flusherAndCompactor != null) {
flusherAndCompactor.close();
}
// if abort, we shutdown wal first to fail the ongoing updates to the region, and then close the
// region, otherwise there will be dead lock.
if (abort) {
shutdownWAL();
closeRegion(true);
} else {
closeRegion(false);
shutdownWAL();
}
if (walRoller != null) {
walRoller.close();
}
}
@Override
public int getNumThreads() {
return numThreads;
}
@Override
public int setRunningProcedureCount(int count) {
// useless for region based storage.
return count;
}
private WAL createWAL(FileSystem fs, Path rootDir, RegionInfo regionInfo) throws IOException {
String logName = AbstractFSWALProvider.getWALDirectoryName(server.getServerName().toString());
Path walDir = new Path(rootDir, logName);
LOG.debug("WALDir={}", walDir);
if (fs.exists(walDir)) {
throw new HBaseIOException(
"Master procedure store has already created directory at " + walDir);
}
if (!fs.mkdirs(walDir)) {
throw new IOException("Can not create master procedure wal directory " + walDir);
}
WAL wal = walFactory.getWAL(regionInfo);
walRoller.addWAL(wal);
return wal;
}
private HRegion bootstrap(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
throws IOException {
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).setRegionId(REGION_ID).build();
Path tmpDataDir = new Path(dataDir.getParent(), dataDir.getName() + "-tmp");
if (fs.exists(tmpDataDir) && !fs.delete(tmpDataDir, true)) {
throw new IOException("Can not delete partial created proc region " + tmpDataDir);
}
Path tableDir = CommonFSUtils.getTableDir(tmpDataDir, TABLE_NAME);
HRegion.createHRegion(conf, regionInfo, fs, tableDir, TABLE_DESC).close();
if (!fs.rename(tmpDataDir, dataDir)) {
throw new IOException("Can not rename " + tmpDataDir + " to " + dataDir);
}
WAL wal = createWAL(fs, rootDir, regionInfo);
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
null);
}
private HRegion open(Configuration conf, FileSystem fs, Path rootDir, Path dataDir)
throws IOException {
String factoryId = server.getServerName().toString();
Path tableDir = CommonFSUtils.getTableDir(dataDir, TABLE_NAME);
Path regionDir =
fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
.getPath();
Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
if (!fs.exists(replayEditsDir) && !fs.mkdirs(replayEditsDir)) {
throw new IOException("Failed to create replay directory: " + replayEditsDir);
}
Path walsDir = new Path(rootDir, HREGION_LOGDIR_NAME);
for (FileStatus walDir : fs.listStatus(walsDir)) {
if (!walDir.isDirectory()) {
continue;
}
if (walDir.getPath().getName().startsWith(factoryId)) {
LOG.warn("This should not happen in real production as we have not created our WAL " +
"directory yet, ignore if you are running a procedure related UT");
}
Path deadWALDir;
if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
deadWALDir =
new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
if (!fs.rename(walDir.getPath(), deadWALDir)) {
throw new IOException("Can not rename " + walDir + " to " + deadWALDir +
" when recovering lease of proc store");
}
LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), deadWALDir);
} else {
deadWALDir = walDir.getPath();
LOG.info("{} is already marked as dead", deadWALDir);
}
for (FileStatus walFile : fs.listStatus(deadWALDir)) {
Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
leaseRecovery.recoverFileLease(fs, walFile.getPath());
if (!fs.rename(walFile.getPath(), replayEditsFile)) {
throw new IOException("Can not rename " + walFile.getPath() + " to " + replayEditsFile +
" when recovering lease of proc store");
}
LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
}
LOG.info("Delete empty proc wal dir {}", deadWALDir);
fs.delete(deadWALDir, true);
}
RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
WAL wal = createWAL(fs, rootDir, regionInfo);
conf.set(HRegion.SPECIAL_RECOVERED_EDITS_DIR,
replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
null);
}
@SuppressWarnings("deprecation")
private void tryMigrate(FileSystem fs) throws IOException {
Configuration conf = server.getConfiguration();
Path procWALDir =
new Path(CommonFSUtils.getWALRootDir(conf), WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
if (!fs.exists(procWALDir)) {
return;
}
LOG.info("The old procedure wal directory {} exists, start migrating", procWALDir);
WALProcedureStore store = new WALProcedureStore(conf, leaseRecovery);
store.start(numThreads);
store.recoverLease();
MutableLong maxProcIdSet = new MutableLong(-1);
MutableLong maxProcIdFromProcs = new MutableLong(-1);
store.load(new ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
long procCount = 0;
while (procIter.hasNext()) {
Procedure<?> proc = procIter.next();
update(proc);
procCount++;
if (proc.getProcId() > maxProcIdFromProcs.longValue()) {
maxProcIdFromProcs.setValue(proc.getProcId());
}
}
LOG.info("Migrated {} procedures", procCount);
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
long corruptedCount = 0;
while (procIter.hasNext()) {
LOG.error("Corrupted procedure {}", procIter.next());
corruptedCount++;
}
if (corruptedCount > 0) {
throw new IOException("There are " + corruptedCount + " corrupted procedures when" +
" migrating from the old WAL based store to the new region based store, please" +
" fix them before upgrading again.");
}
}
});
LOG.info("The max pid is {}, and the max pid of all loaded procedures is {}",
maxProcIdSet.longValue(), maxProcIdFromProcs.longValue());
// Theoretically, the maxProcIdSet should be greater than or equal to maxProcIdFromProcs, but
// anyway, let's do a check here.
if (maxProcIdSet.longValue() > maxProcIdFromProcs.longValue()) {
if (maxProcIdSet.longValue() > 0) {
// let's add a fake row to retain the max proc id
region.put(new Put(Bytes.toBytes(maxProcIdSet.longValue())).addColumn(FAMILY,
PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
}
} else if (maxProcIdSet.longValue() < maxProcIdFromProcs.longValue()) {
LOG.warn("The max pid is less than the max pid of all loaded procedures");
}
if (!fs.delete(procWALDir, true)) {
throw new IOException("Failed to delete the migrated proc wal directory " + procWALDir);
}
LOG.info("Migration finished");
}
@Override
public void recoverLease() throws IOException {
LOG.debug("Starting Region Procedure Store lease recovery...");
Configuration baseConf = server.getConfiguration();
FileSystem fs = CommonFSUtils.getWALFileSystem(baseConf);
Path globalWALRootDir = CommonFSUtils.getWALRootDir(baseConf);
Path rootDir = new Path(globalWALRootDir, MASTER_PROCEDURE_DIR);
// we will override some configurations so create a new one.
Configuration conf = new Configuration(baseConf);
CommonFSUtils.setRootDir(conf, rootDir);
CommonFSUtils.setWALRootDir(conf, rootDir);
RegionFlusherAndCompactor.setupConf(conf);
walRoller = RegionProcedureStoreWALRoller.create(conf, server, fs, rootDir, globalWALRootDir);
walRoller.start();
conf.setInt(AbstractFSWAL.MAX_LOGS, conf.getInt(MAX_WALS_KEY, DEFAULT_MAX_WALS));
if (conf.get(USE_HSYNC_KEY) != null) {
conf.set(HRegion.WAL_HSYNC_CONF_KEY, conf.get(USE_HSYNC_KEY));
}
walFactory = new WALFactory(conf, server.getServerName().toString());
Path dataDir = new Path(rootDir, DATA_DIR);
if (fs.exists(dataDir)) {
// load the existing region.
region = open(conf, fs, rootDir, dataDir);
} else {
// bootstrapping...
region = bootstrap(conf, fs, rootDir, dataDir);
}
flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
walRoller.setFlusherAndCompactor(flusherAndCompactor);
tryMigrate(fs);
}
@Override
public void load(ProcedureLoader loader) throws IOException {
List<ProcedureProtos.Procedure> procs = new ArrayList<>();
long maxProcId = 0;
try (RegionScanner scanner = region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER))) {
List<Cell> cells = new ArrayList<>();
boolean moreRows;
do {
moreRows = scanner.next(cells);
if (cells.isEmpty()) {
continue;
}
Cell cell = cells.get(0);
cells.clear();
maxProcId = Math.max(maxProcId,
Bytes.toLong(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
if (cell.getValueLength() > 0) {
ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
.parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
procs.add(proto);
}
} while (moreRows);
}
loader.setMaxProcId(maxProcId);
ProcedureTree tree = ProcedureTree.build(procs);
loader.load(tree.getValidProcs());
loader.handleCorrupted(tree.getCorruptedProcs());
}
private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
throws IOException {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
byte[] row = Bytes.toBytes(proc.getProcId());
mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
rowsToLock.add(row);
}
// As we need to keep the max procedure id, here we can not simply delete the procedure, just fill
// the proc column with an empty array.
private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
byte[] row = Bytes.toBytes(procId);
mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
rowsToLock.add(row);
}
@Override
public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
if (subProcs == null || subProcs.length == 0) {
// same with update, just insert a single procedure
update(proc);
return;
}
List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
try {
serializePut(proc, mutations, rowsToLock);
for (Procedure<?> subProc : subProcs) {
serializePut(subProc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void insert(Procedure<?>[] procs) {
List<Mutation> mutations = new ArrayList<>(procs.length);
List<byte[]> rowsToLock = new ArrayList<>(procs.length);
try {
for (Procedure<?> proc : procs) {
serializePut(proc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void update(Procedure<?> proc) {
try {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
proto.toByteArray()));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void delete(long procId) {
try {
region
.put(new Put(Bytes.toBytes(procId)).addColumn(FAMILY, PROC_QUALIFIER, EMPTY_BYTE_ARRAY));
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pid={}", procId, e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void delete(Procedure<?> parentProc, long[] subProcIds) {
List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
try {
serializePut(parentProc, mutations, rowsToLock);
for (long subProcId : subProcIds) {
serializeDelete(subProcId, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub pids={}", parentProc,
Arrays.toString(subProcIds), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void delete(long[] procIds, int offset, int count) {
if (count == 0) {
return;
}
if (count == 1) {
delete(procIds[offset]);
return;
}
List<Mutation> mutations = new ArrayList<>(count);
List<byte[]> rowsToLock = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
long procId = procIds[offset + i];
serializeDelete(procId, mutations, rowsToLock);
}
try {
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to delete pids={}", Arrays.toString(procIds), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate();
}
@Override
public void cleanup() {
// actually delete the procedures if it is not the one with the max procedure id.
List<Cell> cells = new ArrayList<Cell>();
try (RegionScanner scanner =
region.getScanner(new Scan().addColumn(FAMILY, PROC_QUALIFIER).setReversed(true))) {
// skip the row with max procedure id
boolean moreRows = scanner.next(cells);
if (cells.isEmpty()) {
return;
}
cells.clear();
while (moreRows) {
moreRows = scanner.next(cells);
if (cells.isEmpty()) {
continue;
}
Cell cell = cells.get(0);
cells.clear();
if (cell.getValueLength() == 0) {
region.delete(new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
}
}
} catch (IOException e) {
LOG.warn("Failed to clean up delete procedures", e);
}
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.apache.hadoop.hbase.HConstants.HREGION_OLDLOGDIR_NAME;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* As long as there is no RegionServerServices for the procedure store region, we need implement log
* roller logic by our own.
* <p/>
* We can reuse most of the code for normal wal roller, the only difference is that there is only
* one region, so in {@link #scheduleFlush(String)} method we can just schedule flush for the
* procedure store region.
*/
@InterfaceAudience.Private
final class RegionProcedureStoreWALRoller extends AbstractWALRoller<Abortable> {
private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStoreWALRoller.class);
static final String ROLL_PERIOD_MS_KEY = "hbase.procedure.store.region.walroll.period.ms";
private static final long DEFAULT_ROLL_PERIOD_MS = TimeUnit.MINUTES.toMillis(15);
private volatile RegionFlusherAndCompactor flusherAndCompactor;
private final FileSystem fs;
private final Path walArchiveDir;
private final Path globalWALArchiveDir;
private RegionProcedureStoreWALRoller(Configuration conf, Abortable abortable, FileSystem fs,
Path walRootDir, Path globalWALRootDir) {
super("RegionProcedureStoreWALRoller", conf, abortable);
this.fs = fs;
this.walArchiveDir = new Path(walRootDir, HREGION_OLDLOGDIR_NAME);
this.globalWALArchiveDir = new Path(globalWALRootDir, HREGION_OLDLOGDIR_NAME);
}
@Override
protected void afterRoll(WAL wal) {
// move the archived WAL files to the global archive path
try {
if (!fs.exists(globalWALArchiveDir) && !fs.mkdirs(globalWALArchiveDir)) {
LOG.warn("Failed to create global archive dir {}", globalWALArchiveDir);
return;
}
FileStatus[] archivedWALFiles = fs.listStatus(walArchiveDir);
if (archivedWALFiles == null) {
return;
}
for (FileStatus status : archivedWALFiles) {
Path file = status.getPath();
Path newFile = new Path(globalWALArchiveDir,
file.getName() + MasterProcedureUtil.ARCHIVED_PROC_WAL_SUFFIX);
if (fs.rename(file, newFile)) {
LOG.info("Successfully moved {} to {}", file, newFile);
} else {
LOG.warn("Failed to move archived wal from {} to global place {}", file, newFile);
}
}
} catch (IOException e) {
LOG.warn("Failed to move archived wals from {} to global dir {}", walArchiveDir,
globalWALArchiveDir, e);
}
}
@Override
protected void scheduleFlush(String encodedRegionName) {
RegionFlusherAndCompactor flusher = this.flusherAndCompactor;
if (flusher != null) {
flusher.requestFlush();
}
}
void setFlusherAndCompactor(RegionFlusherAndCompactor flusherAndCompactor) {
this.flusherAndCompactor = flusherAndCompactor;
}
static RegionProcedureStoreWALRoller create(Configuration conf, Abortable abortable,
FileSystem fs, Path walRootDir, Path globalWALRootDir) {
// we can not run with wal disabled, so force set it to true.
conf.setBoolean(WALFactory.WAL_ENABLED, true);
// we do not need this feature, so force disable it.
conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, false);
conf.setLong(WAL_ROLL_PERIOD_KEY, conf.getLong(ROLL_PERIOD_MS_KEY, DEFAULT_ROLL_PERIOD_MS));
return new RegionProcedureStoreWALRoller(conf, abortable, fs, walRootDir, globalWALRootDir);
}
}

View File

@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.RandomAccess;
import java.util.Set;
@ -71,6 +72,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -160,8 +162,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
@ -236,12 +236,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String HBASE_MAX_CELL_SIZE_KEY = "hbase.server.keyvalue.maxsize";
public static final int DEFAULT_MAX_CELL_SIZE = 10485760;
/**
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
*/
private static final Durability DEFAULT_DURABILITY = Durability.SYNC_WAL;
public static final String HBASE_REGIONSERVER_MINIBATCH_SIZE =
"hbase.regionserver.minibatch.size";
public static final int DEFAULT_HBASE_REGIONSERVER_MINIBATCH_SIZE = 20000;
@ -249,6 +243,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
public static final boolean DEFAULT_WAL_HSYNC = false;
/**
* This is for for using HRegion as a local storage, where we may put the recovered edits in a
* special place. Once this is set, we will only replay the recovered edits under this directory
* and ignore the original replay directory configs.
*/
public static final String SPECIAL_RECOVERED_EDITS_DIR =
"hbase.hregion.special.recovered.edits.dir";
final AtomicBoolean closed = new AtomicBoolean(false);
/* Closing can take some time; use the closing flag if there is stuff we don't
@ -4556,6 +4558,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return size.getHeapSize() + size.getOffHeapSize() > getMemStoreFlushSize();
}
private void deleteRecoveredEdits(FileSystem fs, Iterable<Path> files) throws IOException {
for (Path file : files) {
if (!fs.delete(file, false)) {
LOG.error("Failed delete of {}", file);
} else {
LOG.debug("Deleted recovered.edits file={}", file);
}
}
}
/**
* Read the edits put under this region by wal splitting process. Put
* the recovered edits back up into this region.
@ -4587,11 +4599,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* the maxSeqId for the store to be applied, else its skipped.
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
protected long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status)
throws IOException {
@VisibleForTesting
long replayRecoveredEditsIfAny(Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, final MonitoredTask status) throws IOException {
long minSeqIdForTheRegion = -1;
for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
@ -4599,7 +4610,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
long seqId = minSeqIdForTheRegion;
String specialRecoveredEditsDirStr = conf.get(SPECIAL_RECOVERED_EDITS_DIR);
if (org.apache.commons.lang3.StringUtils.isBlank(specialRecoveredEditsDirStr)) {
FileSystem walFS = getWalFileSystem();
FileSystem rootFS = getFilesystem();
Path wrongRegionWALDir = FSUtils.getWrongWALRegionDir(conf, getRegionInfo().getTable(),
@ -4622,18 +4634,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionWALDir);
seqId = Math.max(seqId, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS,
files, reporter, regionWALDir));
seqId = Math.max(seqId,
replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, files, reporter, regionWALDir));
if (seqId > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqId, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
internalFlushcache(null, seqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
}
// Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
// For debugging data loss issues!
// If this flag is set, make use of the hfile archiving by making recovered.edits a fake
// column family. Have to fake out file type too by casting our recovered.edits as storefiles
// column family. Have to fake out file type too by casting our recovered.edits as
// storefiles
String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionWALDir).getName();
Set<HStoreFile> fakeStoreFiles = new HashSet<>(files.size());
for (Path file : files) {
@ -4641,21 +4654,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
getRegionWALFileSystem().archiveRecoveredEdits(fakeFamilyName, fakeStoreFiles);
} else {
for (Path file : Iterables.concat(files, filesUnderWrongRegionWALDir)) {
if (!walFS.delete(file, false)) {
LOG.error("Failed delete of {}", file);
deleteRecoveredEdits(walFS, Iterables.concat(files, filesUnderWrongRegionWALDir));
deleteRecoveredEdits(rootFS, filesUnderRootDir);
}
} else {
LOG.debug("Deleted recovered.edits file={}", file);
Path recoveredEditsDir = new Path(specialRecoveredEditsDirStr);
FileSystem fs = recoveredEditsDir.getFileSystem(conf);
FileStatus[] files = fs.listStatus(recoveredEditsDir);
LOG.debug("Found {} recovered edits file(s) under {}", files == null ? 0 : files.length,
recoveredEditsDir);
if (files != null) {
for (FileStatus file : files) {
seqId =
Math.max(seqId, replayRecoveredEdits(file.getPath(), maxSeqIdInStores, reporter, fs));
}
}
for (Path file : filesUnderRootDir) {
if (!rootFS.delete(file, false)) {
LOG.error("Failed delete of {}", file);
} else {
LOG.debug("Deleted recovered.edits file={}", file);
}
if (seqId > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
}
deleteRecoveredEdits(fs,
Stream.of(files).map(FileStatus::getPath).collect(Collectors.toList()));
}
return seqId;
}
@ -4720,18 +4742,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return seqid;
}
/*
/**
* @param edits File of recovered edits.
* @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal
* must be larger than this to be replayed for each store.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
* @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in wal must be larger
* than this to be replayed for each store.
* @return the sequence id of the last edit added to this region out of the recovered edits log or
* <code>minSeqId</code> if nothing added from editlogs.
*/
private long replayRecoveredEdits(final Path edits,
Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, FileSystem fs)
throws IOException {
private long replayRecoveredEdits(final Path edits, Map<byte[], Long> maxSeqIdInStores,
final CancelableProgressable reporter, FileSystem fs) throws IOException {
String msg = "Replaying edits from " + edits;
LOG.info(msg);
MonitoredTask status = TaskMonitor.get().createStatus(msg);
@ -7100,15 +7119,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param wal shared WAL
* @param initialize - true to initialize the region
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final RegionInfo info, final Path rootDir,
final Configuration conf, final TableDescriptor hTableDescriptor,
final WAL wal, final boolean initialize)
throws IOException {
LOG.info("creating " + info
+ ", tableDescriptor=" + (hTableDescriptor == null? "null": hTableDescriptor) +
", regionDir=" + rootDir);
final Configuration conf, final TableDescriptor hTableDescriptor, final WAL wal,
final boolean initialize) throws IOException {
LOG.info("creating " + info + ", tableDescriptor=" +
(hTableDescriptor == null ? "null" : hTableDescriptor) + ", regionDir=" + rootDir);
createRegionDir(conf, info, rootDir);
FileSystem fs = rootDir.getFileSystem(conf);
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
@ -7119,6 +7135,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return region;
}
/**
* Create a region under the given table directory.
*/
public static HRegion createHRegion(Configuration conf, RegionInfo regionInfo, FileSystem fs,
Path tableDir, TableDescriptor tableDesc) throws IOException {
LOG.info("Creating {}, tableDescriptor={}, under table dir {}", regionInfo, tableDesc,
tableDir);
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, regionInfo);
HRegion region = HRegion.newHRegion(tableDir, null, fs, conf, regionInfo, tableDesc, null);
return region;
}
/**
* Create the region directory in the filesystem.
*/
@ -7270,14 +7298,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
return openHRegion(conf, fs, rootDir, tableDir, info, htd, wal, rsServices, reporter);
return openHRegionFromTableDir(conf, fs, tableDir, info, htd, wal, rsServices, reporter);
}
/**
* Open a Region.
* @param conf The Configuration object to use.
* @param fs Filesystem to use
* @param rootDir Root directory for HBase instance
* @param info Info for region to be opened.
* @param htd the table descriptor
* @param wal WAL for region to use. This method will call
@ -7288,15 +7315,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param reporter An interface we can report progress against.
* @return new HRegion
*/
public static HRegion openHRegion(final Configuration conf, final FileSystem fs,
final Path rootDir, final Path tableDir, final RegionInfo info, final TableDescriptor htd,
final WAL wal, final RegionServerServices rsServices,
final CancelableProgressable reporter)
public static HRegion openHRegionFromTableDir(final Configuration conf, final FileSystem fs,
final Path tableDir, final RegionInfo info, final TableDescriptor htd, final WAL wal,
final RegionServerServices rsServices, final CancelableProgressable reporter)
throws IOException {
if (info == null) throw new NullPointerException("Passed region info is null");
if (LOG.isDebugEnabled()) {
LOG.debug("Opening region: " + info);
}
Objects.requireNonNull(info, "RegionInfo cannot be null");
LOG.debug("Opening region: {}", info);
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, rsServices);
return r.openHRegion(reporter);
}

View File

@ -18,23 +18,9 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
import java.util.Map;
import org.apache.hadoop.hbase.wal.AbstractWALRoller;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,166 +38,21 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
*/
@InterfaceAudience.Private
@VisibleForTesting
public class LogRoller extends HasThread implements Closeable {
public class LogRoller extends AbstractWALRoller<RegionServerServices> {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
protected final RegionServerServices services;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private volatile boolean running = true;
public void addWAL(WAL wal) {
// check without lock first
if (walNeedsRoll.containsKey(wal)) {
return;
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized (LogRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
LogRoller.this.notifyAll();
}
}
});
}
}
}
public void requestRollAll() {
synchronized (this) {
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
for (WAL wal : wals) {
walNeedsRoll.put(wal, Boolean.TRUE);
}
notifyAll();
}
}
public LogRoller(RegionServerServices services) {
super("LogRoller");
this.services = services;
this.rollPeriod = this.services.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.services.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
super("LogRoller", services.getConfiguration(), services);
}
/**
* we need to check low replication in period, see HBASE-18132
*/
private void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
}
} catch (Throwable e) {
LOG.warn("Failed checking low replication", e);
}
}
private void abort(String reason, Throwable cause) {
// close all WALs before calling abort on RS.
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
// is already broken.
for (WAL wal : walNeedsRoll.keySet()) {
// shutdown rather than close here since we are going to abort the RS and the wals need to be
// split when recovery
try {
wal.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shutdown wal", e);
}
}
this.services.abort(reason, cause);
}
@Override
public void run() {
while (running) {
boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
// WAL roll requested, fall through
LOG.debug("WAL roll requested");
} else {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
}
}
}
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
}
}
}
} catch (FailedLogCloseException | ConnectException e) {
abort("Failed log close in log roller", e);
} catch (IOException ex) {
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
abort("IOE in log roller",
ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
abort("Log rolling failed", ex);
}
}
LOG.info("LogRoller exiting.");
}
/**
* @param encodedRegionName Encoded name of region to flush.
*/
private void scheduleFlush(String encodedRegionName) {
HRegion r = (HRegion) this.services.getRegion(encodedRegionName);
protected void scheduleFlush(String encodedRegionName) {
RegionServerServices services = this.abortable;
HRegion r = (HRegion) services.getRegion(encodedRegionName);
if (r == null) {
LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName);
return;
}
FlushRequester requester = this.services.getFlushRequester();
FlushRequester requester = services.getFlushRequester();
if (requester == null) {
LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null",
encodedRegionName, r);
@ -221,18 +62,8 @@ public class LogRoller extends HasThread implements Closeable {
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
}
/**
* For testing only
* @return true if all WAL roll finished
*/
@VisibleForTesting
public boolean walRollFinished() {
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll);
}
@Override
public void close() {
running = false;
interrupt();
Map<WAL, Boolean> getWalNeedsRoll() {
return this.walNeedsRoll;
}
}

View File

@ -132,6 +132,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected static final String WAL_SYNC_TIMEOUT_MS = "hbase.regionserver.wal.sync.timeout";
protected static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
public static final String MAX_LOGS = "hbase.regionserver.maxlogs";
/**
* file system instance
*/
@ -434,8 +436,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
this.blocksize = WALUtil.getWALBlockSize(this.conf, this.fs, this.walDir);
float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.5f);
this.logrollsize = (long)(this.blocksize * multiplier);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs",
Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
this.maxLogs = conf.getInt(MAX_LOGS, Math.max(32, calculateMaxLogFiles(conf, logrollsize)));
LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" +
StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" +

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Runs periodically to determine if the WAL should be rolled.
* <p/>
* NOTE: This class extends Thread rather than Chore because the sleep time can be interrupted when
* there is something to do, rather than the Chore sleep time which is invariant.
* <p/>
* The {@link #scheduleFlush(String)} is abstract here, as sometimes we hold a region without a
* region server but we still want to roll its WAL.
* <p/>
* TODO: change to a pool of threads
*/
@InterfaceAudience.Private
public abstract class AbstractWALRoller<T extends Abortable> extends HasThread
implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractWALRoller.class);
protected static final String WAL_ROLL_PERIOD_KEY = "hbase.regionserver.logroll.period";
protected final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
protected final T abortable;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
private final long rollPeriod;
private final int threadWakeFrequency;
// The interval to check low replication on hlog's pipeline
private long checkLowReplicationInterval;
private volatile boolean running = true;
public void addWAL(WAL wal) {
// check without lock first
if (walNeedsRoll.containsKey(wal)) {
return;
}
// this is to avoid race between addWAL and requestRollAll.
synchronized (this) {
if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) {
wal.registerWALActionsListener(new WALActionsListener() {
@Override
public void logRollRequested(WALActionsListener.RollRequestReason reason) {
// TODO logs will contend with each other here, replace with e.g. DelayedQueue
synchronized (AbstractWALRoller.this) {
walNeedsRoll.put(wal, Boolean.TRUE);
AbstractWALRoller.this.notifyAll();
}
}
});
}
}
}
public void requestRollAll() {
synchronized (this) {
List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet());
for (WAL wal : wals) {
walNeedsRoll.put(wal, Boolean.TRUE);
}
notifyAll();
}
}
protected AbstractWALRoller(String name, Configuration conf, T abortable) {
super(name);
this.abortable = abortable;
this.rollPeriod = conf.getLong(WAL_ROLL_PERIOD_KEY, 3600000);
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval =
conf.getLong("hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}
/**
* we need to check low replication in period, see HBASE-18132
*/
private void checkLowReplication(long now) {
try {
for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
WAL wal = entry.getKey();
boolean needRollAlready = entry.getValue();
if (needRollAlready || !(wal instanceof AbstractFSWAL)) {
continue;
}
((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval);
}
} catch (Throwable e) {
LOG.warn("Failed checking low replication", e);
}
}
private void abort(String reason, Throwable cause) {
// close all WALs before calling abort on RS.
// This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we
// failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it
// is already broken.
for (WAL wal : walNeedsRoll.keySet()) {
// shutdown rather than close here since we are going to abort the RS and the wals need to be
// split when recovery
try {
wal.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shutdown wal", e);
}
}
abortable.abort(reason, cause);
}
@Override
public void run() {
while (running) {
boolean periodic = false;
long now = System.currentTimeMillis();
checkLowReplication(now);
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
// WAL roll requested, fall through
LOG.debug("WAL roll requested");
} else {
try {
wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// restore the interrupt state
Thread.currentThread().interrupt();
}
// goto the beginning to check whether again whether we should fall through to roll
// several WALs, and also check whether we should quit.
continue;
}
}
}
try {
this.lastRollTime = System.currentTimeMillis();
for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter
.hasNext();) {
Entry<WAL, Boolean> entry = iter.next();
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
}
}
afterRoll(wal);
}
} catch (FailedLogCloseException | ConnectException e) {
abort("Failed log close in log roller", e);
} catch (IOException ex) {
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
abort("IOE in log roller",
ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
abort("Log rolling failed", ex);
}
}
LOG.info("LogRoller exiting.");
}
/**
* Called after we finish rolling the give {@code wal}.
*/
protected void afterRoll(WAL wal) {
}
/**
* @param encodedRegionName Encoded name of region to flush.
*/
protected abstract void scheduleFlush(String encodedRegionName);
private boolean isWaiting() {
Thread.State state = getThread().getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
}
/**
* @return true if all WAL roll finished
*/
public boolean walRollFinished() {
return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll) && isWaiting();
}
/**
* Wait until all wals have been rolled after calling {@link #requestRollAll()}.
*/
public void waitUntilWalRollFinished() throws InterruptedException {
while (!walRollFinished()) {
Thread.sleep(100);
}
}
@Override
public void close() {
running = false;
interrupt();
}
}

View File

@ -83,6 +83,8 @@ public class WALFactory {
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
final String factoryId;
private final WALProvider provider;
// The meta updates are written to a different wal. If this
@ -194,7 +196,7 @@ public class WALFactory {
this.conf = conf;
this.factoryId = factoryId;
// end required early initialization
if (conf.getBoolean("hbase.regionserver.hlog.enabled", true)) {
if (conf.getBoolean(WAL_ENABLED, true)) {
provider = getProvider(WAL_PROVIDER, DEFAULT_WAL_PROVIDER, null);
} else {
// special handling of existing configuration behavior.

View File

@ -30,21 +30,12 @@
import="org.apache.hadoop.hbase.procedure2.LockedResource"
import="org.apache.hadoop.hbase.procedure2.Procedure"
import="org.apache.hadoop.hbase.procedure2.ProcedureExecutor"
import="org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile"
import="org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore"
import="org.apache.hadoop.hbase.procedure2.util.StringUtils"
import="org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix"
%>
<%
HMaster master = (HMaster) getServletContext().getAttribute(HMaster.MASTER);
ProcedureExecutor<MasterProcedureEnv> procExecutor = master.getMasterProcedureExecutor();
WALProcedureStore walStore = master.getWalProcedureStore();
ArrayList<WALProcedureStore.SyncMetrics> syncMetricsBuff = walStore.getSyncMetrics();
long millisToNextRoll = walStore.getMillisToNextPeriodicRoll();
long millisFromLastRoll = walStore.getMillisFromLastRoll();
ArrayList<ProcedureWALFile> procedureWALFiles = walStore.getActiveLogs();
Set<ProcedureWALFile> corruptedWALFiles = walStore.getCorruptedLogs();
List<Procedure<MasterProcedureEnv>> procedures = procExecutor.getProcedures();
Collections.sort(procedures, new Comparator<Procedure>() {
@Override
@ -159,109 +150,4 @@
<% } %>
</div>
<br />
<div class="container-fluid content">
<div class="row">
<div class="page-header">
<h2>Procedure WAL State</h2>
</div>
</div>
<div class="tabbable">
<ul class="nav nav-pills">
<li class="active">
<a href="#tab_WALFiles" data-toggle="tab">WAL files</a>
</li>
<li class="">
<a href="#tab_WALFilesCorrupted" data-toggle="tab">Corrupted WAL files</a>
</li>
<li class="">
<a href="#tab_WALRollTime" data-toggle="tab">WAL roll time</a>
</li>
<li class="">
<a href="#tab_SyncStats" data-toggle="tab">Sync stats</a>
</li>
</ul>
<div class="tab-content" style="padding-bottom: 9px; border-bottom: 1px solid #ddd;">
<div class="tab-pane active" id="tab_WALFiles">
<% if (procedureWALFiles != null && procedureWALFiles.size() > 0) { %>
<table class="table table-striped">
<tr>
<th>LogID</th>
<th>Size</th>
<th>Timestamp</th>
<th>Path</th>
</tr>
<% for (int i = procedureWALFiles.size() - 1; i >= 0; --i) { %>
<% ProcedureWALFile pwf = procedureWALFiles.get(i); %>
<tr>
<td> <%= pwf.getLogId() %></td>
<td> <%= TraditionalBinaryPrefix.long2String(pwf.getSize(), "B", 1) %> </td>
<td> <%= new Date(pwf.getTimestamp()) %> </td>
<td> <%= escapeXml(pwf.toString()) %> </td>
</tr>
<% } %>
</table>
<% } else {%>
<p> No WAL files</p>
<% } %>
</div>
<div class="tab-pane" id="tab_WALFilesCorrupted">
<% if (corruptedWALFiles != null && corruptedWALFiles.size() > 0) { %>
<table class="table table-striped">
<tr>
<th>LogID</th>
<th>Size</th>
<th>Timestamp</th>
<th>Path</th>
</tr>
<% for (ProcedureWALFile cwf:corruptedWALFiles) { %>
<tr>
<td> <%= cwf.getLogId() %></td>
<td> <%= TraditionalBinaryPrefix.long2String(cwf.getSize(), "B", 1) %> </td>
<td> <%= new Date(cwf.getTimestamp()) %> </td>
<td> <%= escapeXml(cwf.toString()) %> </td>
</tr>
<% } %>
</table>
<% } else {%>
<p> No corrupted WAL files</p>
<% } %>
</div>
<div class="tab-pane" id="tab_WALRollTime">
<table class="table table-striped">
<tr>
<th> Milliseconds to next roll</th>
<th> Milliseconds from last roll</th>
</tr>
<tr>
<td> <%=StringUtils.humanTimeDiff(millisToNextRoll) %></td>
<td> <%=StringUtils.humanTimeDiff(millisFromLastRoll) %></td>
</tr>
</table>
</div>
<div class="tab-pane" id="tab_SyncStats">
<table class="table table-striped">
<tr>
<th> Time</th>
<th> Sync Wait</th>
<th> Last num of synced entries</th>
<th> Total Synced</th>
<th> Synced per second</th>
</tr>
<% for (int i = syncMetricsBuff.size() - 1; i >= 0; --i) { %>
<% WALProcedureStore.SyncMetrics syncMetrics = syncMetricsBuff.get(i); %>
<tr>
<td> <%= new Date(syncMetrics.getTimestamp()) %></td>
<td> <%= StringUtils.humanTimeDiff(syncMetrics.getSyncWaitMs()) %></td>
<td> <%= syncMetrics.getSyncedEntries() %></td>
<td> <%= TraditionalBinaryPrefix.long2String(syncMetrics.getTotalSyncedBytes(), "B", 1) %></td>
<td> <%= TraditionalBinaryPrefix.long2String((long)syncMetrics.getSyncedPerSec(), "B", 1) %></td>
</tr>
<%} %>
</table>
</div>
</div>
</div>
</div>
<br />
<jsp:include page="footer.jsp" />

View File

@ -126,7 +126,7 @@ public class TestLoadProcedureError {
ARRIVE.await();
FAIL_LOAD = true;
// do not persist the store tracker
UTIL.getMiniHBaseCluster().getMaster().getWalProcedureStore().stop(true);
UTIL.getMiniHBaseCluster().getMaster().getProcedureStore().stop(true);
UTIL.getMiniHBaseCluster().getMaster().abort("for testing");
waitNoMaster();
// restart twice, and should fail twice, as we will throw an exception in the afterReplay above

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.util.AbstractMap.SimpleImmutableEntry;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -89,7 +89,9 @@ public class TestMasterMetricsWrapper {
}
assertEquals(regionServerCount - 1, info.getNumRegionServers());
assertEquals(1, info.getNumDeadRegionServers());
assertEquals(1, info.getNumWALFiles());
// now we do not expose this information as WALProcedureStore is not the only ProcedureStore
// implementation any more.
assertEquals(0, info.getNumWALFiles());
}
@Test

View File

@ -1,238 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({MasterTests.class, LargeTests.class})
@Ignore
public class TestMasterProcedureWalLease {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMasterProcedureWalLease.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureWalLease.class);
@Rule
public TestName name = new TestName();
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static void setupConf(Configuration conf) {
// don't waste time retrying with the roll, the test is already slow enough.
conf.setInt(WALProcedureStore.MAX_RETRIES_BEFORE_ROLL_CONF_KEY, 1);
conf.setInt(WALProcedureStore.WAIT_BEFORE_ROLL_CONF_KEY, 0);
conf.setInt(WALProcedureStore.ROLL_RETRIES_CONF_KEY, 1);
conf.setInt(WALProcedureStore.MAX_SYNC_FAILURE_ROLL_CONF_KEY, 1);
}
@Before
public void setup() throws Exception {
setupConf(UTIL.getConfiguration());
StartMiniClusterOption option = StartMiniClusterOption.builder()
.numMasters(2).numRegionServers(3).numDataNodes(3).build();
UTIL.startMiniCluster(option);
}
@After
public void tearDown() throws Exception {
try {
UTIL.shutdownMiniCluster();
} catch (Exception e) {
LOG.warn("failure shutting down cluster", e);
}
}
@Test
public void testWalRecoverLease() throws Exception {
final ProcedureStore masterStore = getMasterProcedureExecutor().getStore();
assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore);
HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
// Abort Latch for the master store
final CountDownLatch masterStoreAbort = new CountDownLatch(1);
masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.debug("Abort store of Master");
masterStoreAbort.countDown();
}
});
// startup a fake master the new WAL store will take the lease
// and the active master should abort.
HMaster backupMaster3 = Mockito.mock(HMaster.class);
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
((WALProcedureStore)masterStore).getWALDir(),
null,
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
// Abort Latch for the test store
final CountDownLatch backupStore3Abort = new CountDownLatch(1);
backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() {
@Override
public void postSync() {}
@Override
public void abortProcess() {
LOG.debug("Abort store of backupMaster3");
backupStore3Abort.countDown();
backupStore3.stop(true);
}
});
backupStore3.start(1);
backupStore3.recoverLease();
// Try to trigger a command on the master (WAL lease expired on the active one)
TableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf(name.getMethodName()), "f");
RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, null);
LOG.debug("submit proc");
try {
getMasterProcedureExecutor().submitProcedure(
new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions));
fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
LOG.info("got " + e.getMessage());
}
LOG.debug("wait master store abort");
masterStoreAbort.await();
// Now the real backup master should start up
LOG.debug("wait backup master to startup");
MasterProcedureTestingUtility.waitBackupMaster(UTIL, firstMaster);
assertEquals(true, firstMaster.isStopped());
// wait the store in here to abort (the test will fail due to timeout if it doesn't)
LOG.debug("wait the store to abort");
backupStore3.getStoreTracker().setDeleted(1, false);
try {
backupStore3.delete(1);
fail("expected RuntimeException 'sync aborted'");
} catch (RuntimeException e) {
LOG.info("got " + e.getMessage());
}
backupStore3Abort.await();
}
/**
* Tests proper fencing in case the current WAL store is fenced
*/
@Test
public void testWALfencingWithoutWALRolling() throws IOException {
testWALfencing(false);
}
/**
* Tests proper fencing in case the current WAL store does not receive writes until after the
* new WAL does a couple of WAL rolls.
*/
@Test
public void testWALfencingWithWALRolling() throws IOException {
testWALfencing(true);
}
public void testWALfencing(boolean walRolls) throws IOException {
final ProcedureStore procStore = getMasterProcedureExecutor().getStore();
assertTrue("expected WALStore for this test", procStore instanceof WALProcedureStore);
HMaster firstMaster = UTIL.getHBaseCluster().getMaster();
// cause WAL rolling after a delete in WAL:
firstMaster.getConfiguration().setLong(WALProcedureStore.ROLL_THRESHOLD_CONF_KEY, 1);
HMaster backupMaster3 = Mockito.mock(HMaster.class);
Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
((WALProcedureStore)procStore).getWALDir(),
null,
new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
// start a second store which should fence the first one out
LOG.info("Starting new WALProcedureStore");
procStore2.start(1);
procStore2.recoverLease();
// before writing back to the WAL store, optionally do a couple of WAL rolls (which causes
// to delete the old WAL files).
if (walRolls) {
LOG.info("Inserting into second WALProcedureStore, causing WAL rolls");
for (int i = 0; i < 512; i++) {
// insert something to the second store then delete it, causing a WAL roll(s)
Procedure proc2 = new TestProcedure(i);
procStore2.insert(proc2, null);
procStore2.delete(proc2.getProcId()); // delete the procedure so that the WAL is removed later
}
}
// Now, insert something to the first store, should fail.
// If the store does a WAL roll and continue with another logId without checking higher logIds
// it will incorrectly succeed.
LOG.info("Inserting into first WALProcedureStore");
try {
procStore.insert(new TestProcedure(11), null);
fail("Inserting into Procedure Store should have failed");
} catch (Exception ex) {
LOG.info("Received expected exception", ex);
}
}
// ==========================================================================
// Helpers
// ==========================================================================
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
final class RegionProcedureStoreTestHelper {
private RegionProcedureStoreTestHelper() {
}
static RegionProcedureStore createStore(Configuration conf, ProcedureLoader loader)
throws IOException {
Server server = mock(Server.class);
when(server.getConfiguration()).thenReturn(conf);
when(server.getServerName())
.thenReturn(ServerName.valueOf("localhost", 12345, System.currentTimeMillis()));
RegionProcedureStore store = new RegionProcedureStore(server, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
}
});
store.start(1);
store.recoverLease();
store.load(loader);
return store;
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
public class RegionProcedureStoreTestProcedure extends NoopProcedure<Void> {
private static long SEQ_ID = 0;
public RegionProcedureStoreTestProcedure() {
setProcId(++SEQ_ID);
}
@Override
protected Procedure<Void>[] execute(Void env) {
return null;
}
@Override
protected void rollback(Void env) {
}
@Override
protected boolean abort(Void env) {
return false;
}
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
long procId = getProcId();
if (procId % 2 == 0) {
Int64Value.Builder builder = Int64Value.newBuilder().setValue(procId);
serializer.serialize(builder.build());
}
}
@Override
protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
long procId = getProcId();
if (procId % 2 == 0) {
Int64Value value = serializer.deserialize(Int64Value.class);
assertEquals(procId, value.getValue());
}
}
public void setParent(Procedure<?> proc) {
super.setParentProcId(proc.getProcId());
}
public void finish() {
setState(ProcedureState.SUCCESS);
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStore {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStore.class);
private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);
private HBaseCommonTestingUtility htu;
private RegionProcedureStore store;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
htu.getConfiguration().setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(htu.getConfiguration(), testDir);
store = RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
htu.cleanupTestDir();
}
private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
LOG.debug("expected: " + procIds);
LoadCounter loader = new LoadCounter();
ProcedureTestingUtility.storeRestart(store, true, loader);
assertEquals(procIds.size(), loader.getLoadedCount());
assertEquals(0, loader.getCorruptedCount());
}
@Test
public void testLoad() throws Exception {
Set<Long> procIds = new HashSet<>();
// Insert something in the log
RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
procIds.add(proc1.getProcId());
store.insert(proc1, null);
RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
proc3.setParent(proc2);
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
proc4.setParent(proc2);
procIds.add(proc2.getProcId());
procIds.add(proc3.getProcId());
procIds.add(proc4.getProcId());
store.insert(proc2, new Procedure[] { proc3, proc4 });
// Verify that everything is there
verifyProcIdsOnRestart(procIds);
// Update and delete something
proc1.finish();
store.update(proc1);
proc4.finish();
store.update(proc4);
store.delete(proc4.getProcId());
procIds.remove(proc4.getProcId());
// Verify that everything is there
verifyProcIdsOnRestart(procIds);
}
@Test
public void testCleanup() throws Exception {
RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
store.insert(proc1, null);
RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
store.insert(proc2, null);
RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
store.insert(proc3, null);
LoadCounter loader = new LoadCounter();
store.load(loader);
assertEquals(proc3.getProcId(), loader.getMaxProcId());
assertEquals(3, loader.getRunnableCount());
store.delete(proc3.getProcId());
store.delete(proc2.getProcId());
loader = new LoadCounter();
store.load(loader);
assertEquals(proc3.getProcId(), loader.getMaxProcId());
assertEquals(1, loader.getRunnableCount());
// the row should still be there
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
// proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc
// id
store.cleanup();
assertTrue(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
assertFalse(store.region
.get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists());
RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
store.insert(proc4, null);
store.cleanup();
// proc3 should also be deleted as now proc4 holds the max proc id
assertFalse(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.procedure2.store.LeaseRecovery;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureLoader;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@SuppressWarnings("deprecation")
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreMigration {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreMigration.class);
private HBaseCommonTestingUtility htu;
private RegionProcedureStore store;
private WALProcedureStore walStore;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Path testDir = htu.getDataTestDir();
CommonFSUtils.setWALRootDir(conf, testDir);
walStore = new WALProcedureStore(conf, new LeaseRecovery() {
@Override
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
}
});
walStore.start(1);
walStore.recoverLease();
walStore.load(new LoadCounter());
}
@After
public void tearDown() throws IOException {
if (store != null) {
store.stop(true);
}
walStore.stop(true);
htu.cleanupTestDir();
}
@Test
public void test() throws IOException {
List<RegionProcedureStoreTestProcedure> procs = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
walStore.insert(proc, null);
procs.add(proc);
}
for (int i = 5; i < 10; i++) {
walStore.delete(procs.get(i).getProcId());
}
walStore.stop(true);
SortedSet<RegionProcedureStoreTestProcedure> loadedProcs =
new TreeSet<>((p1, p2) -> Long.compare(p1.getProcId(), p2.getProcId()));
MutableLong maxProcIdSet = new MutableLong(0);
store =
RegionProcedureStoreTestHelper.createStore(htu.getConfiguration(), new ProcedureLoader() {
@Override
public void setMaxProcId(long maxProcId) {
maxProcIdSet.setValue(maxProcId);
}
@Override
public void load(ProcedureIterator procIter) throws IOException {
while (procIter.hasNext()) {
RegionProcedureStoreTestProcedure proc =
(RegionProcedureStoreTestProcedure) procIter.next();
loadedProcs.add(proc);
}
}
@Override
public void handleCorrupted(ProcedureIterator procIter) throws IOException {
if (procIter.hasNext()) {
fail("Found corrupted procedures");
}
}
});
assertEquals(10, maxProcIdSet.longValue());
assertEquals(5, loadedProcs.size());
int procId = 1;
for (RegionProcedureStoreTestProcedure proc : loadedProcs) {
assertEquals(procId, proc.getProcId());
procId++;
}
Path testDir = htu.getDataTestDir();
FileSystem fs = testDir.getFileSystem(htu.getConfiguration());
Path oldProcWALDir = new Path(testDir, WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
// make sure the old proc wal directory has been deleted.
assertFalse(fs.exists(oldProcWALDir));
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2.store.region;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveProcedureWALCleaner;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, MediumTests.class })
public class TestRegionProcedureStoreWALCleaner {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionProcedureStoreWALCleaner.class);
private HBaseCommonTestingUtility htu;
private FileSystem fs;
private RegionProcedureStore store;
private ChoreService choreService;
private DirScanPool dirScanPool;
private LogCleaner logCleaner;
private Path globalWALArchiveDir;
@Before
public void setUp() throws IOException {
htu = new HBaseCommonTestingUtility();
Configuration conf = htu.getConfiguration();
conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, false);
Path testDir = htu.getDataTestDir();
fs = testDir.getFileSystem(conf);
CommonFSUtils.setWALRootDir(conf, testDir);
globalWALArchiveDir = new Path(testDir, HConstants.HREGION_OLDLOGDIR_NAME);
choreService = new ChoreService("Region-Procedure-Store");
dirScanPool = new DirScanPool(conf);
conf.setLong(TimeToLiveProcedureWALCleaner.TTL_CONF_KEY, 5000);
conf.setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 1000);
logCleaner = new LogCleaner(1000, new Stoppable() {
private volatile boolean stopped = false;
@Override
public void stop(String why) {
stopped = true;
}
@Override
public boolean isStopped() {
return stopped;
}
}, conf, fs, globalWALArchiveDir, dirScanPool);
choreService.scheduleChore(logCleaner);
store = RegionProcedureStoreTestHelper.createStore(conf, new LoadCounter());
}
@After
public void tearDown() throws IOException {
store.stop(true);
logCleaner.cancel();
dirScanPool.shutdownNow();
choreService.shutdown();
htu.cleanupTestDir();
}
@Test
public void test() throws IOException, InterruptedException {
RegionProcedureStoreTestProcedure proc = new RegionProcedureStoreTestProcedure();
store.insert(proc, null);
store.region.flush(true);
// no archived wal files yet
assertFalse(fs.exists(globalWALArchiveDir));
store.walRoller.requestRollAll();
store.walRoller.waitUntilWalRollFinished();
// should have one
FileStatus[] files = fs.listStatus(globalWALArchiveDir);
assertEquals(1, files.length);
Thread.sleep(2000);
// should still be there
assertTrue(fs.exists(files[0].getPath()));
Thread.sleep(6000);
// should have been cleaned
assertEquals(0, fs.listStatus(globalWALArchiveDir).length);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -59,7 +60,7 @@ public class TestRegionServerCrashDisableWAL {
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
UTIL.getConfiguration().setBoolean("hbase.regionserver.hlog.enabled", false);
UTIL.getConfiguration().setBoolean(WALFactory.WAL_ENABLED, false);
UTIL.startMiniCluster(2);
UTIL.createTable(TABLE_NAME, CF);
UTIL.waitTableAvailable(TABLE_NAME);