HBASE-19763 Fixed Checkstyle errors in hbase-procedure
This commit is contained in:
parent
268bcce76f
commit
5b01e613fb
|
@ -50,6 +50,13 @@
|
|||
<groupId>net.revelc.code</groupId>
|
||||
<artifactId>warbucks-maven-plugin</artifactId>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<configuration>
|
||||
<failOnViolation>true</failOnViolation>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
|
|
@ -15,13 +15,12 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -279,7 +278,6 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
|||
push(procedure, /* addFront= */ true, /* notify= */false);
|
||||
}
|
||||
|
||||
|
||||
// ==========================================================================
|
||||
// Internal helpers
|
||||
// ==========================================================================
|
||||
|
|
|
@ -15,12 +15,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.HBaseIOException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
|
|
|
@ -15,13 +15,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class LockedResource {
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -209,11 +208,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
* of the execution.
|
||||
* @param env the environment passed to the ProcedureExecutor
|
||||
* @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
|
||||
* procedure is done.
|
||||
* @throws ProcedureYieldException the procedure will be added back to the queue and retried later.
|
||||
* procedure is done.
|
||||
* @throws ProcedureYieldException the procedure will be added back to the queue and retried
|
||||
* later.
|
||||
* @throws InterruptedException the procedure will be added back to the queue and retried later.
|
||||
* @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself and
|
||||
* has set itself up waiting for an external event to wake it back up again.
|
||||
* @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
|
||||
* and has set itself up waiting for an external event to wake it back up again.
|
||||
*/
|
||||
protected abstract Procedure<TEnvironment>[] execute(TEnvironment env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
|
||||
|
@ -468,7 +468,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
sb.append(getParentProcId());
|
||||
}
|
||||
|
||||
/**
|
||||
/*
|
||||
* TODO
|
||||
* Enable later when this is being used.
|
||||
* Currently owner not used.
|
||||
|
@ -710,7 +710,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
/**
|
||||
* Will only be called when loading procedures from procedure store, where we need to record
|
||||
* whether the procedure has already held a lock. Later we will call
|
||||
* {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock.
|
||||
* {@link #restoreLock(Object)} to actually acquire the lock.
|
||||
*/
|
||||
final void lockedWhenLoading() {
|
||||
this.lockedWhenLoading = true;
|
||||
|
@ -764,7 +764,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
|
||||
/**
|
||||
* @return true if the procedure is finished. The Procedure may be completed successfully or
|
||||
* rolledback.
|
||||
* rolledback.
|
||||
*/
|
||||
public synchronized boolean isFinished() {
|
||||
return isSuccess() || state == ProcedureState.ROLLEDBACK;
|
||||
|
|
|
@ -16,14 +16,12 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Type class.
|
||||
* For conceptual purpose only. Seeing ProcedureDeque as type instead of just ArrayDeque gives
|
||||
|
|
|
@ -15,12 +15,11 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import org.apache.hadoop.hbase.exceptions.HBaseException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.exceptions.HBaseException;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Stable
|
||||
|
|
|
@ -433,23 +433,21 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
}
|
||||
}
|
||||
|
||||
// add the nonce to the map
|
||||
if (nonceKey != null) {
|
||||
nonceKeysToProcIdsMap.put(nonceKey, procId);
|
||||
nonceKeysToProcIdsMap.put(nonceKey, procId); // add the nonce to the map
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Initialize the stacks
|
||||
// In the old implementation, for procedures in FAILED state, we will push it into the
|
||||
// ProcedureScheduler directly to execute the rollback. But this does not work after we
|
||||
// introduce the restore lock stage.
|
||||
// For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
|
||||
// then when a procedure which has lock access, for example, a sub procedure of the procedure
|
||||
// which has the xlock, is pushed into the scheduler, we will add the queue back to let the
|
||||
// workers poll from it. The assumption here is that, the procedure which has the xlock should
|
||||
// have been polled out already, so when loading we can not add the procedure to scheduler first
|
||||
// and then call acquireLock, since the procedure is still in the queue, and since we will
|
||||
// remove the queue from runQueue, then no one can poll it out, then there is a dead lock
|
||||
// 2. Initialize the stacks: In the old implementation, for procedures in FAILED state, we will
|
||||
// push it into the ProcedureScheduler directly to execute the rollback. But this does not work
|
||||
// after we introduce the restore lock stage. For now, when we acquire a xlock, we will remove
|
||||
// the queue from runQueue in scheduler, and then when a procedure which has lock access, for
|
||||
// example, a sub procedure of the procedure which has the xlock, is pushed into the scheduler,
|
||||
// we will add the queue back to let the workers poll from it. The assumption here is that, the
|
||||
// procedure which has the xlock should have been polled out already, so when loading we can not
|
||||
// add the procedure to scheduler first and then call acquireLock, since the procedure is still
|
||||
// in the queue, and since we will remove the queue from runQueue, then no one can poll it out,
|
||||
// then there is a dead lock
|
||||
List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
|
||||
List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
|
||||
List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
|
||||
|
@ -464,9 +462,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
@SuppressWarnings("unchecked")
|
||||
Procedure<TEnvironment> proc = procIter.next();
|
||||
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;
|
||||
|
||||
LOG.debug("Loading {}", proc);
|
||||
|
||||
Long rootProcId = getRootProcedureId(proc);
|
||||
// The orphan procedures will be passed to handleCorrupted, so add an assert here
|
||||
assert rootProcId != null;
|
||||
|
@ -508,14 +504,12 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// 3. Check the waiting procedures to see if some of them can be added to runnable.
|
||||
waitingList.forEach(proc -> {
|
||||
if (!proc.hasChildren()) {
|
||||
// Normally, WAITING procedures should be waken by its children.
|
||||
// But, there is a case that, all the children are successful and before
|
||||
// they can wake up their parent procedure, the master was killed.
|
||||
// So, during recovering the procedures from ProcedureWal, its children
|
||||
// are not loaded because of their SUCCESS state.
|
||||
// So we need to continue to run this WAITING procedure. But before
|
||||
// executing, we need to set its state to RUNNABLE, otherwise, a exception
|
||||
// will throw:
|
||||
// Normally, WAITING procedures should be waken by its children. But, there is a case that,
|
||||
// all the children are successful and before they can wake up their parent procedure, the
|
||||
// master was killed. So, during recovering the procedures from ProcedureWal, its children
|
||||
// are not loaded because of their SUCCESS state. So we need to continue to run this WAITING
|
||||
// procedure. But before executing, we need to set its state to RUNNABLE, otherwise, a
|
||||
// exception will throw:
|
||||
// Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
|
||||
// "NOT RUNNABLE! " + procedure.toString());
|
||||
proc.setState(ProcedureState.RUNNABLE);
|
||||
|
@ -743,9 +737,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// Nonce Procedure helpers
|
||||
// ==========================================================================
|
||||
/**
|
||||
* Create a NoneKey from the specified nonceGroup and nonce.
|
||||
* @param nonceGroup
|
||||
* @param nonce
|
||||
* Create a NonceKey from the specified nonceGroup and nonce.
|
||||
* @param nonceGroup the group to use for the {@link NonceKey}
|
||||
* @param nonce the nonce to use in the {@link NonceKey}
|
||||
* @return the generated NonceKey
|
||||
*/
|
||||
public NonceKey createNonceKey(final long nonceGroup, final long nonce) {
|
||||
|
@ -764,7 +758,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @return the procId associated with the nonce, if any otherwise an invalid procId.
|
||||
*/
|
||||
public long registerNonce(final NonceKey nonceKey) {
|
||||
if (nonceKey == null) return -1;
|
||||
if (nonceKey == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// check if we have already a Reserved ID for the nonce
|
||||
Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey);
|
||||
|
@ -773,7 +769,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// and the procedure submitted with the specified nonce will use this ID.
|
||||
final long newProcId = nextProcId();
|
||||
oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId);
|
||||
if (oldProcId == null) return -1;
|
||||
if (oldProcId == null) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// we found a registered nonce, but the procedure may not have been submitted yet.
|
||||
|
@ -795,10 +793,14 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
* @param nonceKey A unique identifier for this operation from the client or process.
|
||||
*/
|
||||
public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) {
|
||||
if (nonceKey == null) return;
|
||||
if (nonceKey == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Long procId = nonceKeysToProcIdsMap.get(nonceKey);
|
||||
if (procId == null) return;
|
||||
if (procId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// if the procedure was not submitted, remove the nonce
|
||||
if (!(procedures.containsKey(procId) || completed.containsKey(procId))) {
|
||||
|
@ -1295,8 +1297,9 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
if (procId < 0) {
|
||||
while (!lastProcId.compareAndSet(procId, 0)) {
|
||||
procId = lastProcId.get();
|
||||
if (procId >= 0)
|
||||
if (procId >= 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (procedures.containsKey(procId)) {
|
||||
procId = lastProcId.incrementAndGet();
|
||||
|
@ -2003,7 +2006,6 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
// A worker thread which can be added when core workers are stuck. Will timeout after
|
||||
// keepAliveTime if there is no procedure to run.
|
||||
private final class KeepAliveWorkerThread extends WorkerThread {
|
||||
|
||||
public KeepAliveWorkerThread(ThreadGroup group) {
|
||||
super(group, "KeepAlivePEWorker-");
|
||||
}
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.Iterator;
|
||||
|
@ -117,7 +116,8 @@ public interface ProcedureScheduler {
|
|||
List<LockedResource> getLocks();
|
||||
|
||||
/**
|
||||
* @return {@link LockedResource} for resource of specified type & name. null if resource is not locked.
|
||||
* @return {@link LockedResource} for resource of specified type & name. null if resource is not
|
||||
* locked.
|
||||
*/
|
||||
LockedResource getLockResource(LockedResourceType resourceType, String resourceName);
|
||||
|
||||
|
|
|
@ -23,7 +23,9 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.Modifier;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.util.NonceKey;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Any;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
|
||||
|
@ -31,9 +33,9 @@ import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferExce
|
|||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
||||
import org.apache.hadoop.hbase.util.NonceKey;
|
||||
|
||||
/**
|
||||
* Helper to convert to/from ProcedureProtos
|
||||
|
|
|
@ -155,7 +155,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
* @param key the node identifier
|
||||
*/
|
||||
public void addOperationToNode(final TRemote key, RemoteProcedure rp)
|
||||
throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
|
||||
throws NullTargetServerDispatchException, NoServerDispatchException,
|
||||
NoNodeDispatchException {
|
||||
if (key == null) {
|
||||
throw new NullTargetServerDispatchException(rp.toString());
|
||||
}
|
||||
|
@ -188,7 +189,10 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
*/
|
||||
public boolean removeNode(final TRemote key) {
|
||||
final BufferNode node = nodeMap.remove(key);
|
||||
if (node == null) return false;
|
||||
if (node == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
node.abortOperationsInQueue();
|
||||
return true;
|
||||
}
|
||||
|
@ -256,7 +260,6 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
|||
default boolean storeInDispatchedQueue() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.procedure2;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage;
|
||||
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
||||
|
||||
/**
|
||||
* A RemoteProcedureException is an exception from another thread or process.
|
||||
|
@ -40,7 +41,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
|||
@InterfaceStability.Evolving
|
||||
@SuppressWarnings("serial")
|
||||
public class RemoteProcedureException extends ProcedureException {
|
||||
|
||||
/**
|
||||
* Name of the throwable's source such as a host or thread name. Must be non-null.
|
||||
*/
|
||||
|
@ -49,8 +49,8 @@ public class RemoteProcedureException extends ProcedureException {
|
|||
/**
|
||||
* Create a new RemoteProcedureException that can be serialized.
|
||||
* It is assumed that this came form a local source.
|
||||
* @param source
|
||||
* @param cause
|
||||
* @param source the host or thread name of the source
|
||||
* @param cause the actual cause of the exception
|
||||
*/
|
||||
public RemoteProcedureException(String source, Throwable cause) {
|
||||
super(cause);
|
||||
|
@ -104,9 +104,9 @@ public class RemoteProcedureException extends ProcedureException {
|
|||
|
||||
/**
|
||||
* Takes a series of bytes and tries to generate an RemoteProcedureException instance for it.
|
||||
* @param bytes
|
||||
* @param bytes the bytes to generate the {@link RemoteProcedureException} from
|
||||
* @return the ForeignExcpetion instance
|
||||
* @throws InvalidProtocolBufferException if there was deserialization problem this is thrown.
|
||||
* @throws IOException if there was deserialization problem this is thrown.
|
||||
*/
|
||||
public static RemoteProcedureException deserialize(byte[] bytes) throws IOException {
|
||||
return fromProto(ForeignExceptionMessage.parseFrom(bytes));
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -43,7 +42,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
class RootProcedureState<TEnvironment> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RootProcedureState.class);
|
||||
|
||||
private enum State {
|
||||
|
@ -181,7 +179,9 @@ class RootProcedureState<TEnvironment> {
|
|||
int diff = (1 + stackIndexes[stackIndexes.length - 1]) - subprocStack.size();
|
||||
if (diff > 0) {
|
||||
subprocStack.ensureCapacity(1 + stackIndexes[stackIndexes.length - 1]);
|
||||
while (diff-- > 0) subprocStack.add(null);
|
||||
while (diff-- > 0) {
|
||||
subprocStack.add(null);
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < stackIndexes.length; ++i) {
|
||||
subprocStack.set(stackIndexes[i], proc);
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -87,7 +86,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
* Flow.HAS_MORE_STATE if there is another step.
|
||||
*/
|
||||
protected abstract Flow executeFromState(TEnvironment env, TState state)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException;
|
||||
|
||||
/**
|
||||
* called to perform the rollback of the specified state
|
||||
|
@ -156,19 +155,25 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
}
|
||||
for (int i = 0; i < len; ++i) {
|
||||
Procedure<TEnvironment> proc = subProcedure[i];
|
||||
if (!proc.hasOwner()) proc.setOwner(getOwner());
|
||||
if (!proc.hasOwner()) {
|
||||
proc.setOwner(getOwner());
|
||||
}
|
||||
|
||||
subProcList.add(proc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(final TEnvironment env)
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
|
||||
updateTimestamp();
|
||||
try {
|
||||
failIfAborted();
|
||||
|
||||
if (!hasMoreState() || isFailed()) return null;
|
||||
if (!hasMoreState() || isFailed()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
TState state = getCurrentState();
|
||||
if (stateCount == 0) {
|
||||
setNextState(getStateId(state));
|
||||
|
@ -187,7 +192,10 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
|
||||
LOG.trace("{}", this);
|
||||
stateFlow = executeFromState(env, state);
|
||||
if (!hasMoreState()) setNextState(EOF_STATE);
|
||||
if (!hasMoreState()) {
|
||||
setNextState(EOF_STATE);
|
||||
}
|
||||
|
||||
if (subProcList != null && !subProcList.isEmpty()) {
|
||||
Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]);
|
||||
subProcList = null;
|
||||
|
@ -202,7 +210,10 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
|
|||
@Override
|
||||
protected void rollback(final TEnvironment env)
|
||||
throws IOException, InterruptedException {
|
||||
if (isEofState()) stateCount--;
|
||||
if (isEofState()) {
|
||||
stateCount--;
|
||||
}
|
||||
|
||||
try {
|
||||
updateTimestamp();
|
||||
rollbackState(env, getCurrentState());
|
||||
|
|
|
@ -15,14 +15,13 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.store;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
||||
/**
|
||||
* The ProcedureStore is used by the executor to persist the state of each procedure execution.
|
||||
|
@ -149,7 +148,7 @@ public interface ProcedureStore {
|
|||
|
||||
/**
|
||||
* Start/Open the procedure store
|
||||
* @param numThreads
|
||||
* @param numThreads number of threads to be used by the procedure store
|
||||
*/
|
||||
void start(int numThreads) throws IOException;
|
||||
|
||||
|
|
|
@ -26,7 +26,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class ProcedureStoreBase implements ProcedureStore {
|
||||
private final CopyOnWriteArrayList<ProcedureStoreListener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final CopyOnWriteArrayList<ProcedureStoreListener> listeners =
|
||||
new CopyOnWriteArrayList<>();
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.store;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -40,7 +39,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ProcedureStoreTracker {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
|
||||
|
||||
// Key is procedure id corresponding to first bit of the bitmap.
|
||||
|
@ -68,7 +66,8 @@ public class ProcedureStoreTracker {
|
|||
|
||||
public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
|
||||
reset();
|
||||
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
|
||||
for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode :
|
||||
trackerProtoBuf.getNodeList()) {
|
||||
final BitSetNode node = new BitSetNode(protoNode);
|
||||
map.put(node.getStart(), node);
|
||||
}
|
||||
|
@ -252,7 +251,10 @@ public class ProcedureStoreTracker {
|
|||
* @return the node that may contains the procId or null
|
||||
*/
|
||||
private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
|
||||
if (node != null && node.contains(procId)) return node;
|
||||
if (node != null && node.contains(procId)) {
|
||||
return node;
|
||||
}
|
||||
|
||||
final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
|
||||
return entry != null ? entry.getValue() : null;
|
||||
}
|
||||
|
|
|
@ -111,7 +111,10 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
|||
}
|
||||
|
||||
public void close() {
|
||||
if (stream == null) return;
|
||||
if (stream == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
|
@ -192,8 +195,14 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (!(o instanceof ProcedureWALFile)) return false;
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!(o instanceof ProcedureWALFile)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return compareTo((ProcedureWALFile)o) == 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -659,7 +659,10 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
@Override
|
||||
public void delete(final long[] procIds, final int offset, final int count) {
|
||||
if (count == 0) return;
|
||||
if (count == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (offset == 0 && count == procIds.length) {
|
||||
delete(procIds);
|
||||
} else if (count == 1) {
|
||||
|
@ -946,7 +949,9 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
|
||||
private boolean rollWriterWithRetries() {
|
||||
for (int i = 0; i < rollRetries && isRunning(); ++i) {
|
||||
if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
|
||||
if (i > 0) {
|
||||
Threads.sleepWithoutInterrupt(waitBeforeRoll * i);
|
||||
}
|
||||
|
||||
try {
|
||||
if (rollWriter()) {
|
||||
|
@ -1084,7 +1089,7 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
// to provide.
|
||||
final String durability = useHsync ? "hsync" : "hflush";
|
||||
if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
|
||||
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
||||
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
||||
" for proper operation during component failures, but the underlying filesystem does " +
|
||||
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
|
||||
"' to set the desired level of robustness and ensure the config value of '" +
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.util;
|
||||
|
||||
import java.util.Objects;
|
||||
|
@ -23,9 +22,9 @@ import java.util.concurrent.DelayQueue;
|
|||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
// FIX namings. TODO.
|
||||
@InterfaceAudience.Private
|
||||
|
@ -138,8 +137,14 @@ public final class DelayedUtil {
|
|||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if (other == this) return true;
|
||||
if (!(other instanceof DelayedContainer)) return false;
|
||||
if (other == this) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!(other instanceof DelayedContainer)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return Objects.equals(getObject(), ((DelayedContainer)other).getObject());
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.util;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -56,10 +55,22 @@ public final class StringUtils {
|
|||
}
|
||||
|
||||
public static String humanSize(double size) {
|
||||
if (size >= (1L << 40)) return String.format("%.1fT", size / (1L << 40));
|
||||
if (size >= (1L << 30)) return String.format("%.1fG", size / (1L << 30));
|
||||
if (size >= (1L << 20)) return String.format("%.1fM", size / (1L << 20));
|
||||
if (size >= (1L << 10)) return String.format("%.1fK", size / (1L << 10));
|
||||
if (size >= (1L << 40)) {
|
||||
return String.format("%.1fT", size / (1L << 40));
|
||||
}
|
||||
|
||||
if (size >= (1L << 30)) {
|
||||
return String.format("%.1fG", size / (1L << 30));
|
||||
}
|
||||
|
||||
if (size >= (1L << 20)) {
|
||||
return String.format("%.1fM", size / (1L << 20));
|
||||
}
|
||||
|
||||
if (size >= (1L << 10)) {
|
||||
return String.format("%.1fK", size / (1L << 10));
|
||||
}
|
||||
|
||||
return String.format("%.0f", size);
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -46,7 +45,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.BytesValue;
|
|||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
|
||||
|
||||
public class ProcedureTestingUtility {
|
||||
public final class ProcedureTestingUtility {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestingUtility.class);
|
||||
|
||||
private ProcedureTestingUtility() {
|
||||
|
@ -219,7 +218,10 @@ public class ProcedureTestingUtility {
|
|||
|
||||
private static <TEnv> void assertSingleExecutorForKillTests(
|
||||
final ProcedureExecutor<TEnv> procExecutor) {
|
||||
if (procExecutor.testing == null) return;
|
||||
if (procExecutor.testing == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (procExecutor.testing.killBeforeStoreUpdate ||
|
||||
procExecutor.testing.toggleKillBeforeStoreUpdate) {
|
||||
assertEquals("expected only one executor running during test with kill/restart",
|
||||
|
@ -412,7 +414,9 @@ public class ProcedureTestingUtility {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(TEnv env) { return false; }
|
||||
protected boolean abort(TEnv env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureExecution {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureExecution.class);
|
||||
|
@ -223,7 +222,9 @@ public class TestProcedureExecution {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(Void env) { return false; }
|
||||
protected boolean abort(Void env) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureMetrics {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureMetrics.class);
|
||||
|
@ -61,7 +60,6 @@ public class TestProcedureMetrics {
|
|||
private static int successCount = 0;
|
||||
private static int failedCount = 0;
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
htu = new HBaseCommonTestingUtility();
|
||||
|
@ -237,7 +235,6 @@ public class TestProcedureMetrics {
|
|||
|
||||
@Override
|
||||
protected void rollback(TestProcEnv env) throws IOException, InterruptedException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,14 +243,12 @@ public class TestProcedureMetrics {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void updateMetricsOnFinish(final TestProcEnv env, final long time,
|
||||
boolean success) {
|
||||
protected void updateMetricsOnFinish(final TestProcEnv env, final long time, boolean success) {
|
||||
if (success) {
|
||||
successCount++;
|
||||
} else {
|
||||
failedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureNonce {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureNonce.class);
|
||||
|
@ -125,7 +124,9 @@ public class TestProcedureNonce {
|
|||
TestSingleStepProcedure proc = new TestSingleStepProcedure();
|
||||
procEnv.setWaitLatch(latch);
|
||||
long procId = procExecutor.submitProcedure(proc, nonceKey);
|
||||
while (proc.step != 1) Threads.sleep(25);
|
||||
while (proc.step != 1) {
|
||||
Threads.sleep(25);
|
||||
}
|
||||
|
||||
// try to register a procedure with the same nonce
|
||||
// we should get back the old procId
|
||||
|
@ -239,8 +240,14 @@ public class TestProcedureNonce {
|
|||
}
|
||||
};
|
||||
|
||||
for (int i = 0; i < threads.length; ++i) threads[i].start();
|
||||
for (int i = 0; i < threads.length; ++i) Threads.shutdown(threads[i]);
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
for (int i = 0; i < threads.length; ++i) {
|
||||
Threads.shutdown(threads[i]);
|
||||
}
|
||||
|
||||
ProcedureTestingUtility.waitNoProcedureRunning(procExecutor);
|
||||
assertEquals(null, t1Exception.get());
|
||||
assertEquals(null, t2Exception.get());
|
||||
|
@ -265,7 +272,9 @@ public class TestProcedureNonce {
|
|||
protected void rollback(TestProcEnv env) { }
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) { return true; }
|
||||
protected boolean abort(TestProcEnv env) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestProcEnv {
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Int32Value;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureRecovery {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureRecovery.class);
|
||||
|
@ -114,7 +113,9 @@ public class TestProcedureRecovery {
|
|||
protected void rollback(TestProcEnv env) { }
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) { return true; }
|
||||
protected boolean abort(TestProcEnv env) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class BaseTestStepProcedure extends SequentialProcedure<TestProcEnv> {
|
||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
|
|||
@Ignore
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestProcedureReplayOrder {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureReplayOrder.class);
|
||||
|
@ -202,7 +201,9 @@ public class TestProcedureReplayOrder {
|
|||
protected void rollback(TestProcedureEnv env) { }
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcedureEnv env) { return true; }
|
||||
protected boolean abort(TestProcedureEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
|
|
|
@ -17,10 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentSkipListSet;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -39,7 +35,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestProcedureSchedulerConcurrency {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class);
|
||||
|
@ -124,7 +119,9 @@ public class TestProcedureSchedulerConcurrency {
|
|||
public void run() {
|
||||
while (true) {
|
||||
TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
|
||||
if (proc == null) continue;
|
||||
if (proc == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
proc.getEvent().suspend();
|
||||
waitQueue.add(proc);
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestProcedureSuspended {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureSuspended.class);
|
||||
|
@ -136,7 +135,10 @@ public class TestProcedureSuspended {
|
|||
procExecutor.submitProcedure(p2);
|
||||
|
||||
// try to execute a bunch of yield on p1, p2 should be blocked
|
||||
while (p1.getTimestamps().size() < 100) Threads.sleep(10);
|
||||
while (p1.getTimestamps().size() < 100) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
|
||||
assertEquals(0, p2.getTimestamps().size());
|
||||
|
||||
// wait until p1 is completed
|
||||
|
@ -144,7 +146,10 @@ public class TestProcedureSuspended {
|
|||
ProcedureTestingUtility.waitProcedure(procExecutor, p1);
|
||||
|
||||
// try to execute a bunch of yield on p2
|
||||
while (p2.getTimestamps().size() < 100) Threads.sleep(10);
|
||||
while (p2.getTimestamps().size() < 100) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
|
||||
assertEquals(p1.getTimestamps().get(p1.getTimestamps().size() - 1).longValue() + 1,
|
||||
p2.getTimestamps().get(0).longValue());
|
||||
|
||||
|
@ -155,7 +160,10 @@ public class TestProcedureSuspended {
|
|||
|
||||
private void waitAndAssertTimestamp(TestLockProcedure proc, int size, int lastTs) {
|
||||
final ArrayList<Long> timestamps = proc.getTimestamps();
|
||||
while (timestamps.size() < size) Threads.sleep(10);
|
||||
while (timestamps.size() < size) {
|
||||
Threads.sleep(10);
|
||||
}
|
||||
|
||||
LOG.info(proc + " -> " + timestamps);
|
||||
assertEquals(size, timestamps.size());
|
||||
if (size > 0) {
|
||||
|
@ -216,7 +224,8 @@ public class TestProcedureSuspended {
|
|||
|
||||
@Override
|
||||
protected LockState acquireLock(final TestProcEnv env) {
|
||||
if ((hasLock = lock.compareAndSet(false, true))) {
|
||||
hasLock = lock.compareAndSet(false, true);
|
||||
if (hasLock) {
|
||||
LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
}
|
||||
|
@ -245,7 +254,9 @@ public class TestProcedureSuspended {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected boolean abort(TestProcEnv env) { return false; }
|
||||
protected boolean abort(TestProcEnv env) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serializeStateData(ProcedureStateSerializer serializer)
|
||||
|
|
|
@ -87,11 +87,9 @@ public class TestProcedureToString {
|
|||
|
||||
/**
|
||||
* Test that I can override the toString for its state value.
|
||||
* @throws ProcedureYieldException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testBasicToString() throws ProcedureYieldException, InterruptedException {
|
||||
public void testBasicToString() {
|
||||
BasicProcedure p = new BasicProcedure();
|
||||
ProcedureState state = ProcedureState.RUNNABLE;
|
||||
p.setState(state);
|
||||
|
@ -108,10 +106,11 @@ public class TestProcedureToString {
|
|||
* Do-nothing SimpleMachineProcedure for checking its toString.
|
||||
*/
|
||||
static class SimpleStateMachineProcedure
|
||||
extends StateMachineProcedure<BasicProcedureEnv, ServerCrashState> {
|
||||
extends StateMachineProcedure<BasicProcedureEnv, ServerCrashState> {
|
||||
@Override
|
||||
protected org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow executeFromState(BasicProcedureEnv env,
|
||||
ServerCrashState state) throws ProcedureYieldException, InterruptedException {
|
||||
protected org.apache.hadoop.hbase.procedure2.StateMachineProcedure.Flow executeFromState(
|
||||
BasicProcedureEnv env, ServerCrashState state)
|
||||
throws ProcedureYieldException, InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestStateMachineProcedure {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestStateMachineProcedure.class);
|
||||
|
@ -53,8 +52,14 @@ public class TestStateMachineProcedure {
|
|||
|
||||
@Override
|
||||
public boolean equals(final Object other) {
|
||||
if (this == other) return true;
|
||||
if (!(other instanceof Exception)) return false;
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!(other instanceof Exception)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// we are going to serialize the exception in the test,
|
||||
// so the instance comparison will not match
|
||||
return getMessage().equals(((Exception)other).getMessage());
|
||||
|
|
|
@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestYieldProcedures {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestYieldProcedures.class);
|
||||
|
@ -203,9 +202,17 @@ public class TestYieldProcedures {
|
|||
this.rollback = isRollback;
|
||||
}
|
||||
|
||||
public State getStep() { return step; }
|
||||
public long getTimestamp() { return timestamp; }
|
||||
public boolean isRollback() { return rollback; }
|
||||
public State getStep() {
|
||||
return step;
|
||||
}
|
||||
|
||||
public long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
public boolean isRollback() {
|
||||
return rollback;
|
||||
}
|
||||
}
|
||||
|
||||
private final ArrayList<ExecutionInfo> executionInfo = new ArrayList<>();
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
@Category({MasterTests.class, MediumTests.class})
|
||||
public class TestProcedureStoreTracker {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestProcedureStoreTracker.class);
|
||||
|
@ -161,7 +160,9 @@ public class TestProcedureStoreTracker {
|
|||
int count = 0;
|
||||
while (count < NPROCEDURES) {
|
||||
long procId = rand.nextLong();
|
||||
if (procId < 1) continue;
|
||||
if (procId < 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
tracker.setDeleted(procId, i % 2 == 0);
|
||||
count++;
|
||||
|
|
|
@ -15,9 +15,10 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||
|
||||
import static java.lang.System.currentTimeMillis;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -39,8 +40,6 @@ import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
|||
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
|
||||
|
||||
import static java.lang.System.currentTimeMillis;
|
||||
|
||||
public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
||||
protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
|
||||
|
||||
|
@ -135,8 +134,8 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
|||
|
||||
/**
|
||||
* @return a list of shuffled integers which represent state of proc id. First occurrence of a
|
||||
* number denotes insert state, consecutive occurrences denote update states, and -ve value
|
||||
* denotes delete state.
|
||||
* number denotes insert state, consecutive occurrences denote update states, and -ve
|
||||
* value denotes delete state.
|
||||
*/
|
||||
private List<Integer> shuffleProcWriteSequence() {
|
||||
Random rand = new Random();
|
||||
|
@ -207,11 +206,11 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
|||
System.out.println("Load time : " + (timeTaken / 1000.0f) + "sec");
|
||||
System.out.println("******************************************");
|
||||
System.out.println("Raw format for scripts");
|
||||
System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
|
||||
System.out.println(String.format("RESULT [%s=%s, %s=%s, %s=%s, %s=%s, %s=%s, "
|
||||
+ "total_time_ms=%s]",
|
||||
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
|
||||
UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
|
||||
deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
|
||||
NUM_PROCS_OPTION.getOpt(), numProcs, STATE_SIZE_OPTION.getOpt(), serializedState.length,
|
||||
UPDATES_PER_PROC_OPTION.getOpt(), updatesPerProc, DELETE_PROCS_FRACTION_OPTION.getOpt(),
|
||||
deleteProcsFraction, NUM_WALS_OPTION.getOpt(), numWals, timeTaken));
|
||||
}
|
||||
|
||||
public void tearDownProcedureStore() {
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.procedure2.store.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -27,12 +26,13 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.fs.*;
|
||||
import org.apache.hadoop.conf.*;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.util.*;
|
||||
|
||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
|
||||
|
||||
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
|
||||
|
|
|
@ -113,7 +113,9 @@ public class TestStressWALProcedureStore {
|
|||
procStore.insert(proc, null);
|
||||
// Update
|
||||
for (int i = 0, nupdates = rand.nextInt(10); i <= nupdates; ++i) {
|
||||
try { Thread.sleep(0, rand.nextInt(15)); } catch (InterruptedException e) {}
|
||||
try {
|
||||
Thread.sleep(0, rand.nextInt(15));
|
||||
} catch (InterruptedException e) {}
|
||||
procStore.update(proc);
|
||||
}
|
||||
// Delete
|
||||
|
|
|
@ -62,7 +62,6 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Int64Value;
|
|||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
public class TestWALProcedureStore {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestWALProcedureStore.class);
|
||||
|
@ -628,19 +627,19 @@ public class TestWALProcedureStore {
|
|||
|
||||
// simulate another active master removing the wals
|
||||
procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
|
||||
new WALProcedureStore.LeaseRecovery() {
|
||||
private int count = 0;
|
||||
new WALProcedureStore.LeaseRecovery() {
|
||||
private int count = 0;
|
||||
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
if (++count <= 2) {
|
||||
fs.delete(path, false);
|
||||
LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
|
||||
throw new FileNotFoundException("test file not found " + path);
|
||||
@Override
|
||||
public void recoverFileLease(FileSystem fs, Path path) throws IOException {
|
||||
if (++count <= 2) {
|
||||
fs.delete(path, false);
|
||||
LOG.debug("Simulate FileNotFound at count=" + count + " for " + path);
|
||||
throw new FileNotFoundException("test file not found " + path);
|
||||
}
|
||||
LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
|
||||
}
|
||||
LOG.debug("Simulate recoverFileLease() at count=" + count + " for " + path);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
final LoadCounter loader = new LoadCounter();
|
||||
procStore.start(PROCEDURE_STORE_SLOTS);
|
||||
|
|
Loading…
Reference in New Issue