HBASE-20069 fix existing findbugs errors in hbase-server

parent a312705dbc
commit 44544c7db0
@@ -228,6 +228,7 @@ public class EncodedDataBlock {
    */
   public byte[] encodeData() {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    byte [] baosBytes = null;
     try {
       baos.write(HConstants.HFILEBLOCK_DUMMY_HEADER);
       DataOutputStream out = new DataOutputStream(baos);
@@ -255,25 +256,17 @@ public class EncodedDataBlock {
         kv.setSequenceId(memstoreTS);
         this.dataBlockEncoder.encode(kv, encodingCtx, out);
       }
-      BufferGrabbingByteArrayOutputStream stream = new BufferGrabbingByteArrayOutputStream();
-      baos.writeTo(stream);
-      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, stream.ourBytes);
+      // Below depends on BAOS internal behavior. toByteArray makes a copy of bytes so far.
+      baos.flush();
+      baosBytes = baos.toByteArray();
+      this.dataBlockEncoder.endBlockEncoding(encodingCtx, out, baosBytes);
     } catch (IOException e) {
       throw new RuntimeException(String.format(
           "Bug in encoding part of algorithm %s. " +
           "Probably it requested more bytes than are available.",
           toString()), e);
     }
-    return baos.toByteArray();
+    return baosBytes;
   }

-  private static class BufferGrabbingByteArrayOutputStream extends ByteArrayOutputStream {
-    private byte[] ourBytes;
-
-    @Override
-    public synchronized void write(byte[] b, int off, int len) {
-      this.ourBytes = b;
-    }
-  }
-
   @Override
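Note on the EncodedDataBlock change: the deleted BufferGrabbingByteArrayOutputStream captured a reference to whatever array ByteArrayOutputStream handed to write(), i.e. it depended on the stream's internal buffer handling. The replacement leans on the documented fact that toByteArray() returns a copy of the bytes written so far. A standalone sketch of that behavior (illustration only, not part of the commit):

    import java.io.ByteArrayOutputStream;

    public class ToByteArrayCopyDemo {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(new byte[] { 1, 2, 3 });
        byte[] snapshot = baos.toByteArray(); // copies the 3 bytes written so far
        baos.write(4);                        // later writes do not touch the snapshot
        System.out.println(snapshot.length);           // 3
        System.out.println(baos.toByteArray().length); // 4
      }
    }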
@@ -282,7 +282,7 @@ public class MultiByteBuff extends ByteBuff {
       return ByteBufferUtils.toShort(item, offsetInItem);
     }
     if (items.length - 1 == itemIndex) {
-      // means cur item is the last one and we wont be able to read a int. Throw exception
+      // means cur item is the last one and we wont be able to read a short. Throw exception
       throw new BufferUnderflowException();
     }
     ByteBuffer nextItem = items[itemIndex + 1];
@@ -294,7 +294,7 @@ public class MultiByteBuff extends ByteBuff {
     }
     for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) {
       l = (short) (l << 8);
-      l = (short) (l ^ (ByteBufferUtils.toByte(item, i) & 0xFF));
+      l = (short) (l ^ (ByteBufferUtils.toByte(nextItem, i) & 0xFF));
     }
     return l;
   }
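Note on the getShort fix: when a short straddles two sub-buffers, the remaining byte must be read from nextItem; the old line re-read item. With the single-byte buffers the new test uses, [0x01][0x00] now decodes to (1 << 8) ^ 0x00 = 256, where the old code produced (1 << 8) ^ 0x01 = 257. A self-contained sketch of the corrected assembly (hypothetical helper mirroring the patched loop):

    public class ShortAssemblyDemo {
      // Big-endian short whose first byte sits in one buffer and whose
      // second byte sits in the next one, assembled as the patched loop does.
      static short assemble(byte inItem, byte inNextItem) {
        short l = (short) (inItem & 0xFF);     // byte still available in cur item
        l = (short) (l << 8);
        l = (short) (l ^ (inNextItem & 0xFF)); // fixed: low byte comes from nextItem
        return l;
      }

      public static void main(String[] args) {
        System.out.println(assemble((byte) 1, (byte) 0)); // 256
        System.out.println(assemble((byte) 0, (byte) 2)); // 2
        System.out.println(assemble((byte) 2, (byte) 3)); // 515
      }
    }

These values match the assertions in the new TestMultiByteBuff#testGetShort below.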
@@ -43,6 +43,25 @@ public class TestMultiByteBuff {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMultiByteBuff.class);

+  /**
+   * Test right answer though we span many sub-buffers.
+   */
+  @Test
+  public void testGetShort() {
+    ByteBuffer bb1 = ByteBuffer.allocate(1);
+    bb1.put((byte)1);
+    ByteBuffer bb2 = ByteBuffer.allocate(1);
+    bb2.put((byte)0);
+    ByteBuffer bb3 = ByteBuffer.allocate(1);
+    bb3.put((byte)2);
+    ByteBuffer bb4 = ByteBuffer.allocate(1);
+    bb4.put((byte)3);
+    MultiByteBuff mbb = new MultiByteBuff(bb1, bb2, bb3, bb4);
+    assertEquals(256, mbb.getShortAfterPosition(0));
+    assertEquals(2, mbb.getShortAfterPosition(1));
+    assertEquals(515, mbb.getShortAfterPosition(2));
+  }
+
   @Test
   public void testWritesAndReads() {
     // Absolute reads
@@ -206,7 +206,6 @@ public class ProcedureExecutor<TEnvironment> {

     final long now = EnvironmentEdgeManager.currentTime();
     final Iterator<Map.Entry<Long, CompletedProcedureRetainer>> it = completed.entrySet().iterator();
-    final boolean debugEnabled = LOG.isDebugEnabled();
     while (it.hasNext() && store.isRunning()) {
       final Map.Entry<Long, CompletedProcedureRetainer> entry = it.next();
       final CompletedProcedureRetainer retainer = entry.getValue();
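Note: the deleted debugEnabled local is the dead-store pattern FindBugs flags once nothing reads the flag; SLF4J's {} placeholders already defer message formatting until DEBUG is enabled. Minimal illustration (not HBase code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingGuardDemo {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingGuardDemo.class);

      public static void main(String[] args) {
        // No guard needed: the message is only formatted when DEBUG is on.
        LOG.debug("completed {}", "proc-42");
        // A guard still pays off when computing the argument is itself costly.
        if (LOG.isDebugEnabled()) {
          LOG.debug("state={}", expensiveDump());
        }
      }

      private static String expensiveDump() {
        return "expensive-to-build string";
      }
    }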
@@ -209,7 +209,6 @@ public abstract class StateMachineProcedure<TEnvironment, TState>

   @Override
   protected boolean abort(final TEnvironment env) {
-    final TState state = getCurrentState();
     LOG.debug("Abort requested for {}", this);
     if (hasMoreState()) {
       aborted.set(true);
@@ -114,6 +114,9 @@ public abstract class RpcServer implements RpcServerInterface,
       + Server.class.getName());
   protected SecretManager<TokenIdentifier> secretManager;
   protected final Map<String, String> saslProps;
+
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+      justification="Start is synchronized so authManager creation is single-threaded")
   protected ServiceAuthorizationManager authManager;

   /** This is set to Call object before Handler invokes an RPC and ybdie
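Note: IS2_INCONSISTENT_SYNC fires when a field is written under a lock but also accessed without it. The justification above asserts the mix is safe here because the synchronized start() makes authManager creation single-threaded. A hypothetical minimal reproduction of the pattern (not HBase code):

    public class InconsistentSyncDemo {
      private Object manager; // written under lock, read without one

      public synchronized void start() {
        manager = new Object(); // single-threaded by the synchronized method
      }

      public boolean isStarted() {
        return manager != null; // unsynchronized read -> IS2_INCONSISTENT_SYNC
      }
    }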
@@ -171,7 +171,6 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.EncryptionTest;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdLock;
@@ -1207,8 +1206,6 @@ public class HMaster extends HRegionServer implements MasterServices {

   private void startProcedureExecutor() throws IOException {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
-    final Path rootDir = FSUtils.getRootDir(conf);
-
     procedureStore = new WALProcedureStore(conf,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
@@ -1298,7 +1298,6 @@ public class AssignmentManager implements ServerListener {
     final Set<ServerName> offlineServersWithOnlineRegions = new HashSet<>();
     int size = regionStates.getRegionStateNodes().size();
     final List<RegionInfo> offlineRegionsToAssign = new ArrayList<>(size);
-    long startTime = System.currentTimeMillis();
     // If deadservers then its a failover, else, we are not sure yet.
     boolean failover = deadServers;
     for (RegionStateNode regionNode: regionStates.getRegionStateNodes()) {
@@ -432,6 +432,10 @@ public class SplitTableRegionProcedure
     }

     RegionInfo parentHRI = node.getRegionInfo();
+    if (parentHRI == null) {
+      LOG.info("Unsplittable; parent region is null; node={}", node);
+      return false;
+    }
     // Lookup the parent HRI state from the AM, which has the latest updated info.
     // Protect against the case where concurrent SPLIT requests came in and succeeded
     // just before us.
@@ -457,8 +461,7 @@ public class SplitTableRegionProcedure
     // we are always able to split the region
     if (!env.getMasterServices().isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
       LOG.warn("pid=" + getProcId() + " split switch is off! skip split of " + parentHRI);
-      setFailure(new IOException("Split region " +
-          (parentHRI == null? "null": parentHRI.getRegionNameAsString()) +
+      setFailure(new IOException("Split region " + parentHRI.getRegionNameAsString() +
           " failed due to split switch off"));
       return false;
     }
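Note: the two SplitTableRegionProcedure hunks work together. Guarding parentHRI against null once, up front, lets every later use assume non-null, so the `parentHRI == null? "null": ...` ternary in the failure message can go. The shape of the change, distilled (hypothetical names, not HBase code):

    public class GuardOnceDemo {
      static boolean prepareSplit(Object parentRegion) {
        if (parentRegion == null) {
          return false; // bail out early; everything below may dereference safely
        }
        return parentRegion.toString() != null; // no per-site null ternary needed
      }

      public static void main(String[] args) {
        System.out.println(prepareSplit(null));     // false
        System.out.println(prepareSplit("region")); // true
      }
    }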
@@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory;
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
+    justification="TODO: Fix. It is wonky have static pool initialized from instance")
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     implements ConfigurationObserver {

@@ -67,8 +69,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu

   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
-  private static volatile ForkJoinPool chorePool;
-  private static volatile int chorePoolSize;
+  private static volatile ForkJoinPool CHOREPOOL;
+  private static volatile int CHOREPOOLSIZE;

   protected final FileSystem fs;
   private final Path oldFileDir;
@@ -102,15 +104,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     this.params = params;
     initCleanerChain(confKey);

-    if (chorePool == null) {
+    if (CHOREPOOL == null) {
       String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
-      chorePoolSize = calculatePoolSize(poolSize);
+      CHOREPOOLSIZE = calculatePoolSize(poolSize);
       // poolSize may be 0 or 0.0 from a careless configuration,
       // double check to make sure.
-      chorePoolSize = chorePoolSize == 0 ?
-        calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : chorePoolSize;
-      this.chorePool = new ForkJoinPool(chorePoolSize);
-      LOG.info("Cleaner pool size is {}", chorePoolSize);
+      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
+      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
+      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
     }
   }

@@ -119,11 +120,11 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
    * @param poolSize size from configuration
    * @return size of pool after calculation
    */
-  int calculatePoolSize(String poolSize) {
+  static int calculatePoolSize(String poolSize) {
     if (poolSize.matches("[1-9][0-9]*")) {
       // If poolSize is an integer, return it directly,
       // but upmost to the number of available processors.
-      int size = Math.min(Integer.valueOf(poolSize), AVAIL_PROCESSORS);
+      int size = Math.min(Integer.parseInt(poolSize), AVAIL_PROCESSORS);
       if (size == AVAIL_PROCESSORS) {
         LOG.warn("Use full core processors to scan dir, size={}", size);
       }
@@ -173,12 +174,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   @Override
   public void onConfigurationChange(Configuration conf) {
     int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
-    if (updatedSize == chorePoolSize) {
+    if (updatedSize == CHOREPOOLSIZE) {
       LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
       return;
     }
-    chorePoolSize = updatedSize;
-    if (chorePool.getPoolSize() == 0) {
+    CHOREPOOLSIZE = updatedSize;
+    if (CHOREPOOL.getPoolSize() == 0) {
       // Chore does not work now, update it directly.
       updateChorePoolSize(updatedSize);
       return;
@@ -188,9 +189,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   }

   private void updateChorePoolSize(int updatedSize) {
-    chorePool.shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", chorePool.getParallelism(), updatedSize);
-    chorePool = new ForkJoinPool(updatedSize);
+    CHOREPOOL.shutdownNow();
+    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
+    CHOREPOOL = new ForkJoinPool(updatedSize);
   }

   /**
@@ -226,7 +227,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       }
       // After each clean chore, checks if receives reconfigure notification while cleaning
       if (reconfig.compareAndSet(true, false)) {
-        updateChorePoolSize(chorePoolSize);
+        updateChorePoolSize(CHOREPOOLSIZE);
       }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
@@ -240,7 +241,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public Boolean runCleaner() {
     preRunCleaner();
     CleanerTask task = new CleanerTask(this.oldFileDir, true);
-    chorePool.submit(task);
+    CHOREPOOL.submit(task);
     return task.join();
   }

@@ -372,7 +373,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu

   @VisibleForTesting
   int getChorePoolSize() {
-    return chorePoolSize;
+    return CHOREPOOLSIZE;
   }

   /**
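Note on the CleanerChore hunks: the shared statics get constant-style names (CHOREPOOL, CHOREPOOLSIZE), the remaining write-static-from-instance wart is suppressed with a TODO, calculatePoolSize becomes static since it touches no instance state, and Integer.valueOf is replaced by Integer.parseInt, which returns a primitive and avoids a pointless box/unbox. A sketch of the integer branch shown above (the real method also accepts other configured formats, which this sketch ignores):

    public class PoolSizeSketch {
      static int calculatePoolSize(String poolSize, int availProcessors) {
        if (poolSize.matches("[1-9][0-9]*")) {
          // parseInt yields a primitive int; valueOf would create an Integer
          // that Math.min immediately unboxes again.
          return Math.min(Integer.parseInt(poolSize), availProcessors);
        }
        return 1; // assumed fallback for non-integer input
      }

      public static void main(String[] args) {
        System.out.println(calculatePoolSize("4", 8));  // 4
        System.out.println(calculatePoolSize("99", 8)); // capped at 8
      }
    }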
@@ -8111,13 +8111,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   @Override
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
-    justification="Intentional")
   public void startRegionOperation(Operation op) throws IOException {
     switch (op) {
       case GET: // read operations
       case SCAN:
         checkReadsEnabled();
+        break;
       default:
         break;
     }
@@ -89,6 +89,40 @@ class MemStoreFlusher implements FlushRequester {

   private FlushType flushType;

+  /**
+   * Singleton instance of this class inserted into flush queue.
+   */
+  private static final WakeupFlushThread WAKEUPFLUSH_INSTANCE = new WakeupFlushThread();
+
+  /**
+   * Marker class used as a token inserted into flush queue that ensures the flusher does not sleep.
+   * Create a single instance only.
+   */
+  private static final class WakeupFlushThread implements FlushQueueEntry {
+    private WakeupFlushThread() {}
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return 0;
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      return -1;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj == this;
+    }
+
+    @Override
+    public int hashCode() {
+      return 42;
+    }
+  }
+
   /**
    * @param conf
    * @param server
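Note: the wakeup token becomes one shared instance compared by identity (see the `fqe == WAKEUPFLUSH_INSTANCE` hunk further down) instead of a fresh allocation tested with instanceof; identity equals() and a constant hashCode() make every reference to the token interchangeable in the queue. The pattern, distilled (hypothetical, not HBase code):

    public class MarkerSingletonDemo {
      static final class WakeupToken {
        static final WakeupToken INSTANCE = new WakeupToken();
        private WakeupToken() {}

        @Override
        public boolean equals(Object obj) {
          return obj == this; // only the singleton equals itself
        }

        @Override
        public int hashCode() {
          return 42; // any constant works: all references are the same object
        }
      }

      public static void main(String[] args) {
        Object entry = WakeupToken.INSTANCE;
        System.out.println(entry == WakeupToken.INSTANCE); // true; no allocation per wakeup
      }
    }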
@@ -147,17 +181,18 @@ class MemStoreFlusher implements FlushRequester {

     boolean flushedOne = false;
     while (!flushedOne) {
-      // Find the biggest region that doesn't have too many storefiles
-      // (might be null!)
-      HRegion bestFlushableRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
+      // Find the biggest region that doesn't have too many storefiles (might be null!)
+      HRegion bestFlushableRegion =
+          getBiggestMemStoreRegion(regionsBySize, excludedRegions, true);
       // Find the biggest region, total, even if it might have too many flushes.
-      HRegion bestAnyRegion = getBiggestMemStoreRegion(
-          regionsBySize, excludedRegions, false);
+      HRegion bestAnyRegion = getBiggestMemStoreRegion(regionsBySize, excludedRegions, false);
       // Find the biggest region that is a secondary region
-      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize,
-          excludedRegions);
-
-      if (bestAnyRegion == null && bestRegionReplica == null) {
+      HRegion bestRegionReplica = getBiggestMemStoreOfRegionReplica(regionsBySize, excludedRegions);
+      if (bestAnyRegion == null) {
+        // If bestAnyRegion is null, assign replica. It may be null too. Next step is check for null
+        bestAnyRegion = bestRegionReplica;
+      }
+      if (bestAnyRegion == null) {
         LOG.error("Above memory mark but there are no flushable regions!");
         return false;
       }
@@ -169,19 +204,20 @@ class MemStoreFlusher implements FlushRequester {
         case ABOVE_OFFHEAP_HIGHER_MARK:
         case ABOVE_OFFHEAP_LOWER_MARK:
           bestAnyRegionSize = bestAnyRegion.getMemStoreOffHeapSize();
-          bestFlushableRegionSize = bestFlushableRegion.getMemStoreOffHeapSize();
+          bestFlushableRegionSize = getMemStoreOffHeapSize(bestFlushableRegion);
           break;

         case ABOVE_ONHEAP_HIGHER_MARK:
         case ABOVE_ONHEAP_LOWER_MARK:
           bestAnyRegionSize = bestAnyRegion.getMemStoreHeapSize();
-          bestFlushableRegionSize = bestFlushableRegion.getMemStoreHeapSize();
+          bestFlushableRegionSize = getMemStoreHeapSize(bestFlushableRegion);
           break;

         default:
           bestAnyRegionSize = bestAnyRegion.getMemStoreDataSize();
-          bestFlushableRegionSize = bestFlushableRegion.getMemStoreDataSize();
+          bestFlushableRegionSize = getMemStoreDataSize(bestFlushableRegion);
       }
-      if (bestFlushableRegion != null &&
-          bestAnyRegionSize > 2 * bestFlushableRegionSize) {
+      if (bestAnyRegionSize > 2 * bestFlushableRegionSize) {
         // Even if it's not supposed to be flushed, pick a region if it's more than twice
         // as big as the best flushable one - otherwise when we're under pressure we make
         // lots of little flushes and cause lots of compactions, etc, which just makes
@@ -211,21 +247,22 @@ class MemStoreFlusher implements FlushRequester {
         case ABOVE_OFFHEAP_HIGHER_MARK:
         case ABOVE_OFFHEAP_LOWER_MARK:
           regionToFlushSize = regionToFlush.getMemStoreOffHeapSize();
-          bestRegionReplicaSize = bestRegionReplica.getMemStoreOffHeapSize();
+          bestRegionReplicaSize = getMemStoreOffHeapSize(bestRegionReplica);
           break;

         case ABOVE_ONHEAP_HIGHER_MARK:
         case ABOVE_ONHEAP_LOWER_MARK:
           regionToFlushSize = regionToFlush.getMemStoreHeapSize();
-          bestRegionReplicaSize = bestRegionReplica.getMemStoreHeapSize();
+          bestRegionReplicaSize = getMemStoreHeapSize(bestRegionReplica);
           break;

         default:
           regionToFlushSize = regionToFlush.getMemStoreDataSize();
-          bestRegionReplicaSize = bestRegionReplica.getMemStoreDataSize();
+          bestRegionReplicaSize = getMemStoreDataSize(bestRegionReplica);
       }

       Preconditions.checkState(
-        (regionToFlush != null && regionToFlushSize > 0) ||
-        (bestRegionReplica != null && bestRegionReplicaSize > 0));
+        (regionToFlush != null && regionToFlushSize > 0) || bestRegionReplicaSize > 0);

       if (regionToFlush == null ||
           (bestRegionReplica != null &&
@@ -266,6 +303,27 @@ class MemStoreFlusher implements FlushRequester {
       return true;
     }

+  /**
+   * @return Return memstore offheap size or null if <code>r</code> is null
+   */
+  private static long getMemStoreOffHeapSize(HRegion r) {
+    return r == null? 0: r.getMemStoreOffHeapSize();
+  }
+
+  /**
+   * @return Return memstore heap size or null if <code>r</code> is null
+   */
+  private static long getMemStoreHeapSize(HRegion r) {
+    return r == null? 0: r.getMemStoreHeapSize();
+  }
+
+  /**
+   * @return Return memstore data size or null if <code>r</code> is null
+   */
+  private static long getMemStoreDataSize(HRegion r) {
+    return r == null? 0: r.getMemStoreDataSize();
+  }
+
   private class FlushHandler extends HasThread {

     private FlushHandler(String name) {
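Note: these helpers centralize the null guard so the switches above stop needing `bestFlushableRegion != null && ...` conjunctions; a null region simply contributes size 0. (Their javadocs say "null" where the methods actually return 0.) Usage sketch with a stand-in Region type (not HBase code):

    public class SizeHelperDemo {
      static class Region {
        long getMemStoreHeapSize() { return 1024; }
      }

      static long getMemStoreHeapSize(Region r) {
        return r == null ? 0 : r.getMemStoreHeapSize();
      }

      public static void main(String[] args) {
        System.out.println(getMemStoreHeapSize(null));         // 0, no NPE
        System.out.println(getMemStoreHeapSize(new Region())); // 1024
      }
    }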
@@ -279,7 +337,7 @@ class MemStoreFlusher implements FlushRequester {
         try {
           wakeupPending.set(false); // allow someone to wake us up again
           fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-          if (fqe == null || fqe instanceof WakeupFlushThread) {
+          if (fqe == null || fqe == WAKEUPFLUSH_INSTANCE) {
             FlushType type = isAboveLowWaterMark();
             if (type != FlushType.NORMAL) {
               LOG.debug("Flush thread woke up because memory above low water="
@@ -332,7 +390,7 @@ class MemStoreFlusher implements FlushRequester {

   private void wakeupFlushThread() {
     if (wakeupPending.compareAndSet(false, true)) {
-      flushQueue.add(new WakeupFlushThread());
+      flushQueue.add(WAKEUPFLUSH_INSTANCE);
     }
   }

@@ -759,21 +817,6 @@ class MemStoreFlusher implements FlushRequester {
   interface FlushQueueEntry extends Delayed {
   }

-  /**
-   * Token to insert into the flush queue that ensures that the flusher does not sleep
-   */
-  static class WakeupFlushThread implements FlushQueueEntry {
-    @Override
-    public long getDelay(TimeUnit unit) {
-      return 0;
-    }
-
-    @Override
-    public int compareTo(Delayed o) {
-      return -1;
-    }
-  }
-
   /**
    * Datastructure used in the flush queue. Holds region and retry count.
    * Keeps tabs on how old this object is. Implements {@link Delayed}. On
@@ -1206,7 +1206,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       throw new IllegalArgumentException("Failed resolve of " + initialIsa);
     }
     priority = createPriority();
-    String hostname = initialIsa.getHostName();
     // Using Address means we don't get the IP too. Shorten it more even to just the host name
     // w/o the domain.
     String name = rs.getProcessName() + "/" +
@@ -1206,6 +1206,8 @@ public class RegionCoprocessorHost
    * @return true or false to return to client if default processing should be bypassed,
    * or null otherwise
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_BOOLEAN_RETURN_NULL",
+      justification="TODO: Fix")
   public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family,
       final byte[] qualifier, final CompareOperator op, final ByteArrayComparable comparator,
       final Delete delete) throws IOException {
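Note: NP_BOOLEAN_RETURN_NULL flags Boolean-typed methods that can return null, since a caller that auto-unboxes (`boolean b = f();`) throws NullPointerException. Here null is a deliberate third state, "no coprocessor bypassed default processing", so the commit suppresses the warning with a TODO instead of changing the contract. Illustration (not HBase code):

    public class BooleanNullDemo {
      static Boolean triState() {
        return null; // deliberate "no decision"; unboxing this would NPE
      }

      public static void main(String[] args) {
        Boolean result = triState();
        if (result == null) {
          System.out.println("no decision; fall through to default processing");
        } else {
          System.out.println("bypass=" + result);
        }
      }
    }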
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -651,11 +652,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   @Override
   protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
       throws IOException {
+    Preconditions.checkNotNull(nextWriter);
     waitForSafePoint();
     long oldFileLen = closeWriter();
     logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
     this.writer = nextWriter;
-    if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
+    if (nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
     this.fileLengthAtLastSync = nextWriter.getLength();
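Note: the method already dereferenced nextWriter unconditionally at the end (nextWriter.getLength()), so the old `nextWriter != null &&` test was a redundant null check. Preconditions.checkNotNull moves the failure to the method entry and documents the contract. Same idea with the JDK equivalent (sketch; the commit uses the hbase-thirdparty Guava relocation):

    import java.util.Objects;

    public class CheckNotNullDemo {
      static void replaceWriter(Object nextWriter) {
        Objects.requireNonNull(nextWriter, "nextWriter");
        // Safe to dereference from here on; "nextWriter != null &&" would be redundant.
        System.out.println(nextWriter.getClass().getSimpleName());
      }

      public static void main(String[] args) {
        replaceWriter("a-writer"); // prints String
      }
    }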
@@ -336,7 +336,12 @@ public class MajorCompactor {
           "ERROR: Unable to parse command-line arguments " + Arrays.toString(args) + " due to: "
               + parseException);
       printUsage(options);
+      return;
+    }
+    if (commandLine == null) {
+      System.out.println("ERROR: Failed parse, empty commandLine; " + Arrays.toString(args));
+      printUsage(options);
       return;
     }
     String tableName = commandLine.getOptionValue("table");
     String cf = commandLine.getOptionValue("cf", null);
@@ -353,7 +358,7 @@ public class MajorCompactor {
     String quorum =
         commandLine.getOptionValue("zk", configuration.get(HConstants.ZOOKEEPER_QUORUM));
     String rootDir = commandLine.getOptionValue("rootDir", configuration.get(HConstants.HBASE_DIR));
-    long sleep = Long.valueOf(commandLine.getOptionValue("sleep", Long.toString(30000)));
+    long sleep = Long.parseLong(commandLine.getOptionValue("sleep", Long.toString(30000)));

     configuration.set(HConstants.HBASE_DIR, rootDir);
     configuration.set(HConstants.ZOOKEEPER_QUORUM, quorum);
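Note: same parsing fix as in CleanerChore. Long.parseLong returns a primitive long, while Long.valueOf builds a boxed Long that is immediately unboxed into `sleep`. Quick demonstration:

    public class ParseLongDemo {
      public static void main(String[] args) {
        String v = "30000";
        long viaParse = Long.parseLong(v);   // primitive, no allocation
        long viaValueOf = Long.valueOf(v);   // boxes to Long, then unboxes
        System.out.println(viaParse == viaValueOf); // true; only the boxing differs
      }
    }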