HBASE-12189 Fix new issues found by coverity static analysis

This commit is contained in:
stack 2014-10-09 20:51:04 -07:00
parent 8915130dd7
commit 06a8bb5bd1
19 changed files with 119 additions and 111 deletions

View File

@ -110,7 +110,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* Does RPC against a cluster. Manages connections per regionserver in the cluster.
* <p>See HBaseServer
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@InterfaceAudience.Private
public class RpcClient {
public static final Log LOG = LogFactory.getLog(RpcClient.class);
@ -374,13 +373,13 @@ public class RpcClient {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@SuppressWarnings("SynchronizeOnNonFinalField")
protected class Connection extends Thread {
private ConnectionHeader header; // connection header
protected ConnectionId remoteId;
protected Socket socket = null; // connected socket
protected DataInputStream in;
protected DataOutputStream out; // Warning: can be locked inside a class level lock.
protected DataOutputStream out;
private Object outLock = new Object();
private InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private AuthMethod authMethod; // authentication method
@ -972,7 +971,9 @@ public class RpcClient {
}
}
this.in = new DataInputStream(new BufferedInputStream(inStream));
synchronized (this.outLock) {
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
}
// Now write out the connection header
writeConnectionHeader();
@ -1021,7 +1022,7 @@ public class RpcClient {
* Write the connection header.
*/
private synchronized void writeConnectionHeader() throws IOException {
synchronized (this.out) {
synchronized (this.outLock) {
this.out.writeInt(this.header.getSerializedSize());
this.header.writeTo(this.out);
this.out.flush();
@ -1042,8 +1043,8 @@ public class RpcClient {
}
// close the streams and therefore the socket
synchronized(this.outLock) {
if (this.out != null) {
synchronized(this.out) {
IOUtils.closeStream(out);
this.out = null;
}
@ -1105,7 +1106,7 @@ public class RpcClient {
// know where we stand, we have to close the connection.
checkIsOpen();
IOException writeException = null;
synchronized (this.out) {
synchronized (this.outLock) {
if (Thread.interrupted()) throw new InterruptedIOException();
calls.put(call.id, call); // We put first as we don't want the connection to become idle.

View File

@ -110,8 +110,10 @@ public class Sleeper {
woke = (woke == -1)? System.currentTimeMillis(): woke;
waitTime = this.period - (woke - startTime);
}
synchronized(sleepLock) {
triggerWake = false;
}
}
/**
* @return the sleep period in milliseconds

View File

@ -2280,6 +2280,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
@Override
protected Object clone() throws CloneNotSupportedException {
super.clone();
return new KVComparator();
}

View File

@ -46,9 +46,6 @@ public class SpanReceiverHost {
}
public static SpanReceiverHost getInstance(Configuration conf) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;
}
synchronized (SingletonHolder.INSTANCE.lock) {
if (SingletonHolder.INSTANCE.host != null) {
return SingletonHolder.INSTANCE.host;

View File

@ -52,9 +52,8 @@ public class JmxCacheBuster {
public static void clearJmxCache() {
//If there are more then 100 ms before the executor will run then everything should be merged.
if (fut == null || (!fut.isDone() && fut.getDelay(TimeUnit.MILLISECONDS) > 100)) return;
synchronized (lock) {
if (fut == null || (!fut.isDone() && fut.getDelay(TimeUnit.MILLISECONDS) > 100)) return;
fut = executor.getExecutor().schedule(new JmxCacheBusterRunnable(), 5, TimeUnit.SECONDS);
}
}

View File

@ -464,9 +464,13 @@ public class LocalHBaseCluster {
LocalHBaseCluster cluster = new LocalHBaseCluster(conf);
cluster.startup();
Admin admin = new HBaseAdmin(conf);
try {
HTableDescriptor htd =
new HTableDescriptor(TableName.valueOf(cluster.getClass().getName()));
admin.createTable(htd);
} finally {
admin.close();
}
cluster.shutdown();
}
}

View File

@ -264,7 +264,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// might miss the watch-trigger that creation of RESCAN node provides.
// Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
// therefore this behavior is safe.
SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), this.recoveryMode);
SplitLogTask slt = new SplitLogTask.Done(this.details.getServerName(), getRecoveryMode());
this.watcher
.getRecoverableZooKeeper()
.getZooKeeper()
@ -424,7 +424,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
private void createNode(String path, Long retry_count) {
SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), this.recoveryMode);
SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
@ -757,12 +757,12 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
@Override
public boolean isReplaying() {
public synchronized boolean isReplaying() {
return this.recoveryMode == RecoveryMode.LOG_REPLAY;
}
@Override
public boolean isSplitting() {
public synchronized boolean isSplitting() {
return this.recoveryMode == RecoveryMode.LOG_SPLITTING;
}
@ -774,15 +774,19 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
*/
@Override
public void setRecoveryMode(boolean isForInitialization) throws IOException {
synchronized(this) {
if (this.isDrainingDone) {
// when there is no outstanding splitlogtask after master start up, we already have up to date
// recovery mode
return;
}
}
if (this.watcher == null) {
// when watcher is null(testing code) and recovery mode can only be LOG_SPLITTING
synchronized(this) {
this.isDrainingDone = true;
this.recoveryMode = RecoveryMode.LOG_SPLITTING;
}
return;
}
boolean hasSplitLogTask = false;
@ -877,7 +881,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
try {
// blocking zk call but this is done from the timeout thread
SplitLogTask slt =
new SplitLogTask.Unassigned(this.details.getServerName(), this.recoveryMode);
new SplitLogTask.Unassigned(this.details.getServerName(), getRecoveryMode());
if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
LOG.debug("failed to resubmit task " + path + " version changed");
return false;
@ -1105,7 +1109,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
}
@Override
public RecoveryMode getRecoveryMode() {
public synchronized RecoveryMode getRecoveryMode() {
return recoveryMode;
}

View File

@ -23,7 +23,6 @@ import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -80,14 +79,15 @@ public class TimeoutExceptionInjector {
* For all time forward, do not throw an error because the process has completed.
*/
public void complete() {
// warn if the timer is already marked complete. This isn't going to be thread-safe, but should
// be good enough and its not worth locking just for a warning.
synchronized (this.timerTask) {
if (this.complete) {
LOG.warn("Timer already marked completed, ignoring!");
return;
}
LOG.debug("Marking timer as complete - no error notifications will be received for this timer.");
synchronized (this.timerTask) {
if (LOG.isDebugEnabled()) {
LOG.debug("Marking timer as complete - no error notifications will be received for " +
"this timer.");
}
this.complete = true;
}
this.timer.cancel();

View File

@ -129,6 +129,7 @@ public class VerifyReplication extends Configured implements Tool {
ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
// TODO: THis HTable doesn't get closed. Fix!
Table replicatedTable = new HTable(peerConf, tableName);
scan.setStartRow(value.getRow());
scan.setStopRow(tableSplit.getEndRow());

View File

@ -194,7 +194,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
clusterMap.remove(masterServerName);
}
boolean emptyRegionServerPresent = false;
long startTime = System.currentTimeMillis();
// construct a Cluster object with clusterMap and rest of the
@ -256,10 +255,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
regionsToMove.add(new RegionPlan(hri, sal.getServerName(), null));
numTaken++;
if (numTaken >= numToOffload) break;
// fetch in alternate order if there is new region server
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
}
serverBalanceInfo.put(sal.getServerName(),
new BalanceInfo(numToOffload, (-1)*numTaken));
@ -303,9 +298,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
if (numToTake == 0) continue;
addRegionPlan(regionsToMove, fetchFromTail, si, regionsToReturn);
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
underloadedServers.put(si, numToTake-1);
cnt++;
@ -381,9 +373,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
addRegionPlan(regionsToMove, fetchFromTail,
server.getKey().getServerName(), regionsToReturn);
numTaken++;
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
}
}
@ -401,9 +390,6 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {
}
addRegionPlan(regionsToMove, fetchFromTail,
server.getKey().getServerName(), regionsToReturn);
if (emptyRegionServerPresent) {
fetchFromTail = !fetchFromTail;
}
if (regionsToMove.isEmpty()) {
break;
}

View File

@ -18,22 +18,11 @@
package org.apache.hadoop.hbase.quotas;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Sets;
/**
* In-Memory state of table or namespace quotas
*/
@ -53,7 +42,7 @@ public class QuotaState {
lastUpdate = updateTs;
}
public long getLastUpdate() {
public synchronized long getLastUpdate() {
return lastUpdate;
}
@ -64,7 +53,7 @@ public class QuotaState {
@Override
public synchronized String toString() {
StringBuilder builder = new StringBuilder();
builder.append("QuotaState(ts=" + lastUpdate);
builder.append("QuotaState(ts=" + getLastUpdate());
if (isBypass()) {
builder.append(" bypass");
} else {
@ -119,4 +108,12 @@ public class QuotaState {
lastQuery = EnvironmentEdgeManager.currentTime();
return globalLimiter;
}
/**
* Return the limiter associated with this quota without updating internal last query stats
* @return the quota limiter
*/
synchronized QuotaLimiter getGlobalLimiterWithoutUpdatingLastQuery() {
return globalLimiter;
}
}

View File

@ -23,25 +23,18 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.collect.Sets;
/**
* In-Memory state of the user quotas
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class UserQuotaState extends QuotaState {
private static final Log LOG = LogFactory.getLog(UserQuotaState.class);
private Map<String, QuotaLimiter> namespaceLimiters = null;
private Map<TableName, QuotaLimiter> tableLimiters = null;
private boolean bypassGlobals = false;
@ -57,13 +50,13 @@ public class UserQuotaState extends QuotaState {
@Override
public synchronized String toString() {
StringBuilder builder = new StringBuilder();
builder.append("UserQuotaState(ts=" + lastUpdate);
builder.append("UserQuotaState(ts=" + getLastUpdate());
if (bypassGlobals) builder.append(" bypass-globals");
if (isBypass()) {
builder.append(" bypass");
} else {
if (globalLimiter != NoopQuotaLimiter.get()) {
if (getGlobalLimiterWithoutUpdatingLastQuery() != NoopQuotaLimiter.get()) {
builder.append(" global-limiter");
}
@ -93,7 +86,7 @@ public class UserQuotaState extends QuotaState {
@Override
public synchronized boolean isBypass() {
return !bypassGlobals &&
globalLimiter == NoopQuotaLimiter.get() &&
getGlobalLimiterWithoutUpdatingLastQuery() == NoopQuotaLimiter.get() &&
(tableLimiters == null || tableLimiters.isEmpty()) &&
(namespaceLimiters == null || namespaceLimiters.isEmpty());
}
@ -204,6 +197,6 @@ public class UserQuotaState extends QuotaState {
QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString());
if (limiter != null) return limiter;
}
return globalLimiter;
return getGlobalLimiterWithoutUpdatingLastQuery();
}
}

View File

@ -733,6 +733,11 @@ public class DefaultMemStore implements MemStore {
}
}
/**
* Lock on 'this' must be held by caller.
* @param it
* @return Next Cell
*/
private Cell getNext(Iterator<Cell> it) {
Cell startCell = theNext;
Cell v = null;

View File

@ -50,6 +50,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called
* when MemStore clearing snapshot for flush
*/
@SuppressWarnings("javadoc")
@InterfaceAudience.Private
public class MemStoreChunkPool {
private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
@ -180,9 +181,9 @@ public class MemStoreChunkPool {
*/
static MemStoreChunkPool getPool(Configuration conf) {
if (globalInstance != null) return globalInstance;
if (chunkPoolDisabled) return null;
synchronized (MemStoreChunkPool.class) {
if (chunkPoolDisabled) return null;
if (globalInstance != null) return globalInstance;
float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
if (poolSizePercentage <= 0) {

View File

@ -35,6 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -72,7 +73,7 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
private static final Tag[] LABELS_TABLE_TAGS = new Tag[1];
private static final byte[] DUMMY_VALUE = new byte[0];
private volatile int ordinalCounter = -1;
private AtomicInteger ordinalCounter = new AtomicInteger(-1);
private Configuration conf;
private HRegion labelsRegion;
private VisibilityLabelsCache labelsCache;
@ -127,7 +128,7 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
ordinal = i;
}
}
this.ordinalCounter = ordinal + 1;
this.ordinalCounter.set(ordinal + 1);
if (labels.size() > 0) {
// If there is no data need not write to zk
byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(labels);
@ -239,13 +240,13 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
new LabelAlreadyExistsException("Label '" + labelStr + "' already exists"));
} else {
Put p = new Put(Bytes.toBytes(ordinalCounter));
Put p = new Put(Bytes.toBytes(ordinalCounter.get()));
p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, label, LABELS_TABLE_TAGS);
if (LOG.isDebugEnabled()) {
LOG.debug("Adding the label " + labelStr);
}
puts.add(p);
ordinalCounter++;
ordinalCounter.incrementAndGet();
}
i++;
}
@ -350,6 +351,7 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
s.setFilter(filter);
List<String> auths = new ArrayList<String>();
RegionScanner scanner = this.labelsRegion.getScanner(s);
try {
List<Cell> results = new ArrayList<Cell>(1);
while (true) {
scanner.next(results);
@ -362,6 +364,9 @@ public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService
}
results.clear();
}
} finally {
scanner.close();
}
return auths;
}

View File

@ -1076,10 +1076,11 @@ public abstract class FSUtils {
}
// compute percentage per table and store in result list
frags.put(FSUtils.getTableName(d).getNameAsString(),
Math.round((float) cfFrag / cfCount * 100));
cfCount == 0? 0: Math.round((float) cfFrag / cfCount * 100));
}
// set overall percentage for all tables
frags.put("-TOTAL-", Math.round((float) cfFragTotal / cfCountTotal * 100));
frags.put("-TOTAL-",
cfCountTotal == 0? 0: Math.round((float) cfFragTotal / cfCountTotal * 100));
return frags;
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch;
public class MultiHConnection {
private static final Log LOG = LogFactory.getLog(MultiHConnection.class);
private HConnection[] hConnections;
private final Object hConnectionsLock = new Object();
private int noOfConnections;
private ExecutorService batchPool;
@ -60,11 +61,13 @@ public class MultiHConnection {
public MultiHConnection(Configuration conf, int noOfConnections)
throws IOException {
this.noOfConnections = noOfConnections;
synchronized (this.hConnectionsLock) {
hConnections = new HConnection[noOfConnections];
for (int i = 0; i < noOfConnections; i++) {
HConnection conn = HConnectionManager.createConnection(conf);
hConnections[i] = conn;
}
}
createBatchPool(conf);
}
@ -72,8 +75,8 @@ public class MultiHConnection {
* Close the open connections and shutdown the batchpool
*/
public void close() {
synchronized (hConnectionsLock) {
if (hConnections != null) {
synchronized (hConnections) {
if (hConnections != null) {
for (Connection conn : hConnections) {
if (conn != null) {

View File

@ -379,10 +379,13 @@ public class RegionSplitter {
desc.addFamily(new HColumnDescriptor(Bytes.toBytes(cf)));
}
HBaseAdmin admin = new HBaseAdmin(conf);
try {
Preconditions.checkArgument(!admin.tableExists(tableName),
"Table already exists: " + tableName);
admin.createTable(desc, splitAlgo.split(splitCount));
} finally {
admin.close();
}
LOG.debug("Table created! Waiting for regions to show online in META...");
if (!conf.getBoolean("split.verify", true)) {
// NOTE: createTable is synchronous on the table, but not on the regions
@ -529,7 +532,11 @@ public class RegionSplitter {
byte[] split = dr.getSecond();
LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
try {
admin.split(table.getTableName(), split);
} finally {
admin.close();
}
LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
LinkedList<Pair<byte[], byte[]>> local_finished = Lists.newLinkedList();

View File

@ -644,11 +644,12 @@ public class TestSplitLogManager {
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
slm = new SplitLogManager(ds, testConf, stopper, master, DUMMY_MASTER);
LOG.info("Mode1=" + slm.getRecoveryMode());
assertTrue(slm.isLogSplitting());
zkw.getRecoverableZooKeeper().delete(ZKSplitLog.getEncodedNodeName(zkw, "testRecovery"), -1);
LOG.info("Mode2=" + slm.getRecoveryMode());
slm.setRecoveryMode(false);
assertTrue(slm.isLogReplaying());
LOG.info("Mode3=" + slm.getRecoveryMode());
assertTrue("Mode4=" + slm.getRecoveryMode(), slm.isLogReplaying());
}
}