HBASE-15118 Fix findbugs complaint in hbase-server
This commit is contained in:
parent
47fc696bc6
commit
9bf26f46d1
|
@ -60,7 +60,7 @@ public class JMXListener implements Coprocessor {
|
|||
* only 1 JMX instance is allowed, otherwise there is port conflict even if
|
||||
* we only load regionserver coprocessor on master
|
||||
*/
|
||||
private static JMXConnectorServer jmxCS = null;
|
||||
private static JMXConnectorServer JMX_CS = null;
|
||||
|
||||
public static JMXServiceURL buildJMXServiceURL(int rmiRegistryPort,
|
||||
int rmiConnectorPort) throws IOException {
|
||||
|
@ -137,8 +137,13 @@ public class JMXListener implements Coprocessor {
|
|||
|
||||
try {
|
||||
// Start the JMXListener with the connection string
|
||||
jmxCS = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, jmxEnv, mbs);
|
||||
jmxCS.start();
|
||||
synchronized(JMXListener.class) {
|
||||
if (JMX_CS != null) {
|
||||
throw new RuntimeException("Started by another thread?");
|
||||
}
|
||||
JMX_CS = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, jmxEnv, mbs);
|
||||
JMX_CS.start();
|
||||
}
|
||||
LOG.info("ConnectorServer started!");
|
||||
} catch (IOException e) {
|
||||
LOG.error("fail to start connector server!", e);
|
||||
|
@ -148,10 +153,10 @@ public class JMXListener implements Coprocessor {
|
|||
|
||||
public void stopConnectorServer() throws IOException {
|
||||
synchronized(JMXListener.class) {
|
||||
if (jmxCS != null) {
|
||||
jmxCS.stop();
|
||||
if (JMX_CS != null) {
|
||||
JMX_CS.stop();
|
||||
LOG.info("ConnectorServer stopped!");
|
||||
jmxCS = null;
|
||||
JMX_CS = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -186,7 +191,7 @@ public class JMXListener implements Coprocessor {
|
|||
}
|
||||
|
||||
synchronized(JMXListener.class) {
|
||||
if (jmxCS != null) {
|
||||
if (JMX_CS != null) {
|
||||
LOG.info("JMXListener has been started at Registry port " + rmiRegistryPort);
|
||||
}
|
||||
else {
|
||||
|
|
|
@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
* See https://issues.apache.org/jira/browse/HBASE-13448
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS")
|
||||
public class SizeCachedKeyValue extends KeyValue {
|
||||
|
||||
private static final int HEAP_SIZE_OVERHEAD = Bytes.SIZEOF_SHORT + Bytes.SIZEOF_INT;
|
||||
|
||||
private short rowLen;
|
||||
|
|
|
@ -175,7 +175,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
|
|||
return;
|
||||
}
|
||||
Task task = findOrCreateOrphanTask(path);
|
||||
if (task.isOrphan() && (task.incarnation == 0)) {
|
||||
if (task.isOrphan() && (task.incarnation.get() == 0)) {
|
||||
LOG.info("resubmitting unassigned orphan task " + path);
|
||||
// ignore failure to resubmit. The timeout-monitor will handle it later
|
||||
// albeit in a more crude fashion
|
||||
|
@ -228,7 +228,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
|
|||
version = -1;
|
||||
}
|
||||
LOG.info("resubmitting task " + path);
|
||||
task.incarnation++;
|
||||
task.incarnation.incrementAndGet();
|
||||
boolean result = resubmit(this.details.getServerName(), path, version);
|
||||
if (!result) {
|
||||
task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
|
||||
|
|
|
@ -78,7 +78,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
|
|||
private TaskExecutor splitTaskExecutor;
|
||||
|
||||
private final Object taskReadyLock = new Object();
|
||||
volatile int taskReadySeq = 0;
|
||||
private AtomicInteger taskReadySeq = new AtomicInteger(0);
|
||||
private volatile String currentTask = null;
|
||||
private int currentVersion;
|
||||
private volatile boolean shouldStop = false;
|
||||
|
@ -106,7 +106,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
|
|||
if (path.equals(watcher.splitLogZNode)) {
|
||||
if (LOG.isTraceEnabled()) LOG.trace("tasks arrived or departed on " + path);
|
||||
synchronized (taskReadyLock) {
|
||||
taskReadySeq++;
|
||||
this.taskReadySeq.incrementAndGet();
|
||||
taskReadyLock.notify();
|
||||
}
|
||||
}
|
||||
|
@ -400,14 +400,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
|
|||
* policy puts an upper-limit on the number of simultaneous log splitting that could be happening
|
||||
* in a cluster.
|
||||
* <p>
|
||||
* Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
|
||||
* has been put up
|
||||
* Synchronization using <code>taskReadyLock</code> ensures that it will try to grab every task
|
||||
* that has been put up
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Override
|
||||
public void taskLoop() throws InterruptedException {
|
||||
while (!shouldStop) {
|
||||
int seq_start = taskReadySeq;
|
||||
int seq_start = taskReadySeq.get();
|
||||
List<String> paths = null;
|
||||
paths = getTaskList();
|
||||
if (paths == null) {
|
||||
|
@ -441,7 +441,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
|
|||
}
|
||||
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
|
||||
synchronized (taskReadyLock) {
|
||||
while (seq_start == taskReadySeq) {
|
||||
while (seq_start == taskReadySeq.get()) {
|
||||
taskReadyLock.wait(checkInterval);
|
||||
if (server != null) {
|
||||
// check to see if we have stale recovering regions in our internal memory state
|
||||
|
@ -527,7 +527,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
|
|||
|
||||
@Override
|
||||
public int getTaskReadySeq() {
|
||||
return taskReadySeq;
|
||||
return taskReadySeq.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -149,6 +149,8 @@ public class JMXJsonServlet extends HttpServlet {
|
|||
* The servlet response we are creating
|
||||
*/
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="XSS_REQUEST_PARAMETER_TO_SERVLET_WRITER",
|
||||
justification="TODO: See HBASE-15122")
|
||||
public void doGet(HttpServletRequest request, HttpServletResponse response) {
|
||||
try {
|
||||
if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) {
|
||||
|
|
|
@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
* it fallbacks to the archived path.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
|
||||
justification="To be fixed but warning suppressed for now")
|
||||
public class HFileLink extends FileLink {
|
||||
private static final Log LOG = LogFactory.getLog(HFileLink.class);
|
||||
|
||||
|
|
|
@ -488,6 +488,8 @@ public class HFile {
|
|||
* @return an appropriate instance of HFileReader
|
||||
* @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
|
||||
justification="Intentional")
|
||||
private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
|
||||
long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
|
||||
FixedFileTrailer trailer = null;
|
||||
|
@ -498,7 +500,7 @@ public class HFile {
|
|||
switch (trailer.getMajorVersion()) {
|
||||
case 2:
|
||||
LOG.debug("Opening HFile v2 with v3 reader");
|
||||
// Fall through.
|
||||
// Fall through. FindBugs: SF_SWITCH_FALLTHROUGH
|
||||
case 3 :
|
||||
return new HFileReaderImpl(path, trailer, fsdis, size, cacheConf, hfs, conf);
|
||||
default:
|
||||
|
|
|
@ -109,8 +109,8 @@ public class HFileBlock implements Cacheable {
|
|||
new byte[HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM];
|
||||
|
||||
// How to get the estimate correctly? if it is a singleBB?
|
||||
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
|
||||
new MultiByteBuff(ByteBuffer.wrap(new byte[0], 0, 0)).getClass(), false);
|
||||
public static final int MULTI_BYTE_BUFFER_HEAP_SIZE =
|
||||
(int)ClassSize.estimateBase(MultiByteBuff.class, false);
|
||||
|
||||
// meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
|
||||
public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
|
|||
import java.security.Key;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -473,7 +474,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
private int currMemstoreTSLen;
|
||||
private long currMemstoreTS;
|
||||
// Updated but never read?
|
||||
protected volatile int blockFetches;
|
||||
protected AtomicInteger blockFetches = new AtomicInteger(0);
|
||||
protected final HFile.Reader reader;
|
||||
private int currTagsLen;
|
||||
// buffer backed keyonlyKV
|
||||
|
@ -877,6 +878,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
* @return the next block, or null if there are no more data blocks
|
||||
* @throws IOException
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
|
||||
justification="Yeah, unnecessary null check; could do w/ clean up")
|
||||
protected HFileBlock readNextDataBlock() throws IOException {
|
||||
long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
|
||||
if (curBlock == null)
|
||||
|
@ -885,8 +888,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
HFileBlock block = this.curBlock;
|
||||
|
||||
do {
|
||||
if (block.getOffset() >= lastDataBlockOffset)
|
||||
if (block.getOffset() >= lastDataBlockOffset) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (block.getOffset() < 0) {
|
||||
throw new IOException("Invalid block file offset: " + block);
|
||||
|
@ -898,7 +902,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
+ block.getOnDiskSizeWithHeader(),
|
||||
block.getNextBlockOnDiskSizeWithHeader(), cacheBlocks, pread,
|
||||
isCompaction, true, null, getEffectiveDataBlockEncoding());
|
||||
if (block != null && !block.getBlockType().isData()) {
|
||||
if (block != null && !block.getBlockType().isData()) { // Findbugs: NP_NULL_ON_SOME_PATH
|
||||
// Whatever block we read we will be returning it unless
|
||||
// it is a datablock. Just in case the blocks are non data blocks
|
||||
reader.returnBlock(block);
|
||||
|
@ -1228,7 +1232,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
updateCurrBlockRef(newBlock);
|
||||
blockBuffer = newBlock.getBufferWithoutHeader();
|
||||
readKeyValueLen();
|
||||
blockFetches++;
|
||||
blockFetches.incrementAndGet();
|
||||
|
||||
// Reset the next indexed key
|
||||
this.nextIndexedKey = null;
|
||||
|
@ -1667,7 +1671,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
|
|||
updateCurrBlockRef(newBlock);
|
||||
ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
|
||||
seeker.setCurrentBuffer(encodedBuffer);
|
||||
blockFetches++;
|
||||
blockFetches.incrementAndGet();
|
||||
|
||||
// Reset the next indexed key
|
||||
this.nextIndexedKey = null;
|
||||
|
|
|
@ -354,7 +354,7 @@ public class HFileWriterImpl implements HFile.Writer {
|
|||
// (table,startrow,hash) so can't be treated as plain byte arrays. Just skip
|
||||
// out without
|
||||
// trying to do this optimization.
|
||||
if (comparator != null && comparator instanceof MetaCellComparator) {
|
||||
if (comparator instanceof MetaCellComparator) {
|
||||
return right;
|
||||
}
|
||||
int diff = comparator.compareRows(left, right);
|
||||
|
|
|
@ -322,6 +322,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
private long responseBlockSize = 0;
|
||||
private boolean retryImmediatelySupported;
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
|
||||
justification="Can't figure why this complaint is happening... see below")
|
||||
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
|
||||
Message param, CellScanner cellScanner, Connection connection, Responder responder,
|
||||
long size, TraceInfo tinfo, final InetAddress remoteAddress) {
|
||||
|
@ -339,15 +341,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
this.isError = false;
|
||||
this.size = size;
|
||||
this.tinfo = tinfo;
|
||||
this.user = connection == null ? null : connection.user;
|
||||
this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
|
||||
this.remoteAddress = remoteAddress;
|
||||
this.retryImmediatelySupported = connection.retryImmediatelySupported;
|
||||
this.retryImmediatelySupported =
|
||||
connection == null? null: connection.retryImmediatelySupported;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call is done. Execution happened and we returned results to client. It is now safe to
|
||||
* cleanup.
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="Presume the lock on processing request held by caller is protection enough")
|
||||
void done() {
|
||||
if (this.cellBlock != null && reservoir != null) {
|
||||
// Return buffer to reservoir now we are done with it.
|
||||
|
@ -599,7 +604,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setCallBack(RpcCallback callback) {
|
||||
public synchronized void setCallBack(RpcCallback callback) {
|
||||
this.callback = callback;
|
||||
}
|
||||
|
||||
|
@ -779,6 +784,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="selector access is not synchronized; seems fine but concerned changing " +
|
||||
"it will have per impact")
|
||||
public void run() {
|
||||
LOG.info(getName() + ": starting");
|
||||
while (running) {
|
||||
|
@ -1280,15 +1288,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
private boolean useWrap = false;
|
||||
// Fake 'call' for failed authorization response
|
||||
private static final int AUTHORIZATION_FAILED_CALLID = -1;
|
||||
private final Call authFailedCall =
|
||||
new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
|
||||
null);
|
||||
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
|
||||
null, null, this, null, 0, null, null);
|
||||
private ByteArrayOutputStream authFailedResponse =
|
||||
new ByteArrayOutputStream();
|
||||
// Fake 'call' for SASL context setup
|
||||
private static final int SASL_CALLID = -33;
|
||||
private final Call saslCall =
|
||||
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
|
||||
private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
|
||||
0, null, null);
|
||||
|
||||
// was authentication allowed with a fallback to simple auth
|
||||
private boolean authenticatedWithFallback;
|
||||
|
@ -2177,7 +2184,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void refreshAuthManager(PolicyProvider pp) {
|
||||
public synchronized void refreshAuthManager(PolicyProvider pp) {
|
||||
// Ignore warnings that this should be accessed in a static way instead of via an instance;
|
||||
// it'll break if you go via static route.
|
||||
this.authManager.refresh(this.conf, pp);
|
||||
|
@ -2403,7 +2410,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
|||
* @throws org.apache.hadoop.security.authorize.AuthorizationException
|
||||
* when the client isn't authorized to talk the protocol
|
||||
*/
|
||||
public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
|
||||
public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
|
||||
InetAddress addr)
|
||||
throws AuthorizationException {
|
||||
if (authorize) {
|
||||
Class<?> c = getServiceInterface(services, connection.getServiceName());
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -135,17 +134,18 @@ public class HashTable extends Configured implements Tool {
|
|||
p.setProperty("endTimestamp", Long.toString(endTime));
|
||||
}
|
||||
|
||||
FSDataOutputStream out = fs.create(path);
|
||||
p.store(new OutputStreamWriter(out, Charsets.UTF_8), null);
|
||||
out.close();
|
||||
try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
|
||||
p.store(osw, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void readPropertiesFile(FileSystem fs, Path path) throws IOException {
|
||||
FSDataInputStream in = fs.open(path);
|
||||
Properties p = new Properties();
|
||||
p.load(new InputStreamReader(in, Charsets.UTF_8));
|
||||
in.close();
|
||||
|
||||
try (FSDataInputStream in = fs.open(path)) {
|
||||
try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
|
||||
p.load(isr);
|
||||
}
|
||||
}
|
||||
tableName = p.getProperty("table");
|
||||
families = p.getProperty("columnFamilies");
|
||||
batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
|
||||
|
|
|
@ -142,6 +142,8 @@ public class Import extends Configured implements Tool {
|
|||
}
|
||||
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
|
||||
justification="This is wrong, yes, but we should be purging Writables, not fixing them")
|
||||
public int compareTo(KeyValueWritableComparable o) {
|
||||
return CellComparator.COMPARATOR.compare(this.kv, ((KeyValueWritableComparable)o).kv);
|
||||
}
|
||||
|
@ -249,6 +251,8 @@ public class Import extends Configured implements Tool {
|
|||
/**
|
||||
* A mapper that just writes out KeyValues.
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
|
||||
justification="Writables are going away and this has been this way forever")
|
||||
public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
|
||||
private Map<byte[], byte[]> cfRenameMap;
|
||||
private Filter filter;
|
||||
|
|
|
@ -107,7 +107,7 @@ public class ImportTsv extends Configured implements Tool {
|
|||
* If table didn't exist and was created in dry-run mode, this flag is
|
||||
* flipped to delete it when MR ends.
|
||||
*/
|
||||
private static boolean dryRunTableCreated;
|
||||
private static boolean DRY_RUN_TABLE_CREATED;
|
||||
|
||||
public static class TsvParser {
|
||||
/**
|
||||
|
@ -475,118 +475,119 @@ public class ImportTsv extends Configured implements Tool {
|
|||
|
||||
// See if a non-default Mapper was set
|
||||
String mapperClassName = conf.get(MAPPER_CONF_KEY);
|
||||
Class mapperClass =
|
||||
mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;
|
||||
Class mapperClass = mapperClassName != null? Class.forName(mapperClassName): DEFAULT_MAPPER;
|
||||
|
||||
TableName tableName = TableName.valueOf(args[0]);
|
||||
Path inputDir = new Path(args[1]);
|
||||
String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
|
||||
job = Job.getInstance(conf, jobName);
|
||||
job.setJarByClass(mapperClass);
|
||||
FileInputFormat.setInputPaths(job, inputDir);
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setMapperClass(mapperClass);
|
||||
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
|
||||
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
|
||||
String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
|
||||
if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
|
||||
String fileLoc = conf.get(CREDENTIALS_LOCATION);
|
||||
Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
|
||||
job.getCredentials().addAll(cred);
|
||||
}
|
||||
TableName tableName = TableName.valueOf(args[0]);
|
||||
Path inputDir = new Path(args[1]);
|
||||
String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
|
||||
job = Job.getInstance(conf, jobName);
|
||||
job.setJarByClass(mapperClass);
|
||||
FileInputFormat.setInputPaths(job, inputDir);
|
||||
job.setInputFormatClass(TextInputFormat.class);
|
||||
job.setMapperClass(mapperClass);
|
||||
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
|
||||
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
|
||||
String[] columns = conf.getStrings(COLUMNS_CONF_KEY);
|
||||
if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
|
||||
String fileLoc = conf.get(CREDENTIALS_LOCATION);
|
||||
Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
|
||||
job.getCredentials().addAll(cred);
|
||||
}
|
||||
|
||||
if (hfileOutPath != null) {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
LOG.warn(format("Table '%s' does not exist.", tableName));
|
||||
if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
|
||||
// TODO: this is backwards. Instead of depending on the existence of a table,
|
||||
// create a sane splits file for HFileOutputFormat based on data sampling.
|
||||
createTable(admin, tableName, columns);
|
||||
if (isDryRun) {
|
||||
LOG.warn("Dry run: Table will be deleted at end of dry run.");
|
||||
dryRunTableCreated = true;
|
||||
}
|
||||
} else {
|
||||
String errorMsg =
|
||||
format("Table '%s' does not exist and '%s' is set to no.", tableName,
|
||||
CREATE_TABLE_CONF_KEY);
|
||||
LOG.error(errorMsg);
|
||||
throw new TableNotFoundException(errorMsg);
|
||||
}
|
||||
}
|
||||
try (Table table = connection.getTable(tableName);
|
||||
RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
|
||||
boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
|
||||
// if no.strict is false then check column family
|
||||
if(!noStrict) {
|
||||
ArrayList<String> unmatchedFamilies = new ArrayList<String>();
|
||||
Set<String> cfSet = getColumnFamilies(columns);
|
||||
HTableDescriptor tDesc = table.getTableDescriptor();
|
||||
for (String cf : cfSet) {
|
||||
if(tDesc.getFamily(Bytes.toBytes(cf)) == null) {
|
||||
unmatchedFamilies.add(cf);
|
||||
}
|
||||
}
|
||||
if(unmatchedFamilies.size() > 0) {
|
||||
ArrayList<String> familyNames = new ArrayList<String>();
|
||||
for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
|
||||
familyNames.add(family.getNameAsString());
|
||||
}
|
||||
String msg =
|
||||
"Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
|
||||
+ " does not match with any of the table " + tableName
|
||||
+ " column families " + familyNames + ".\n"
|
||||
+ "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
|
||||
+ "=true.\n";
|
||||
usage(msg);
|
||||
System.exit(-1);
|
||||
if (hfileOutPath != null) {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
LOG.warn(format("Table '%s' does not exist.", tableName));
|
||||
if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
|
||||
// TODO: this is backwards. Instead of depending on the existence of a table,
|
||||
// create a sane splits file for HFileOutputFormat based on data sampling.
|
||||
createTable(admin, tableName, columns);
|
||||
if (isDryRun) {
|
||||
LOG.warn("Dry run: Table will be deleted at end of dry run.");
|
||||
synchronized (ImportTsv.class) {
|
||||
DRY_RUN_TABLE_CREATED = true;
|
||||
}
|
||||
}
|
||||
if (mapperClass.equals(TsvImporterTextMapper.class)) {
|
||||
job.setMapOutputValueClass(Text.class);
|
||||
job.setReducerClass(TextSortReducer.class);
|
||||
} else {
|
||||
job.setMapOutputValueClass(Put.class);
|
||||
job.setCombinerClass(PutCombiner.class);
|
||||
job.setReducerClass(PutSortReducer.class);
|
||||
}
|
||||
if (!isDryRun) {
|
||||
Path outputDir = new Path(hfileOutPath);
|
||||
FileOutputFormat.setOutputPath(job, outputDir);
|
||||
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
|
||||
regionLocator);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
String errorMsg = format("Table '%s' does not exist.", tableName);
|
||||
} else {
|
||||
String errorMsg =
|
||||
format("Table '%s' does not exist and '%s' is set to no.", tableName,
|
||||
CREATE_TABLE_CONF_KEY);
|
||||
LOG.error(errorMsg);
|
||||
throw new TableNotFoundException(errorMsg);
|
||||
}
|
||||
}
|
||||
try (Table table = connection.getTable(tableName);
|
||||
RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
|
||||
boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
|
||||
// if no.strict is false then check column family
|
||||
if(!noStrict) {
|
||||
ArrayList<String> unmatchedFamilies = new ArrayList<String>();
|
||||
Set<String> cfSet = getColumnFamilies(columns);
|
||||
HTableDescriptor tDesc = table.getTableDescriptor();
|
||||
for (String cf : cfSet) {
|
||||
if(tDesc.getFamily(Bytes.toBytes(cf)) == null) {
|
||||
unmatchedFamilies.add(cf);
|
||||
}
|
||||
}
|
||||
if(unmatchedFamilies.size() > 0) {
|
||||
ArrayList<String> familyNames = new ArrayList<String>();
|
||||
for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
|
||||
familyNames.add(family.getNameAsString());
|
||||
}
|
||||
String msg =
|
||||
"Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
|
||||
+ " does not match with any of the table " + tableName
|
||||
+ " column families " + familyNames + ".\n"
|
||||
+ "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
|
||||
+ "=true.\n";
|
||||
usage(msg);
|
||||
System.exit(-1);
|
||||
}
|
||||
}
|
||||
if (mapperClass.equals(TsvImporterTextMapper.class)) {
|
||||
usage(TsvImporterTextMapper.class.toString()
|
||||
+ " should not be used for non bulkloading case. use "
|
||||
+ TsvImporterMapper.class.toString()
|
||||
+ " or custom mapper whose value type is Put.");
|
||||
System.exit(-1);
|
||||
job.setMapOutputValueClass(Text.class);
|
||||
job.setReducerClass(TextSortReducer.class);
|
||||
} else {
|
||||
job.setMapOutputValueClass(Put.class);
|
||||
job.setCombinerClass(PutCombiner.class);
|
||||
job.setReducerClass(PutSortReducer.class);
|
||||
}
|
||||
if (!isDryRun) {
|
||||
// No reducers. Just write straight to table. Call initTableReducerJob
|
||||
// to set up the TableOutputFormat.
|
||||
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
|
||||
Path outputDir = new Path(hfileOutPath);
|
||||
FileOutputFormat.setOutputPath(job, outputDir);
|
||||
HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
|
||||
regionLocator);
|
||||
}
|
||||
job.setNumReduceTasks(0);
|
||||
}
|
||||
if (isDryRun) {
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.getConfiguration().setStrings("io.serializations",
|
||||
job.getConfiguration().get("io.serializations"),
|
||||
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
|
||||
KeyValueSerialization.class.getName());
|
||||
} else {
|
||||
if (!admin.tableExists(tableName)) {
|
||||
String errorMsg = format("Table '%s' does not exist.", tableName);
|
||||
LOG.error(errorMsg);
|
||||
throw new TableNotFoundException(errorMsg);
|
||||
}
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
|
||||
com.google.common.base.Function.class /* Guava used by TsvParser */);
|
||||
if (mapperClass.equals(TsvImporterTextMapper.class)) {
|
||||
usage(TsvImporterTextMapper.class.toString()
|
||||
+ " should not be used for non bulkloading case. use "
|
||||
+ TsvImporterMapper.class.toString()
|
||||
+ " or custom mapper whose value type is Put.");
|
||||
System.exit(-1);
|
||||
}
|
||||
if (!isDryRun) {
|
||||
// No reducers. Just write straight to table. Call initTableReducerJob
|
||||
// to set up the TableOutputFormat.
|
||||
TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
|
||||
}
|
||||
job.setNumReduceTasks(0);
|
||||
}
|
||||
if (isDryRun) {
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.getConfiguration().setStrings("io.serializations",
|
||||
job.getConfiguration().get("io.serializations"),
|
||||
MutationSerialization.class.getName(), ResultSerialization.class.getName(),
|
||||
KeyValueSerialization.class.getName());
|
||||
}
|
||||
TableMapReduceUtil.addDependencyJars(job);
|
||||
TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
|
||||
com.google.common.base.Function.class /* Guava used by TsvParser */);
|
||||
}
|
||||
}
|
||||
return job;
|
||||
|
@ -616,7 +617,8 @@ public class ImportTsv extends Configured implements Tool {
|
|||
}
|
||||
admin.deleteTable(tableName);
|
||||
} catch (IOException e) {
|
||||
LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
|
||||
LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
|
||||
e.toString()));
|
||||
return;
|
||||
}
|
||||
LOG.info(format("Dry run: Deleted table '%s'.", tableName));
|
||||
|
@ -658,7 +660,7 @@ public class ImportTsv extends Configured implements Tool {
|
|||
"input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
|
||||
" designates that this column should be\n" +
|
||||
"used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
|
||||
TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
|
||||
TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
|
||||
"You must specify at most one column as timestamp key for each imported record.\n" +
|
||||
"Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
|
||||
"Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
|
||||
|
@ -767,10 +769,16 @@ public class ImportTsv extends Configured implements Tool {
|
|||
// system time
|
||||
getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
|
||||
|
||||
dryRunTableCreated = false;
|
||||
synchronized (ImportTsv.class) {
|
||||
DRY_RUN_TABLE_CREATED = false;
|
||||
}
|
||||
Job job = createSubmittableJob(getConf(), args);
|
||||
boolean success = job.waitForCompletion(true);
|
||||
if (dryRunTableCreated) {
|
||||
boolean delete = false;
|
||||
synchronized (ImportTsv.class) {
|
||||
delete = DRY_RUN_TABLE_CREATED;
|
||||
}
|
||||
if (delete) {
|
||||
deleteTable(getConf(), args);
|
||||
}
|
||||
return success ? 0 : 1;
|
||||
|
|
|
@ -110,9 +110,7 @@ public abstract class MultiTableInputFormatBase extends
|
|||
@Override
|
||||
public void close() throws IOException {
|
||||
trr.close();
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,9 +143,7 @@ public abstract class MultiTableInputFormatBase extends
|
|||
// If there is an exception make sure that all
|
||||
// resources are closed and released.
|
||||
trr.close();
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
connection.close();
|
||||
throw ioe;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -230,6 +230,8 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
|
|||
}
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Don't understand why FB is complaining about this one. We do throw exception")
|
||||
private class MapRunner implements Runnable {
|
||||
private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
|
||||
private Context subcontext;
|
||||
|
@ -280,7 +282,7 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
|
|||
Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
|
||||
Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
|
||||
subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
|
||||
} catch (Exception ee) {
|
||||
} catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
|
||||
// rethrow as IOE
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
|
|
@ -111,6 +111,8 @@ implements Configurable {
|
|||
* org.apache.hadoop.conf.Configuration)
|
||||
*/
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Intentional")
|
||||
public void setConf(Configuration configuration) {
|
||||
this.conf = configuration;
|
||||
|
||||
|
|
|
@ -2174,6 +2174,9 @@ public class AssignmentManager {
|
|||
}
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
|
||||
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
|
||||
justification="Worth fixing but not the end of the world.")
|
||||
private String onRegionFailedOpen(final RegionState current,
|
||||
final HRegionInfo hri, final ServerName serverName) {
|
||||
// The region must be opening on this server.
|
||||
|
@ -2189,6 +2192,7 @@ public class AssignmentManager {
|
|||
}
|
||||
|
||||
String encodedName = hri.getEncodedName();
|
||||
// FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
|
||||
AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
|
||||
if (failedOpenCount == null) {
|
||||
failedOpenCount = new AtomicInteger();
|
||||
|
|
|
@ -61,6 +61,8 @@ public class ExpiredMobFileCleanerChore extends ScheduledChore {
|
|||
}
|
||||
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Intentional")
|
||||
protected void chore() {
|
||||
try {
|
||||
TableDescriptors htds = master.getTableDescriptors();
|
||||
|
|
|
@ -424,8 +424,6 @@ public class RegionStates {
|
|||
if (!serverName.equals(oldServerName)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
|
||||
} else {
|
||||
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
|
||||
}
|
||||
addToServerHoldings(serverName, hri);
|
||||
addToReplicaMapping(hri);
|
||||
|
|
|
@ -643,7 +643,7 @@ public class SplitLogManager {
|
|||
public volatile ServerName cur_worker_name;
|
||||
public volatile TaskBatch batch;
|
||||
public volatile TerminationStatus status;
|
||||
public volatile int incarnation;
|
||||
public volatile AtomicInteger incarnation = new AtomicInteger(0);
|
||||
public final AtomicInteger unforcedResubmits = new AtomicInteger();
|
||||
public volatile boolean resubmitThresholdReached;
|
||||
|
||||
|
@ -655,7 +655,6 @@ public class SplitLogManager {
|
|||
}
|
||||
|
||||
public Task() {
|
||||
incarnation = 0;
|
||||
last_version = -1;
|
||||
status = IN_PROGRESS;
|
||||
setUnassigned();
|
||||
|
|
|
@ -65,12 +65,15 @@ import com.google.common.collect.Sets;
|
|||
* is likely to go aways anyways.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="TODO: synchronize access on nsTable but it is done in tiers above and this " +
|
||||
"class is going away/shrinking")
|
||||
public class TableNamespaceManager {
|
||||
private static final Log LOG = LogFactory.getLog(TableNamespaceManager.class);
|
||||
|
||||
private Configuration conf;
|
||||
private MasterServices masterServices;
|
||||
private Table nsTable = null;
|
||||
private Table nsTable = null; // FindBugs: IS2_INCONSISTENT_SYNC TODO: Access is not synchronized
|
||||
private ZKNamespaceManager zkNamespaceManager;
|
||||
private boolean initialized;
|
||||
|
||||
|
|
|
@ -554,17 +554,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
switch (action.type) {
|
||||
case NULL: break;
|
||||
case ASSIGN_REGION:
|
||||
// FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
|
||||
assert action instanceof AssignRegionAction: action.getClass();
|
||||
AssignRegionAction ar = (AssignRegionAction) action;
|
||||
regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
|
||||
regionMoved(ar.region, -1, ar.server);
|
||||
break;
|
||||
case MOVE_REGION:
|
||||
assert action instanceof MoveRegionAction: action.getClass();
|
||||
MoveRegionAction mra = (MoveRegionAction) action;
|
||||
regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
|
||||
regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
|
||||
regionMoved(mra.region, mra.fromServer, mra.toServer);
|
||||
break;
|
||||
case SWAP_REGIONS:
|
||||
assert action instanceof SwapRegionsAction: action.getClass();
|
||||
SwapRegionsAction a = (SwapRegionsAction) action;
|
||||
regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
|
||||
regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
|
||||
|
@ -1095,7 +1099,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setClusterStatus(ClusterStatus st) {
|
||||
public synchronized void setClusterStatus(ClusterStatus st) {
|
||||
this.clusterStatus = st;
|
||||
regionFinder.setClusterStatus(st);
|
||||
}
|
||||
|
|
|
@ -94,6 +94,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
* so that the balancer gets the full picture of all loads on the cluster.</p>
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="Complaint is about costFunctions not being synchronized; not end of the world")
|
||||
public class StochasticLoadBalancer extends BaseLoadBalancer {
|
||||
|
||||
protected static final String STEPS_PER_REGION_KEY =
|
||||
|
@ -119,7 +121,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
|
|||
|
||||
private CandidateGenerator[] candidateGenerators;
|
||||
private CostFromRegionLoadFunction[] regionLoadFunctions;
|
||||
private CostFunction[] costFunctions;
|
||||
private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
|
||||
|
||||
// to save and report costs to JMX
|
||||
private Double curOverallCost = 0d;
|
||||
|
|
|
@ -96,7 +96,7 @@ public class HFileLinkCleaner extends BaseHFileCleanerDelegate {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setConf(Configuration conf) {
|
||||
public synchronized void setConf(Configuration conf) {
|
||||
super.setConf(conf);
|
||||
|
||||
// setup filesystem
|
||||
|
|
|
@ -154,6 +154,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
* call should get implemented for each snapshot flavor.
|
||||
*/
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Intentional")
|
||||
public void process() {
|
||||
String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
|
||||
+ eventType + " on table " + snapshotTable;
|
||||
|
@ -205,7 +207,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
|
|||
status.markComplete(msg);
|
||||
LOG.info(msg);
|
||||
metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
|
||||
} catch (Exception e) {
|
||||
} catch (Exception e) { // FindBugs: REC_CATCH_EXCEPTION
|
||||
status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +
|
||||
snapshotTable + " because " + e.getMessage());
|
||||
String reason = "Failed taking snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot)
|
||||
|
|
|
@ -89,6 +89,8 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {
|
|||
System.err.println(" familyName The column family name");
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Intentional")
|
||||
public int run(String[] args) throws Exception {
|
||||
if (args.length != 2) {
|
||||
printUsage();
|
||||
|
|
|
@ -98,7 +98,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* by this Handler.
|
||||
* @return a string representing the method call without parameters
|
||||
*/
|
||||
public String getRPC() {
|
||||
public synchronized String getRPC() {
|
||||
return getRPC(false);
|
||||
}
|
||||
|
||||
|
@ -166,7 +166,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @return true if the monitored handler is currently servicing an RPC call
|
||||
* to a database command.
|
||||
*/
|
||||
public boolean isOperationRunning() {
|
||||
public synchronized boolean isOperationRunning() {
|
||||
if(!isRPCRunning()) {
|
||||
return false;
|
||||
}
|
||||
|
@ -212,7 +212,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
public void markComplete(String status) {
|
||||
public synchronized void markComplete(String status) {
|
||||
super.markComplete(status);
|
||||
this.params = null;
|
||||
this.packet = null;
|
||||
|
|
|
@ -28,6 +28,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="FindBugs seems confused; says globalLimiter and lastUpdate " +
|
||||
"are mostly synchronized...but to me it looks like they are totally synchronized")
|
||||
public class QuotaState {
|
||||
protected long lastUpdate = 0;
|
||||
protected long lastQuery = 0;
|
||||
|
@ -77,7 +80,7 @@ public class QuotaState {
|
|||
* Setup the global quota information.
|
||||
* (This operation is part of the QuotaState setup)
|
||||
*/
|
||||
public void setQuotas(final Quotas quotas) {
|
||||
public synchronized void setQuotas(final Quotas quotas) {
|
||||
if (quotas.hasThrottle()) {
|
||||
globalLimiter = QuotaLimiterFactory.fromThrottle(quotas.getThrottle());
|
||||
} else {
|
||||
|
|
|
@ -45,6 +45,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="FindBugs seems confused; says limit and tlimit " +
|
||||
"are mostly synchronized...but to me it looks like they are totally synchronized")
|
||||
public abstract class RateLimiter {
|
||||
public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
|
||||
private long tunit = 1000; // Timeunit factor for translating to ms.
|
||||
|
@ -73,7 +76,7 @@ public abstract class RateLimiter {
|
|||
* @param limit The max value available resource units can be refilled to.
|
||||
* @param timeUnit Timeunit factor for translating to ms.
|
||||
*/
|
||||
public void set(final long limit, final TimeUnit timeUnit) {
|
||||
public synchronized void set(final long limit, final TimeUnit timeUnit) {
|
||||
switch (timeUnit) {
|
||||
case MILLISECONDS:
|
||||
tunit = 1;
|
||||
|
@ -99,10 +102,11 @@ public abstract class RateLimiter {
|
|||
|
||||
public String toString() {
|
||||
String rateLimiter = this.getClass().getSimpleName();
|
||||
if (limit == Long.MAX_VALUE) {
|
||||
if (getLimit() == Long.MAX_VALUE) {
|
||||
return rateLimiter + "(Bypass)";
|
||||
}
|
||||
return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
|
||||
return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
|
||||
" tunit=" + getTimeUnitInMillis() + ")";
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -120,7 +124,7 @@ public abstract class RateLimiter {
|
|||
}
|
||||
|
||||
public synchronized boolean isBypass() {
|
||||
return limit == Long.MAX_VALUE;
|
||||
return getLimit() == Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
public synchronized long getLimit() {
|
||||
|
@ -131,7 +135,7 @@ public abstract class RateLimiter {
|
|||
return avail;
|
||||
}
|
||||
|
||||
protected long getTimeUnitInMillis() {
|
||||
protected synchronized long getTimeUnitInMillis() {
|
||||
return tunit;
|
||||
}
|
||||
|
||||
|
@ -195,7 +199,7 @@ public abstract class RateLimiter {
|
|||
*/
|
||||
public synchronized long waitInterval(final long amount) {
|
||||
// TODO Handle over quota?
|
||||
return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
|
||||
return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
|
||||
}
|
||||
|
||||
// These two method are for strictly testing purpose only
|
||||
|
|
|
@ -34,6 +34,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="FindBugs seems confused; says bypassGlobals, namepaceLimiters, and " +
|
||||
"tableLimiters are mostly synchronized...but to me it looks like they are totally synchronized")
|
||||
public class UserQuotaState extends QuotaState {
|
||||
private Map<String, QuotaLimiter> namespaceLimiters = null;
|
||||
private Map<TableName, QuotaLimiter> tableLimiters = null;
|
||||
|
@ -96,7 +99,7 @@ public class UserQuotaState extends QuotaState {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setQuotas(final Quotas quotas) {
|
||||
public synchronized void setQuotas(final Quotas quotas) {
|
||||
super.setQuotas(quotas);
|
||||
bypassGlobals = quotas.getBypassGlobals();
|
||||
}
|
||||
|
@ -105,7 +108,7 @@ public class UserQuotaState extends QuotaState {
|
|||
* Add the quota information of the specified table.
|
||||
* (This operation is part of the QuotaState setup)
|
||||
*/
|
||||
public void setQuotas(final TableName table, Quotas quotas) {
|
||||
public synchronized void setQuotas(final TableName table, Quotas quotas) {
|
||||
tableLimiters = setLimiter(tableLimiters, table, quotas);
|
||||
}
|
||||
|
||||
|
|
|
@ -393,7 +393,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
// Set when a flush has been requested.
|
||||
volatile boolean flushRequested = false;
|
||||
// Number of compactions running.
|
||||
volatile int compacting = 0;
|
||||
AtomicInteger compacting = new AtomicInteger(0);
|
||||
// Gets set in close. If set, cannot compact or flush again.
|
||||
volatile boolean writesEnabled = true;
|
||||
// Set if region is read-only
|
||||
|
@ -824,7 +824,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
|
||||
this.writestate.flushRequested = false;
|
||||
this.writestate.compacting = 0;
|
||||
this.writestate.compacting.set(0);
|
||||
|
||||
if (this.writestate.writesEnabled) {
|
||||
// Remove temporary data left over from old regions
|
||||
|
@ -1368,6 +1368,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
this.closing.set(closing);
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
|
||||
justification="I think FindBugs is confused")
|
||||
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
|
||||
throws IOException {
|
||||
if (isClosed()) {
|
||||
|
@ -1405,7 +1407,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
// block waiting for the lock for closing
|
||||
lock.writeLock().lock();
|
||||
lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
|
||||
this.closing.set(true);
|
||||
status.setStatus("Disabling writes for close");
|
||||
try {
|
||||
|
@ -1537,7 +1539,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
boolean interrupted = false;
|
||||
try {
|
||||
while (writestate.compacting > 0 || writestate.flushing) {
|
||||
while (writestate.compacting.get() > 0 || writestate.flushing) {
|
||||
LOG.debug("waiting for " + writestate.compacting + " compactions"
|
||||
+ (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
|
||||
try {
|
||||
|
@ -1894,7 +1896,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
synchronized (writestate) {
|
||||
if (writestate.writesEnabled) {
|
||||
wasStateSet = true;
|
||||
++writestate.compacting;
|
||||
writestate.compacting.incrementAndGet();
|
||||
} else {
|
||||
String msg = "NOT compacting region " + this + ". Writes disabled.";
|
||||
LOG.info(msg);
|
||||
|
@ -1920,8 +1922,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
} finally {
|
||||
if (wasStateSet) {
|
||||
synchronized (writestate) {
|
||||
--writestate.compacting;
|
||||
if (writestate.compacting <= 0) {
|
||||
writestate.compacting.decrementAndGet();
|
||||
if (writestate.compacting.get() <= 0) {
|
||||
writestate.notifyAll();
|
||||
}
|
||||
}
|
||||
|
@ -2164,6 +2166,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
|
||||
justification="FindBugs seems confused about trxId")
|
||||
protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
|
||||
final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
|
||||
throws IOException {
|
||||
|
@ -2395,6 +2399,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return false;
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||
justification="Intentional; notify is about completed flush")
|
||||
protected FlushResult internalFlushCacheAndCommit(
|
||||
final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
|
||||
final Collection<Store> storesToFlush)
|
||||
|
@ -4448,6 +4454,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||
justification="Intentional; post memstore flush")
|
||||
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
|
||||
MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);
|
||||
|
||||
|
@ -4684,6 +4692,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return prepareFlushResult;
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||
justification="Intentional; cleared the memstore")
|
||||
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
|
||||
checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
|
||||
"RegionEvent marker from WAL ", regionEvent);
|
||||
|
@ -4914,6 +4924,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
return refreshStoreFiles(false);
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
|
||||
justification="Notify is about post replay. Intentional")
|
||||
protected boolean refreshStoreFiles(boolean force) throws IOException {
|
||||
if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
|
||||
return false; // if primary nothing to do
|
||||
|
@ -7838,6 +7850,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
|
||||
justification="Intentional")
|
||||
public void startRegionOperation(Operation op) throws IOException {
|
||||
switch (op) {
|
||||
case GET: // read operations
|
||||
|
|
|
@ -189,7 +189,7 @@ implements HeapSize, Map<K,V> {
|
|||
*
|
||||
* @return currently available bytes
|
||||
*/
|
||||
public long getMemFree() {
|
||||
public synchronized long getMemFree() {
|
||||
return memFree;
|
||||
}
|
||||
|
||||
|
@ -208,7 +208,7 @@ implements HeapSize, Map<K,V> {
|
|||
* @return currently used memory in bytes
|
||||
*/
|
||||
public long getMemUsed() {
|
||||
return (memTotal - memFree); // FindBugs IS2_INCONSISTENT_SYNC
|
||||
return (memTotal - getMemFree()); // FindBugs IS2_INCONSISTENT_SYNC
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -227,7 +227,7 @@ implements HeapSize, Map<K,V> {
|
|||
*
|
||||
* @return number of misses
|
||||
*/
|
||||
public long getMissCount() {
|
||||
public synchronized long getMissCount() {
|
||||
return missCount; // FindBugs IS2_INCONSISTENT_SYNC
|
||||
}
|
||||
|
||||
|
@ -239,7 +239,7 @@ implements HeapSize, Map<K,V> {
|
|||
*/
|
||||
public double getHitRatio() {
|
||||
return (double)((double)hitCount/
|
||||
((double)(hitCount+missCount)));
|
||||
((double)(hitCount + getMissCount())));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -269,7 +269,7 @@ implements HeapSize, Map<K,V> {
|
|||
* @return memory usage of map in bytes
|
||||
*/
|
||||
public long heapSize() {
|
||||
return (memTotal - memFree);
|
||||
return (memTotal - getMemFree());
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
@ -824,6 +824,8 @@ implements HeapSize, Map<K,V> {
|
|||
*
|
||||
* @return Set of entries in hash
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
|
||||
justification="Unused debugging function that reads only")
|
||||
public Set<Entry<K,V>> entryTableSet() {
|
||||
Set<Entry<K,V>> entrySet = new HashSet<Entry<K,V>>();
|
||||
Entry [] table = entries; // FindBugs IS2_INCONSISTENT_SYNC
|
||||
|
|
|
@ -60,7 +60,7 @@ public class MemStoreChunkPool {
|
|||
final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;
|
||||
|
||||
// Static reference to the MemStoreChunkPool
|
||||
private static MemStoreChunkPool globalInstance;
|
||||
private static MemStoreChunkPool GLOBAL_INSTANCE;
|
||||
/** Boolean whether we have disabled the memstore chunk pool entirely. */
|
||||
static boolean chunkPoolDisabled = false;
|
||||
|
||||
|
@ -179,12 +179,14 @@ public class MemStoreChunkPool {
|
|||
* @param conf
|
||||
* @return the global MemStoreChunkPool instance
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK",
|
||||
justification="Intentional")
|
||||
static MemStoreChunkPool getPool(Configuration conf) {
|
||||
if (globalInstance != null) return globalInstance;
|
||||
if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
|
||||
|
||||
synchronized (MemStoreChunkPool.class) {
|
||||
if (chunkPoolDisabled) return null;
|
||||
if (globalInstance != null) return globalInstance;
|
||||
if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
|
||||
float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
|
||||
if (poolSizePercentage <= 0) {
|
||||
chunkPoolDisabled = true;
|
||||
|
@ -210,8 +212,8 @@ public class MemStoreChunkPool {
|
|||
int initialCount = (int) (initialCountPercentage * maxCount);
|
||||
LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
|
||||
+ ", max count " + maxCount + ", initial count " + initialCount);
|
||||
globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
|
||||
return globalInstance;
|
||||
GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
|
||||
return GLOBAL_INSTANCE;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -278,6 +278,8 @@ public class RegionServerCoprocessorHost extends
|
|||
|
||||
private RegionServerServices regionServerServices;
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
|
||||
justification="Intentional; FB has trouble detecting isAssignableFrom")
|
||||
public RegionServerEnvironment(final Class<?> implClass,
|
||||
final Coprocessor impl, final int priority, final int seq,
|
||||
final Configuration conf, final RegionServerServices services) {
|
||||
|
@ -285,7 +287,7 @@ public class RegionServerCoprocessorHost extends
|
|||
this.regionServerServices = services;
|
||||
for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
|
||||
Class<?> c = (Class<?>) itf;
|
||||
if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
|
||||
if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
|
||||
this.regionServerServices.registerService(
|
||||
((SingletonCoprocessorService) impl).getService());
|
||||
break;
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.util.SortedSet;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -38,11 +37,11 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
|
|||
import org.apache.hadoop.hbase.io.hfile.BlockType;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
import org.apache.hadoop.hbase.nio.ByteBuff;
|
||||
|
@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
|
@ -623,6 +622,8 @@ public class StoreFile {
|
|||
return false;
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
|
||||
justification="Will not overflow")
|
||||
public static class WriterBuilder {
|
||||
private final Configuration conf;
|
||||
private final CacheConfig cacheConf;
|
||||
|
@ -635,7 +636,6 @@ public class StoreFile {
|
|||
private Path filePath;
|
||||
private InetSocketAddress[] favoredNodes;
|
||||
private HFileContext fileContext;
|
||||
private boolean shouldDropCacheBehind = false;
|
||||
|
||||
public WriterBuilder(Configuration conf, CacheConfig cacheConf,
|
||||
FileSystem fs) {
|
||||
|
@ -703,8 +703,8 @@ public class StoreFile {
|
|||
return this;
|
||||
}
|
||||
|
||||
public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
|
||||
this.shouldDropCacheBehind = shouldDropCacheBehind;
|
||||
public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind/*NOT USED!!*/) {
|
||||
// TODO: HAS NO EFFECT!!! FIX!!
|
||||
return this;
|
||||
}
|
||||
/**
|
||||
|
@ -807,9 +807,6 @@ public class StoreFile {
|
|||
private Cell lastDeleteFamilyCell = null;
|
||||
private long deleteFamilyCnt = 0;
|
||||
|
||||
/** Bytes per Checksum */
|
||||
protected int bytesPerChecksum;
|
||||
|
||||
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
|
||||
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
|
||||
* When flushing a memstore, we set TimeRange and use this variable to
|
||||
|
|
|
@ -91,6 +91,8 @@ public class TimeRangeTracker implements Writable {
|
|||
* If required, update the current TimestampRange to include timestamp
|
||||
* @param timestamp the timestamp value to include
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||
justification="Intentional")
|
||||
void includeTimestamp(final long timestamp) {
|
||||
// Do test outside of synchronization block. Synchronization in here can be problematic
|
||||
// when many threads writing one Store -- they can all pile up trying to add in here.
|
||||
|
|
|
@ -332,7 +332,7 @@ public abstract class Compactor {
|
|||
|
||||
throughputController.start(compactionName);
|
||||
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
|
||||
int minFilesToCompact = Math.max(2,
|
||||
long minFilesToCompact = Math.max(2L,
|
||||
conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY,
|
||||
/* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3)));
|
||||
long shippedCallSizeLimit = (long) minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE;
|
||||
|
|
|
@ -43,18 +43,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
|
|||
* target cluster is an HBase cluster.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
|
||||
justification="Thinks zkw needs to be synchronized access but should be fine as is.")
|
||||
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
||||
implements Abortable {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);
|
||||
|
||||
private ZooKeeperWatcher zkw = null;
|
||||
private ZooKeeperWatcher zkw = null; // FindBugs: MT_CORRECTNESS
|
||||
|
||||
private List<ServerName> regionServers = new ArrayList<ServerName>(0);
|
||||
private volatile long lastRegionServerUpdate;
|
||||
private long lastRegionServerUpdate;
|
||||
|
||||
protected void disconnect() {
|
||||
if (zkw != null){
|
||||
if (zkw != null) {
|
||||
zkw.close();
|
||||
}
|
||||
}
|
||||
|
@ -181,7 +183,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
|
|||
* Set the list of region servers for that peer
|
||||
* @param regionServers list of addresses for the region servers
|
||||
*/
|
||||
public void setRegionServers(List<ServerName> regionServers) {
|
||||
public synchronized void setRegionServers(List<ServerName> regionServers) {
|
||||
this.regionServers = regionServers;
|
||||
lastRegionServerUpdate = System.currentTimeMillis();
|
||||
}
|
||||
|
|
|
@ -115,7 +115,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
|
|||
* Skips the entries which has original seqId. Only entries persisted via distributed log replay
|
||||
* have their original seq Id fields set.
|
||||
*/
|
||||
private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
|
||||
private static class SkipReplayedEditsFilter extends BaseWALEntryFilter {
|
||||
@Override
|
||||
public Entry filter(Entry entry) {
|
||||
// if orig seq id is set, skip replaying the entry
|
||||
|
|
|
@ -280,7 +280,6 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
|
|||
Configuration conf = env.getConfiguration();
|
||||
fs = FileSystem.get(conf);
|
||||
for(Pair<byte[], String> el: familyPaths) {
|
||||
Path p = new Path(el.getSecond());
|
||||
Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
|
||||
if(!fs.exists(stageFamily)) {
|
||||
fs.mkdirs(stageFamily);
|
||||
|
|
|
@ -336,12 +336,6 @@ public class TableAuthManager implements Closeable {
|
|||
return false;
|
||||
}
|
||||
|
||||
private boolean authorize(List<TablePermission> perms,
|
||||
TableName table, byte[] family,
|
||||
Permission.Action action) {
|
||||
return authorize(perms, table, family, null, action);
|
||||
}
|
||||
|
||||
private boolean authorize(List<TablePermission> perms,
|
||||
TableName table, byte[] family,
|
||||
byte[] qualifier, Permission.Action action) {
|
||||
|
|
|
@ -124,7 +124,7 @@ public class AuthenticationTokenSecretManager
|
|||
}
|
||||
|
||||
@Override
|
||||
protected byte[] createPassword(AuthenticationTokenIdentifier identifier) {
|
||||
protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
AuthenticationKey secretKey = currentKey;
|
||||
identifier.setKeyId(secretKey.getKeyId());
|
||||
|
@ -229,7 +229,7 @@ public class AuthenticationTokenSecretManager
|
|||
return true;
|
||||
}
|
||||
|
||||
AuthenticationKey getCurrentKey() {
|
||||
synchronized AuthenticationKey getCurrentKey() {
|
||||
return currentKey;
|
||||
}
|
||||
|
||||
|
@ -338,8 +338,11 @@ public class AuthenticationTokenSecretManager
|
|||
|
||||
// clear any expired
|
||||
removeExpiredKeys();
|
||||
|
||||
if (lastKeyUpdate + keyUpdateInterval < now) {
|
||||
long localLastKeyUpdate;
|
||||
synchronized (this) {
|
||||
localLastKeyUpdate = lastKeyUpdate;
|
||||
}
|
||||
if (localLastKeyUpdate + keyUpdateInterval < now) {
|
||||
// roll a new master key
|
||||
rollCurrentKey();
|
||||
}
|
||||
|
|
|
@ -274,8 +274,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
|
|||
// Read the entire labels table and populate the zk
|
||||
if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
|
||||
this.labelsRegion = true;
|
||||
this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
|
||||
synchronized (this) {
|
||||
this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
|
||||
.contains(AccessController.class.getName());
|
||||
}
|
||||
// Defer the init of VisibilityLabelService on labels region until it is in recovering state.
|
||||
if (!e.getEnvironment().getRegion().isRecovering()) {
|
||||
initVisibilityLabelService(e.getEnvironment());
|
||||
|
|
|
@ -643,7 +643,6 @@ public class ExportSnapshot extends Configured implements Tool {
|
|||
@Override
|
||||
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
|
||||
Configuration conf = context.getConfiguration();
|
||||
String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
|
||||
Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
|
||||
FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);
|
||||
|
||||
|
|
|
@ -280,6 +280,8 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
private SnapshotManifest snapshotManifest;
|
||||
|
||||
@Override
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
|
||||
justification="Intentional")
|
||||
public int run(String[] args) throws IOException, InterruptedException {
|
||||
final Configuration conf = getConf();
|
||||
boolean listSnapshots = false;
|
||||
|
@ -317,7 +319,7 @@ public final class SnapshotInfo extends Configured implements Tool {
|
|||
printUsageAndExit();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
printUsageAndExit();
|
||||
printUsageAndExit(); // FindBugs: REC_CATCH_EXCEPTION
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3564,9 +3564,8 @@ public class HBaseFsck extends Configured implements Closeable {
|
|||
this.metaEntry = metaEntry;
|
||||
}
|
||||
|
||||
public int getReplicaId() {
|
||||
if (metaEntry != null) return metaEntry.getReplicaId();
|
||||
return deployedReplicaId;
|
||||
public synchronized int getReplicaId() {
|
||||
return metaEntry != null? metaEntry.getReplicaId(): deployedReplicaId;
|
||||
}
|
||||
|
||||
public synchronized void addServer(HRegionInfo hri, ServerName server) {
|
||||
|
|
|
@ -68,12 +68,17 @@ public class IdReadWriteLock {
|
|||
/** For testing */
|
||||
@VisibleForTesting
|
||||
int purgeAndGetEntryPoolSize() {
|
||||
System.gc();
|
||||
gc();
|
||||
Threads.sleep(200);
|
||||
lockPool.purge();
|
||||
return lockPool.size();
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
|
||||
private void gc() {
|
||||
System.gc();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
|
||||
for (ReentrantReadWriteLock readWriteLock;;) {
|
||||
|
|
|
@ -104,11 +104,8 @@ public class MetaUtils {
|
|||
* @return HRegion for meta region
|
||||
* @throws IOException e
|
||||
*/
|
||||
public HRegion getMetaRegion() throws IOException {
|
||||
if (this.metaRegion == null) {
|
||||
openMetaRegion();
|
||||
}
|
||||
return this.metaRegion;
|
||||
public synchronized HRegion getMetaRegion() throws IOException {
|
||||
return this.metaRegion == null? openMetaRegion(): this.metaRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -455,9 +455,11 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
}
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
|
||||
justification="FB is wrong; its size is read")
|
||||
private void unloadRegions(Admin admin, String server, ArrayList<String> regionServers,
|
||||
boolean ack, List<HRegionInfo> movedRegions) throws Exception {
|
||||
List<HRegionInfo> regionsToMove = new ArrayList<HRegionInfo>();
|
||||
List<HRegionInfo> regionsToMove = new ArrayList<HRegionInfo>();// FindBugs: DLS_DEAD_LOCAL_STORE
|
||||
regionsToMove = getRegions(this.conf, server);
|
||||
if (regionsToMove.size() == 0) {
|
||||
LOG.info("No Regions to move....Quitting now");
|
||||
|
@ -597,7 +599,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
* Move Regions without Acknowledging.Usefule in case of RS shutdown as we might want to shut the
|
||||
* RS down anyways and not abort on a stuck region. Improves movement performance
|
||||
*/
|
||||
private class MoveWithoutAck implements Callable<Boolean> {
|
||||
private static class MoveWithoutAck implements Callable<Boolean> {
|
||||
private Admin admin;
|
||||
private HRegionInfo region;
|
||||
private String targetServer;
|
||||
|
@ -764,7 +766,7 @@ public class RegionMover extends AbstractHBaseTool {
|
|||
try {
|
||||
br = new BufferedReader(new FileReader(f));
|
||||
while ((line = br.readLine()) != null) {
|
||||
line.trim();
|
||||
line = line.trim();
|
||||
if (!line.equals("")) {
|
||||
excludeServers.add(line);
|
||||
}
|
||||
|
|
|
@ -46,6 +46,8 @@ import java.util.RandomAccess;
|
|||
* <p>
|
||||
* Iterators are read-only. They cannot be used to remove elements.
|
||||
*/
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UG_SYNC_SET_UNSYNC_GET",
|
||||
justification="TODO: synchronization in here needs review!!!")
|
||||
public class SortedList<E> implements List<E>, RandomAccess {
|
||||
private volatile List<E> list;
|
||||
private final Comparator<? super E> comparator;
|
||||
|
@ -80,7 +82,7 @@ public class SortedList<E> implements List<E>, RandomAccess {
|
|||
* method to get a reference for iterating over using the RandomAccess
|
||||
* pattern.
|
||||
*/
|
||||
public List<E> get() {
|
||||
public List<E> get() { // FindBugs: UG_SYNC_SET_UNSYNC_GET complaint. Fix!!
|
||||
return list;
|
||||
}
|
||||
|
||||
|
@ -185,7 +187,7 @@ public class SortedList<E> implements List<E>, RandomAccess {
|
|||
}
|
||||
|
||||
@Override
|
||||
public E get(int index) {
|
||||
public synchronized E get(int index) {
|
||||
return list.get(index);
|
||||
}
|
||||
|
||||
|
|
|
@ -345,7 +345,7 @@ public class TestSplitLogManager {
|
|||
assertTrue(task == task2);
|
||||
LOG.debug("task = " + task);
|
||||
assertEquals(1L, tot_mgr_resubmit.get());
|
||||
assertEquals(1, task.incarnation);
|
||||
assertEquals(1, task.incarnation.get());
|
||||
assertEquals(0, task.unforcedResubmits.get());
|
||||
assertTrue(task.isOrphan());
|
||||
assertTrue(task.isUnassigned());
|
||||
|
|
Loading…
Reference in New Issue