HBASE-19809 Fix findbugs and error-prone warnings in hbase-procedure (branch-2)
This commit is contained in:
parent
b4f6ae86b6
commit
0561312bc4
|
@ -331,10 +331,12 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
super(key, 0);
|
super(key, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public TRemote getKey() {
|
public TRemote getKey() {
|
||||||
return getObject();
|
return getObject();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void add(final RemoteProcedure operation) {
|
public synchronized void add(final RemoteProcedure operation) {
|
||||||
if (this.operations == null) {
|
if (this.operations == null) {
|
||||||
this.operations = new HashSet<>();
|
this.operations = new HashSet<>();
|
||||||
|
@ -348,6 +350,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public synchronized void dispatch() {
|
public synchronized void dispatch() {
|
||||||
if (operations != null) {
|
if (operations != null) {
|
||||||
remoteDispatch(getKey(), operations);
|
remoteDispatch(getKey(), operations);
|
||||||
|
|
|
@ -149,6 +149,7 @@ public class ProcedureWALPrettyPrinter extends Configured implements Tool {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* Thrown upon file system errors etc.
|
* Thrown upon file system errors etc.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public int run(final String[] args) throws IOException {
|
public int run(final String[] args) throws IOException {
|
||||||
// create options
|
// create options
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
|
|
|
@ -81,11 +81,13 @@ public class ByteSlot extends OutputStream {
|
||||||
buf[offset] = (byte)b;
|
buf[offset] = (byte)b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void write(int b) {
|
public void write(int b) {
|
||||||
ensureCapacity(size + 1);
|
ensureCapacity(size + 1);
|
||||||
buf[size++] = (byte)b;
|
buf[size++] = (byte)b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void write(byte[] b, int off, int len) {
|
public void write(byte[] b, int off, int len) {
|
||||||
ensureCapacity(size + len);
|
ensureCapacity(size + len);
|
||||||
System.arraycopy(b, off, buf, size, len);
|
System.arraycopy(b, off, buf, size, len);
|
||||||
|
|
|
@ -160,7 +160,7 @@ public class TestProcedureEvents {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean setTimeoutFailure(final TestProcEnv env) {
|
protected synchronized boolean setTimeoutFailure(final TestProcEnv env) {
|
||||||
int n = ntimeouts.incrementAndGet();
|
int n = ntimeouts.incrementAndGet();
|
||||||
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
|
LOG.info("HANDLE TIMEOUT " + this + " ntimeouts=" + n);
|
||||||
setState(ProcedureState.RUNNABLE);
|
setState(ProcedureState.RUNNABLE);
|
||||||
|
|
|
@ -186,5 +186,5 @@ public class TestProcedureExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class TestProcEnv { }
|
private static class TestProcEnv { }
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,17 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -33,10 +36,6 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
public class TestProcedureInMemoryChore {
|
public class TestProcedureInMemoryChore {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureInMemoryChore.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestProcedureInMemoryChore.class);
|
||||||
|
@ -49,6 +48,7 @@ public class TestProcedureInMemoryChore {
|
||||||
|
|
||||||
private HBaseCommonTestingUtility htu;
|
private HBaseCommonTestingUtility htu;
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
htu = new HBaseCommonTestingUtility();
|
htu = new HBaseCommonTestingUtility();
|
||||||
|
|
|
@ -132,8 +132,9 @@ public class TestProcedureReplayOrder {
|
||||||
public void run() {
|
public void run() {
|
||||||
for (int i = 0; i < nprocPerThread; ++i) {
|
for (int i = 0; i < nprocPerThread; ++i) {
|
||||||
try {
|
try {
|
||||||
procExecutor.submitProcedure((Procedure)procClazz.newInstance());
|
procExecutor.submitProcedure((Procedure)
|
||||||
} catch (InstantiationException|IllegalAccessException e) {
|
procClazz.getDeclaredConstructor().newInstance());
|
||||||
|
} catch (Exception e) {
|
||||||
LOG.error("unable to instantiate the procedure", e);
|
LOG.error("unable to instantiate the procedure", e);
|
||||||
fail("failure during the proc.newInstance(): " + e.getMessage());
|
fail("failure during the proc.newInstance(): " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,6 +152,7 @@ public class TestStateMachineProcedure {
|
||||||
public enum TestSMProcedureState { STEP_1, STEP_2 };
|
public enum TestSMProcedureState { STEP_1, STEP_2 };
|
||||||
public static class TestSMProcedure
|
public static class TestSMProcedure
|
||||||
extends StateMachineProcedure<TestProcEnv, TestSMProcedureState> {
|
extends StateMachineProcedure<TestProcEnv, TestSMProcedureState> {
|
||||||
|
@Override
|
||||||
protected Flow executeFromState(TestProcEnv env, TestSMProcedureState state) {
|
protected Flow executeFromState(TestProcEnv env, TestSMProcedureState state) {
|
||||||
LOG.info("EXEC " + state + " " + this);
|
LOG.info("EXEC " + state + " " + this);
|
||||||
env.execCount.incrementAndGet();
|
env.execCount.incrementAndGet();
|
||||||
|
@ -168,25 +169,30 @@ public class TestStateMachineProcedure {
|
||||||
return Flow.HAS_MORE_STATE;
|
return Flow.HAS_MORE_STATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected void rollbackState(TestProcEnv env, TestSMProcedureState state) {
|
protected void rollbackState(TestProcEnv env, TestSMProcedureState state) {
|
||||||
LOG.info("ROLLBACK " + state + " " + this);
|
LOG.info("ROLLBACK " + state + " " + this);
|
||||||
env.rollbackCount.incrementAndGet();
|
env.rollbackCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected TestSMProcedureState getState(int stateId) {
|
protected TestSMProcedureState getState(int stateId) {
|
||||||
return TestSMProcedureState.values()[stateId];
|
return TestSMProcedureState.values()[stateId];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected int getStateId(TestSMProcedureState state) {
|
protected int getStateId(TestSMProcedureState state) {
|
||||||
return state.ordinal();
|
return state.ordinal();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
protected TestSMProcedureState getInitialState() {
|
protected TestSMProcedureState getInitialState() {
|
||||||
return TestSMProcedureState.STEP_1;
|
return TestSMProcedureState.STEP_1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SimpleChildProcedure extends NoopProcedure<TestProcEnv> {
|
public static class SimpleChildProcedure extends NoopProcedure<TestProcEnv> {
|
||||||
|
@Override
|
||||||
protected Procedure[] execute(TestProcEnv env) {
|
protected Procedure[] execute(TestProcEnv env) {
|
||||||
LOG.info("EXEC " + this);
|
LOG.info("EXEC " + this);
|
||||||
env.execCount.incrementAndGet();
|
env.execCount.incrementAndGet();
|
||||||
|
@ -203,7 +209,7 @@ public class TestStateMachineProcedure {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TestProcEnv {
|
public static class TestProcEnv {
|
||||||
AtomicInteger execCount = new AtomicInteger(0);
|
AtomicInteger execCount = new AtomicInteger(0);
|
||||||
AtomicInteger rollbackCount = new AtomicInteger(0);
|
AtomicInteger rollbackCount = new AtomicInteger(0);
|
||||||
boolean triggerChildRollback = false;
|
boolean triggerChildRollback = false;
|
||||||
|
|
|
@ -186,7 +186,7 @@ public class TestYieldProcedures {
|
||||||
extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
|
extends StateMachineProcedure<TestProcEnv, TestStateMachineProcedure.State> {
|
||||||
enum State { STATE_1, STATE_2, STATE_3 }
|
enum State { STATE_1, STATE_2, STATE_3 }
|
||||||
|
|
||||||
public class ExecutionInfo {
|
public static class ExecutionInfo {
|
||||||
private final boolean rollback;
|
private final boolean rollback;
|
||||||
private final long timestamp;
|
private final long timestamp;
|
||||||
private final State step;
|
private final State step;
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
||||||
private WALProcedureStore store;
|
private WALProcedureStore store;
|
||||||
static byte[] serializedState;
|
static byte[] serializedState;
|
||||||
|
|
||||||
private class LoadCounter implements ProcedureStore.ProcedureLoader {
|
private static class LoadCounter implements ProcedureStore.ProcedureLoader {
|
||||||
public LoadCounter() {}
|
public LoadCounter() {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -165,8 +165,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
|
||||||
private void writeWals() throws IOException {
|
private void writeWals() throws IOException {
|
||||||
List<Integer> procStates = shuffleProcWriteSequence();
|
List<Integer> procStates = shuffleProcWriteSequence();
|
||||||
TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used.
|
TestProcedure[] procs = new TestProcedure[numProcs + 1]; // 0 is not used.
|
||||||
int numProcsPerWal = numWals > 0 ? (int)Math.ceil(procStates.size() / numWals)
|
int numProcsPerWal = numWals > 0 ? procStates.size() / numWals : Integer.MAX_VALUE;
|
||||||
: Integer.MAX_VALUE;
|
|
||||||
long startTime = currentTimeMillis();
|
long startTime = currentTimeMillis();
|
||||||
long lastTime = startTime;
|
long lastTime = startTime;
|
||||||
for (int i = 0; i < procStates.size(); ++i) {
|
for (int i = 0; i < procStates.size(); ++i) {
|
||||||
|
|
|
@ -243,7 +243,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class NoSyncWalProcedureStore extends WALProcedureStore {
|
private static class NoSyncWalProcedureStore extends WALProcedureStore {
|
||||||
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
|
public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
|
||||||
super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
|
super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue