HBASE-20069 fix existing findbugs errors in hbase-server

This commit is contained in:
Michael Stack 2018-02-24 13:01:02 -08:00
parent 73028d5bd9
commit b11e506664
16 changed files with 147 additions and 82 deletions

View File

@ -228,6 +228,7 @@ public class EncodedDataBlock {
*/
public byte[] encodeData() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte [] baosBytes = null;
try {
baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
DataOutputStream out = new DataOutputStream(baos);
@ -255,25 +256,17 @@ public class EncodedDataBlock {
kv.setSequenceId(memstoreTS);
this.dataBlockEncoder.encode(kv, encodingCtx, out);
}
BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
baos.writeTo(stream);
this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.ourBytes);
// Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
baos.flush();
baosBytes = baos.toByteArray();
this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Bug in encoding part of algorithm %s. " +
"Probably it requested more bytes than are available.",
toString()), e);
}
return baos.toByteArray();
}
private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
private byte[] ourBytes;
@Override
public synchronized void write(byte[] b, int off, int len) {
this.ourBytes = b;
}
return baosBytes;
}
@Override

View File

@ -282,7 +282,7 @@ public class MultiByteBuff extends ByteBuff {
return ByteBufferUtils.toShort(item, offsetInItem);
}
if (items.length - 1 == itemIndex) {
// means cur item is the last one and we wont be able to read a int. Throw exception
// means cur item is the last one and we wont be able to read a short. Throw exception
throw new BufferUnderflowException();
}
ByteBuffer nextItem = items[itemIndex + 1];
@ -294,7 +294,7 @@ public class MultiByteBuff extends ByteBuff {
}
for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
l = (short) (l << 8);
l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
}
return l;
}

View File

@ -43,6 +43,25 @@ public class TestMultiByteBuff {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMultiByteBuff.class);
/**
* Test right answer though we span many sub-buffers.
*/
@Test
public void testGetShort() {
ByteBuffer bb1 = ByteBuffer.allocate(1);
bb1.put((byte)1);
ByteBuffer bb2 = ByteBuffer.allocate(1);
bb2.put((byte)0);
ByteBuffer bb3 = ByteBuffer.allocate(1);
bb3.put((byte)2);
ByteBuffer bb4 = ByteBuffer.allocate(1);
bb4.put((byte)3);
MultiByteBuff mbb = new MultiByteBuff(bb1, bb2, bb3, bb4);
assertEquals(256, mbb.getShortAfterPosition(0));
assertEquals(2, mbb.getShortAfterPosition(1));
assertEquals(515, mbb.getShortAfterPosition(2));
}
@Test
public void testWritesAndReads() {
// Absolute reads

View File

@ -206,7 +206,6 @@ public class ProcedureExecutor<TEnvironment> {
final long now = EnvironmentEdgeManager.currentTime();
final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
final boolean debugEnabled = LOG.isDebugEnabled();
while (it.hasNext() && store.isRunning()) {
final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
final CompletedProcedureRetainer retainer = entry.getValue();

View File

@ -209,7 +209,6 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
@Override
protected boolean abort(final TEnvironment env) {
final TState state = getCurrentState();
LOG.debug("Abort requested for {}", this);
if (hasMoreState()) {
aborted.set(true);

View File

@ -114,6 +114,9 @@ public abstract class RpcServer implements RpcServerInterface,
+ Server.class.getName());
protected SecretManager<TokenIdentifier> secretManager;
protected final Map<String, String> saslProps;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
justification="Start is synchronized so authManager creation is single-threaded")
protected ServiceAuthorizationManager authManager;
/** This is set to Call object before Handler invokes an RPC and ybdie

View File

@ -1200,7 +1200,6 @@ public class HMaster extends HRegionServer implements MasterServices {
private void startProcedureExecutor() throws IOException {
final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
procedureStore = new WALProcedureStore(conf,
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));

View File

@ -1298,7 +1298,6 @@ public class AssignmentManager implements ServerListener {
final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<>();
int size = regionStates.getRegionStateNodes().size();
final List<RegionInfo> offlineRegionsToAssign = new ArrayList<>(size);
long startTime = System.currentTimeMillis();
// If deadservers then its a failover, else, we are not sure yet.
boolean failover = deadServers;
for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {

View File

@ -432,6 +432,10 @@ public class SplitTableRegionProcedure
}
RegionInfo parentHRI = node.getRegionInfo();
if (parentHRI == null) {
LOG.info("Unsplittable; parent region is null; node={}", node);
return false;
}
// Lookup the parent HRI state from the AM, which has the latest updated info.
// Protect against the case where concurrent SPLIT requests came in and succeeded
// just before us.
@ -457,8 +461,7 @@ public class SplitTableRegionProcedure
// we are always able to split the region
if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
LOG.warn("pid=" + getProcId() + " split switch is off! skip split of " + parentHRI);
setFailure(new IOException("Split region " +
(parentHRI == null? "null": parentHRI.getRegionNameAsString()) +
setFailure(new IOException("Split region " + parentHRI.getRegionNameAsString() +
" failed due to split switch off"));
return false;
}

View File

@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
justification="TODO: Fix. It is wonky have static pool initialized from instance")
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
implements ConfigurationObserver {
@ -67,8 +69,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
// It may be waste resources for each cleaner chore own its pool,
// so let's make pool for all cleaner chores.
private static volatile ForkJoinPool chorePool;
private static volatile int chorePoolSize;
private static volatile ForkJoinPool CHOREPOOL;
private static volatile int CHOREPOOLSIZE;
protected final FileSystem fs;
private final Path oldFileDir;
@ -102,15 +104,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
this.params = params;
initCleanerChain(confKey);
if (chorePool == null) {
if (CHOREPOOL == null) {
String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
chorePoolSize = calculatePoolSize(poolSize);
CHOREPOOLSIZE = calculatePoolSize(poolSize);
// poolSize may be 0 or 0.0 from a careless configuration,
// double check to make sure.
chorePoolSize = chorePoolSize == 0 ?
calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : chorePoolSize;
this.chorePool = new ForkJoinPool(chorePoolSize);
LOG.info("Cleaner pool size is {}", chorePoolSize);
CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
}
}
@ -119,11 +120,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* @param poolSize size from configuration
* @return size of pool after calculation
*/
int calculatePoolSize(String poolSize) {
static int calculatePoolSize(String poolSize) {
if (poolSize.matches("[1-9][0-9]*")) {
// If poolSize is an integer, return it directly,
// but upmost to the number of available processors.
int size = Math.min(Integer.valueOf(poolSize), AVAIL_PROCESSORS);
int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
if (size == AVAIL_PROCESSORS) {
LOG.warn("Use full core processors to scan dir, size={}", size);
}
@ -173,12 +174,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
@Override
public void onConfigurationChange(Configuration conf) {
int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
if (updatedSize == chorePoolSize) {
if (updatedSize == CHOREPOOLSIZE) {
LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
return;
}
chorePoolSize = updatedSize;
if (chorePool.getPoolSize() == 0) {
CHOREPOOLSIZE = updatedSize;
if (CHOREPOOL.getPoolSize() == 0) {
// Chore does not work now, update it directly.
updateChorePoolSize(updatedSize);
return;
@ -188,9 +189,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
private void updateChorePoolSize(int updatedSize) {
chorePool.shutdownNow();
LOG.info("Update chore's pool size from {} to {}", chorePool.getParallelism(), updatedSize);
chorePool = new ForkJoinPool(updatedSize);
CHOREPOOL.shutdownNow();
LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
CHOREPOOL = new ForkJoinPool(updatedSize);
}
/**
@ -226,7 +227,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
// After each clean chore, checks if receives reconfigure notification while cleaning
if (reconfig.compareAndSet(true, false)) {
updateChorePoolSize(chorePoolSize);
updateChorePoolSize(CHOREPOOLSIZE);
}
} else {
LOG.debug("Cleaner chore disabled! Not cleaning.");
@ -240,7 +241,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
public Boolean runCleaner() {
preRunCleaner();
CleanerTask task = new CleanerTask(this.oldFileDir, true);
chorePool.submit(task);
CHOREPOOL.submit(task);
return task.join();
}
@ -372,7 +373,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
@VisibleForTesting
int getChorePoolSize() {
return chorePoolSize;
return CHOREPOOLSIZE;
}
/**

View File

@ -8111,13 +8111,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
justification="Intentional")
public void startRegionOperation(Operation op) throws IOException {
switch (op) {
case GET: // read operations
case SCAN:
checkReadsEnabled();
break;
default:
break;
}

View File

@ -89,6 +89,40 @@ class MemStoreFlusher implements FlushRequester {
private FlushType flushType;
/**
* Singleton instance of this class inserted into flush queue.
*/
private static final WakeupFlushThread WAKEUPFLUSH_INSTANCE = new WakeupFlushThread();
/**
* Marker class used as a token inserted into flush queue that ensures the flusher does not sleep.
* Create a single instance only.
*/
private static final class WakeupFlushThread implements FlushQueueEntry {
private WakeupFlushThread() {}
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return -1;
}
@Override
public boolean equals(Object obj) {
return obj == this;
}
@Override
public int hashCode() {
return 42;
}
}
/**
* @param conf
* @param server
@ -147,17 +181,18 @@ class MemStoreFlusher implements FlushRequester {
boolean flushedOne = false;
while (!flushedOne) {
// Find the biggest region that doesn't have too many storefiles
// (might be null!)
HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
// Find the biggest region that doesn't have too many storefiles (might be null!)
HRegion bestFlushableRegion =
getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
// Find the biggest region, total, even if it might have too many flushes.
HRegion bestAnyRegion = getBiggestMemStoreRegion(
regionsBySize, excludedRegions, false);
HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
// Find the biggest region that is a secondary region
HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize,
excludedRegions);
if (bestAnyRegion == null && bestRegionReplica == null) {
HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
if (bestAnyRegion == null) {
// If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
bestAnyRegion = bestRegionReplica;
}
if (bestAnyRegion == null) {
LOG.error("Above memory mark but there are no flushable regions!");
return false;
}
@ -169,19 +204,20 @@ class MemStoreFlusher implements FlushRequester {
case ABOVE_OFFHEAP_HIGHER_MARK:
case ABOVE_OFFHEAP_LOWER_MARK:
bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
bestFlushableRegionSize = bestFlushableRegion.getMemStoreOffHeapSize();
bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
break;
case ABOVE_ONHEAP_HIGHER_MARK:
case ABOVE_ONHEAP_LOWER_MARK:
bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
bestFlushableRegionSize = bestFlushableRegion.getMemStoreHeapSize();
bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
break;
default:
bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
bestFlushableRegionSize = bestFlushableRegion.getMemStoreDataSize();
bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
}
if (bestFlushableRegion != null &&
bestAnyRegionSize > 2 * bestFlushableRegionSize) {
if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
// Even if it's not supposed to be flushed, pick a region if it's more than twice
// as big as the best flushable one - otherwise when we're under pressure we make
// lots of little flushes and cause lots of compactions, etc, which just makes
@ -211,21 +247,22 @@ class MemStoreFlusher implements FlushRequester {
case ABOVE_OFFHEAP_HIGHER_MARK:
case ABOVE_OFFHEAP_LOWER_MARK:
regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
bestRegionReplicaSize = bestRegionReplica.getMemStoreOffHeapSize();
bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
break;
case ABOVE_ONHEAP_HIGHER_MARK:
case ABOVE_ONHEAP_LOWER_MARK:
regionToFlushSize = regionToFlush.getMemStoreHeapSize();
bestRegionReplicaSize = bestRegionReplica.getMemStoreHeapSize();
bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
break;
default:
regionToFlushSize = regionToFlush.getMemStoreDataSize();
bestRegionReplicaSize = bestRegionReplica.getMemStoreDataSize();
bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
}
Preconditions.checkState(
(regionToFlush != null && regionToFlushSize > 0) ||
(bestRegionReplica != null && bestRegionReplicaSize > 0));
(regionToFlush != null && regionToFlushSize > 0) || bestRegionReplicaSize > 0);
if (regionToFlush == null ||
(bestRegionReplica != null &&
@ -266,6 +303,27 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
/**
* @return Return memstore offheap size or null if <code>r</code> is null
*/
private static long getMemStoreOffHeapSize(HRegion r) {
return r == null? 0: r.getMemStoreOffHeapSize();
}
/**
* @return Return memstore heap size or null if <code>r</code> is null
*/
private static long getMemStoreHeapSize(HRegion r) {
return r == null? 0: r.getMemStoreHeapSize();
}
/**
* @return Return memstore data size or null if <code>r</code> is null
*/
private static long getMemStoreDataSize(HRegion r) {
return r == null? 0: r.getMemStoreDataSize();
}
private class FlushHandler extends HasThread {
private FlushHandler(String name) {
@ -279,7 +337,7 @@ class MemStoreFlusher implements FlushRequester {
try {
wakeupPending.set(false); // allow someone to wake us up again
fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (fqe == null || fqe instanceof WakeupFlushThread) {
if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
FlushType type = isAboveLowWaterMark();
if (type != FlushType.NORMAL) {
LOG.debug("Flush thread woke up because memory above low water="
@ -332,7 +390,7 @@ class MemStoreFlusher implements FlushRequester {
private void wakeupFlushThread() {
if (wakeupPending.compareAndSet(false, true)) {
flushQueue.add(new WakeupFlushThread());
flushQueue.add(WAKEUPFLUSH_INSTANCE);
}
}
@ -759,21 +817,6 @@ class MemStoreFlusher implements FlushRequester {
interface FlushQueueEntry extends Delayed {
}
/**
* Token to insert into the flush queue that ensures that the flusher does not sleep
*/
static class WakeupFlushThread implements FlushQueueEntry {
@Override
public long getDelay(TimeUnit unit) {
return 0;
}
@Override
public int compareTo(Delayed o) {
return -1;
}
}
/**
* Datastructure used in the flush queue. Holds region and retry count.
* Keeps tabs on how old this object is. Implements {@link Delayed}. On

View File

@ -1207,7 +1207,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
priority = createPriority();
String hostname = initialIsa.getHostName();
// Using Address means we don't get the IP too. Shorten it more even to just the host name
// w/o the domain.
String name = rs.getProcessName() + "/" +

View File

@ -1206,6 +1206,8 @@ public class RegionCoprocessorHost
* @return true or false to return to client if default processing should be bypassed,
* or null otherwise
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
justification="TODO: Fix")
public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
final Delete delete) throws IOException {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -651,11 +652,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@Override
protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
if (nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
}
this.fileLengthAtLastSync = nextWriter.getLength();

View File

@ -336,7 +336,12 @@ public class MajorCompactor {
"ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
+ parseException);
printUsage(options);
return;
}
if (commandLine == null) {
System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
printUsage(options);
return;
}
String tableName = commandLine.getOptionValue("table");
String cf = commandLine.getOptionValue("cf", null);
@ -353,7 +358,7 @@ public class MajorCompactor {
String quorum =
commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
long sleep = Long.valueOf(commandLine.getOptionValue("sleep", Long.toString(30000)));
long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));
configuration.set(HConstants.HBASE_DIR, rootDir);
configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);