HBASE-15118 Fix findbugs complaint in hbase-server
commit d6654ea7ea
parent d9fd87d5a8
@@ -30,13 +30,11 @@ public class PrettyPrinter {
     NONE
   }
 
-  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_BOXED_PRIMITIVE_FOR_PARSING",
-    justification="I don't get what FB is complaining about")
   public static String format(final String value, final Unit unit) {
     StringBuilder human = new StringBuilder();
     switch (unit) {
       case TIME_INTERVAL:
-        human.append(humanReadableTTL(Long.valueOf(value)));
+        human.append(humanReadableTTL(Long.parseLong(value)));
         break;
       default:
         human.append(value);
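
Note on the hunk above: FindBugs' DM_BOXED_PRIMITIVE_FOR_PARSING fires because Long.valueOf(String) allocates a boxed Long that is immediately unboxed, while Long.parseLong(String) returns the primitive directly; once the call is switched, the suppression can simply be dropped. A minimal illustration (the values are hypothetical, not from this commit):

    // Boxed parse: allocates a java.lang.Long, then auto-unboxes to long.
    long boxed = Long.valueOf("86400");
    // Primitive parse: no intermediate allocation; what the commit switches to.
    long primitive = Long.parseLong("86400");
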
@@ -60,7 +60,7 @@ public class JMXListener implements Coprocessor {
    * only 1 JMX instance is allowed, otherwise there is port conflict even if
    * we only load regionserver coprocessor on master
    */
-  private static JMXConnectorServer jmxCS = null;
+  private static JMXConnectorServer JMX_CS = null;
 
   public static JMXServiceURL buildJMXServiceURL(int rmiRegistryPort,
       int rmiConnectorPort) throws IOException {
@@ -137,8 +137,13 @@ public class JMXListener implements Coprocessor {
 
     try {
       // Start the JMXListener with the connection string
-      jmxCS = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, jmxEnv, mbs);
-      jmxCS.start();
+      synchronized(JMXListener.class) {
+        if (JMX_CS != null) {
+          throw new RuntimeException("Started by another thread?");
+        }
+        JMX_CS = JMXConnectorServerFactory.newJMXConnectorServer(serviceUrl, jmxEnv, mbs);
+        JMX_CS.start();
+      }
       LOG.info("ConnectorServer started!");
     } catch (IOException e) {
       LOG.error("fail to start connector server!", e);
@@ -148,10 +153,10 @@ public class JMXListener implements Coprocessor {
 
   public void stopConnectorServer() throws IOException {
     synchronized(JMXListener.class) {
-      if (jmxCS != null) {
-        jmxCS.stop();
+      if (JMX_CS != null) {
+        JMX_CS.stop();
         LOG.info("ConnectorServer stopped!");
-        jmxCS = null;
+        JMX_CS = null;
       }
     }
   }
@@ -186,7 +191,7 @@ public class JMXListener implements Coprocessor {
     }
 
     synchronized(JMXListener.class) {
-      if (jmxCS != null) {
+      if (JMX_CS != null) {
         LOG.info("JMXListener has been started at Registry port " + rmiRegistryPort);
       }
       else {
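
The four JMXListener hunks rename the mutable static field to the upper-case JMX_CS form FindBugs expects and put every start, stop, and read of it under the class lock, so only one connector server can ever be started per JVM. A self-contained sketch of that guarded-singleton pattern, with a hypothetical ConnectorHolder class standing in for the coprocessor:

    import java.io.IOException;
    import javax.management.MBeanServer;
    import javax.management.remote.JMXConnectorServer;
    import javax.management.remote.JMXConnectorServerFactory;
    import javax.management.remote.JMXServiceURL;

    class ConnectorHolder { // hypothetical stand-in for JMXListener
      private static JMXConnectorServer JMX_CS = null;

      static void start(JMXServiceURL url, MBeanServer mbs) throws IOException {
        synchronized (ConnectorHolder.class) {
          if (JMX_CS != null) {
            throw new RuntimeException("Started by another thread?");
          }
          JMX_CS = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
          JMX_CS.start(); // assignment and start both happen under the class lock
        }
      }

      static void stop() throws IOException {
        synchronized (ConnectorHolder.class) {
          if (JMX_CS != null) {
            JMX_CS.stop();
            JMX_CS = null;
          }
        }
      }
    }
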
@@ -175,7 +175,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
       return;
     }
     Task task = findOrCreateOrphanTask(path);
-    if (task.isOrphan() && (task.incarnation == 0)) {
+    if (task.isOrphan() && (task.incarnation.get() == 0)) {
       LOG.info("resubmitting unassigned orphan task " + path);
       // ignore failure to resubmit. The timeout-monitor will handle it later
       // albeit in a more crude fashion
@@ -228,7 +228,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
       version = -1;
     }
     LOG.info("resubmitting task " + path);
-    task.incarnation++;
+    task.incarnation.incrementAndGet();
     boolean result = resubmit(this.details.getServerName(), path, version);
     if (!result) {
       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTime());
@@ -140,6 +140,8 @@ public class ZKSplitTransactionCoordination implements SplitTransactionCoordinat
    * the node is removed or is not in pending_split state any more, we abort the split.
    */
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intended")
   public void waitForSplitTransaction(final RegionServerServices services, Region parent,
       HRegionInfo hri_a, HRegionInfo hri_b, SplitTransactionDetails sptd) throws IOException {
     ZkSplitTransactionDetails zstd = (ZkSplitTransactionDetails) sptd;
@@ -92,6 +92,8 @@ public class ZkRegionMergeCoordination implements RegionMergeCoordination {
    */
 
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intended")
   public void waitForRegionMergeTransaction(RegionServerServices services,
       HRegionInfo mergedRegionInfo, HRegion region_a, HRegion region_b, RegionMergeDetails details)
       throws IOException {
@@ -78,7 +78,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
   private TaskExecutor splitTaskExecutor;
 
   private final Object taskReadyLock = new Object();
-  volatile int taskReadySeq = 0;
+  private AtomicInteger taskReadySeq = new AtomicInteger(0);
   private volatile String currentTask = null;
   private int currentVersion;
   private volatile boolean shouldStop = false;
@@ -106,7 +106,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
     if (path.equals(watcher.splitLogZNode)) {
       if (LOG.isTraceEnabled()) LOG.trace("tasks arrived or departed on " + path);
       synchronized (taskReadyLock) {
-        taskReadySeq++;
+        this.taskReadySeq.incrementAndGet();
         taskReadyLock.notify();
       }
     }
@@ -400,14 +400,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
    * policy puts an upper-limit on the number of simultaneous log splitting that could be happening
    * in a cluster.
    * <p>
-   * Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
-   * has been put up
+   * Synchronization using <code>taskReadyLock</code> ensures that it will try to grab every task
+   * that has been put up
    * @throws InterruptedException
    */
   @Override
   public void taskLoop() throws InterruptedException {
     while (!shouldStop) {
-      int seq_start = taskReadySeq;
+      int seq_start = taskReadySeq.get();
       List<String> paths = null;
       paths = getTaskList();
       if (paths == null) {
@@ -441,7 +441,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       }
       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
       synchronized (taskReadyLock) {
-        while (seq_start == taskReadySeq) {
+        while (seq_start == taskReadySeq.get()) {
           taskReadyLock.wait(checkInterval);
           if (server != null) {
             // check to see if we have stale recovering regions in our internal memory state
@@ -527,7 +527,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
 
   @Override
   public int getTaskReadySeq() {
-    return taskReadySeq;
+    return taskReadySeq.get();
   }
 
   @Override
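
The taskReadySeq rework addresses what is likely FindBugs' VO_VOLATILE_INCREMENT: x++ on a volatile int is a separate read, add, and write, so two threads incrementing concurrently can lose an update, while AtomicInteger collapses the sequence into one atomic operation. A minimal sketch of the hazard and the fix (SeqCounter is a hypothetical class):

    import java.util.concurrent.atomic.AtomicInteger;

    class SeqCounter {
      // volatile int seq = 0;  // seq++ here would NOT be atomic
      private final AtomicInteger seq = new AtomicInteger(0); // the pattern adopted in this commit

      void bump() {
        seq.incrementAndGet(); // single atomic read-modify-write
      }

      int current() {
        return seq.get();
      }
    }
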
@@ -152,6 +152,8 @@ public class JMXJsonServlet extends HttpServlet {
    * The servlet response we are creating
    */
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="XSS_REQUEST_PARAMETER_TO_SERVLET_WRITER",
+    justification="TODO: See HBASE-15122")
   public void doGet(HttpServletRequest request, HttpServletResponse response) {
     try {
       if (!HttpServer.isInstrumentationAccessAllowed(getServletContext(), request, response)) {
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.util.Pair;
  * it fallbacks to the archived path.
  */
 @InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_DOESNT_OVERRIDE_EQUALS",
+  justification="To be fixed but warning suppressed for now")
 public class HFileLink extends FileLink {
   private static final Log LOG = LogFactory.getLog(HFileLink.class);
 
@@ -20,14 +20,14 @@ package org.apache.hadoop.hbase.io.hfile;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
@@ -37,15 +37,12 @@ import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
  * Common functionality needed by all versions of {@link HFile} readers.
  */
 @InterfaceAudience.Private
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
 public abstract class AbstractHFileReader
     implements HFile.Reader, Configurable {
   /** Stream to read from. Does checksum verifications in file system */
-  protected FSDataInputStream istream; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
 
   /** The file system stream of the underlying {@link HFile} that
    * does not do checksum verification in the file system */
-  protected FSDataInputStream istreamNoFsChecksum; // UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD
 
   /** Data block index reader keeping the root data index in memory */
   protected HFileBlockIndex.BlockIndexReader dataBlockIndexReader;
@@ -289,7 +286,7 @@ public abstract class AbstractHFileReader
     protected int currMemstoreTSLen;
     protected long currMemstoreTS;
 
-    protected int blockFetches;
+    protected AtomicInteger blockFetches = new AtomicInteger();
 
     protected final HFile.Reader reader;
 
@@ -468,6 +468,8 @@ public class HFile {
    * @return an appropriate instance of HFileReader
    * @throws IOException If file is invalid, will throw CorruptHFileException flavored IOException
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
+    justification="Intentional")
   private static Reader pickReaderVersion(Path path, FSDataInputStreamWrapper fsdis,
       long size, CacheConfig cacheConf, HFileSystem hfs, Configuration conf) throws IOException {
     FixedFileTrailer trailer = null;
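
pickReaderVersion suppresses SF_SWITCH_FALLTHROUGH, the FindBugs warning for a case that deliberately runs into the next one. The suppression-annotation pattern used throughout this commit looks like this in isolation (VersionPicker is a hypothetical class, not HBase code):

    class VersionPicker {
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
        justification="Intentional")
      String pick(int majorVersion) {
        String note = "";
        switch (majorVersion) {
        case 2:
          note = "legacy ";
          // deliberate fall-through: version 2 shares the version 3 path
        case 3:
          return note + "reader";
        default:
          throw new IllegalArgumentException("Unsupported version: " + majorVersion);
        }
      }
    }
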
@@ -83,6 +83,8 @@ import com.google.common.base.Preconditions;
  * </ul>
  */
 @InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="HE_EQUALS_USE_HASHCODE",
+  justification="Fix!!! Fine for now bug FIXXXXXXX!!!!")
 public class HFileBlock implements Cacheable {
 
   /**
@@ -25,7 +25,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -34,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.NoTagsKeyValue;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
@@ -692,6 +692,8 @@ public class HFileReaderV2 extends AbstractHFileReader {
    * @return the next block, or null if there are no more data blocks
    * @throws IOException
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+    justification="Yeah, unnecessary null check; could do w/ clean up")
   protected HFileBlock readNextDataBlock() throws IOException {
     long lastDataBlockOffset = reader.getTrailer().getLastDataBlockOffset();
     if (block == null)
@@ -700,8 +702,9 @@ public class HFileReaderV2 extends AbstractHFileReader {
     HFileBlock curBlock = block;
 
     do {
-      if (curBlock.getOffset() >= lastDataBlockOffset)
+      if (curBlock.getOffset() >= lastDataBlockOffset) {
         return null;
+      }
 
       if (curBlock.getOffset() < 0) {
         throw new IOException("Invalid block file offset: " + block);
@@ -943,7 +946,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
 
       blockBuffer = block.getBufferWithoutHeader();
       readKeyValueLen();
-      blockFetches++;
+      blockFetches.incrementAndGet();
 
       // Reset the next indexed key
       this.nextIndexedKey = null;
@@ -1205,7 +1208,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
       }
 
       seeker.setCurrentBuffer(getEncodedBuffer(newBlock));
-      blockFetches++;
+      blockFetches.incrementAndGet();
 
       // Reset the next indexed key
       this.nextIndexedKey = null;
@@ -321,6 +321,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private long responseBlockSize = 0;
     private boolean retryImmediatelySupported;
 
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+      justification="Can't figure why this complaint is happening... see below")
     Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
         Message param, CellScanner cellScanner, Connection connection, Responder responder,
         long size, TraceInfo tinfo, final InetAddress remoteAddress) {
@@ -338,15 +340,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.isError = false;
       this.size = size;
       this.tinfo = tinfo;
-      this.user = connection == null ? null : connection.user;
+      this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
       this.remoteAddress = remoteAddress;
-      this.retryImmediatelySupported = connection.retryImmediatelySupported;
+      this.retryImmediatelySupported =
+          connection == null? null: connection.retryImmediatelySupported;
     }
 
     /**
      * Call is done. Execution happened and we returned results to client. It is now safe to
      * cleanup.
      */
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+      justification="Presume the lock on processing request held by caller is protection enough")
     void done() {
       if (this.cellBlock != null && reservoir != null) {
         // Return buffer to reservoir now we are done with it.
@@ -588,7 +593,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return connection.getVersionInfo();
     }
 
-
     @Override
     public boolean isRetryImmediatelySupported() {
       return retryImmediatelySupported;
@@ -764,6 +768,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     }
 
     @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+      justification="selector access is not synchronized; seems fine but concerned changing " +
+        "it will have per impact")
     public void run() {
       LOG.info(getName() + ": starting");
       while (running) {
@@ -1265,15 +1272,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private boolean useWrap = false;
     // Fake 'call' for failed authorization response
     private static final int AUTHORIZATION_FAILED_CALLID = -1;
-    private final Call authFailedCall =
-      new Call(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, null, 0, null,
-        null);
+    private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null, null,
+        null, null, this, null, 0, null, null);
     private ByteArrayOutputStream authFailedResponse =
         new ByteArrayOutputStream();
     // Fake 'call' for SASL context setup
     private static final int SASL_CALLID = -33;
-    private final Call saslCall =
-      new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
+    private final Call saslCall = new Call(SASL_CALLID, null, null, null, null, null, this, null,
+        0, null, null);
 
     // was authentication allowed with a fallback to simple auth
     private boolean authenticatedWithFallback;
@@ -2165,7 +2171,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   }
 
   @Override
-  public void refreshAuthManager(PolicyProvider pp) {
+  public synchronized void refreshAuthManager(PolicyProvider pp) {
     // Ignore warnings that this should be accessed in a static way instead of via an instance;
     // it'll break if you go via static route.
     this.authManager.refresh(this.conf, pp);
@@ -2391,7 +2397,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
    * @throws org.apache.hadoop.security.authorize.AuthorizationException
    *           when the client isn't authorized to talk the protocol
    */
-  public void authorize(UserGroupInformation user, ConnectionHeader connection, InetAddress addr)
+  public synchronized void authorize(UserGroupInformation user, ConnectionHeader connection,
+      InetAddress addr)
       throws AuthorizationException {
     if (authorize) {
       Class<?> c = getServiceInterface(services, connection.getServiceName());
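
refreshAuthManager and authorize gain the synchronized keyword because FindBugs' IS2_INCONSISTENT_SYNC fires when a field is guarded by a lock on some code paths but not on others. The rule in miniature, with hypothetical Config/ConfigHolder types:

    class Config { }

    class ConfigHolder {
      private Config current; // flagged if only some accesses hold the lock

      // With both accessors synchronized, every read and write of
      // 'current' happens under the same monitor.
      synchronized void refresh(Config next) { this.current = next; }
      synchronized Config snapshot() { return current; }
    }
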
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -135,17 +134,18 @@ public class HashTable extends Configured implements Tool {
         p.setProperty("endTimestamp", Long.toString(endTime));
       }
 
-      FSDataOutputStream out = fs.create(path);
-      p.store(new OutputStreamWriter(out, Charsets.UTF_8), null);
-      out.close();
+      try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
+        p.store(osw, null);
+      }
     }
 
     void readPropertiesFile(FileSystem fs, Path path) throws IOException {
-      FSDataInputStream in = fs.open(path);
       Properties p = new Properties();
-      p.load(new InputStreamReader(in, Charsets.UTF_8));
-      in.close();
+      try (FSDataInputStream in = fs.open(path)) {
+        try (InputStreamReader isr = new InputStreamReader(in, Charsets.UTF_8)) {
+          p.load(isr);
+        }
+      }
       tableName = p.getProperty("table");
       families = p.getProperty("columnFamilies");
       batchSize = Long.parseLong(p.getProperty("targetBatchSize"));
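
The HashTable hunk replaces manual close() calls, which leak the stream whenever store() or load() throws, with try-with-resources, which closes on every path. The same pattern against the local filesystem (the class name and file handling here are hypothetical, not from this commit):

    import java.io.*;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    class PropsIo {
      static void write(File f, Properties p) throws IOException {
        // Writer is closed even if store() throws.
        try (Writer w = new OutputStreamWriter(new FileOutputStream(f), StandardCharsets.UTF_8)) {
          p.store(w, null);
        }
      }

      static Properties read(File f) throws IOException {
        Properties p = new Properties();
        try (Reader r = new InputStreamReader(new FileInputStream(f), StandardCharsets.UTF_8)) {
          p.load(r);
        }
        return p;
      }
    }
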
@@ -139,6 +139,8 @@ public class Import {
     }
 
     @Override
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification="This is wrong, yes, but we should be purging Writables, not fixing them")
     public int compareTo(KeyValueWritableComparable o) {
       return cellComparator.compare(this.kv, ((KeyValueWritableComparable)o).kv);
     }
@@ -244,6 +246,8 @@ public class Import {
   /**
    * A mapper that just writes out KeyValues.
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="EQ_COMPARETO_USE_OBJECT_EQUALS",
+    justification="Writables are going away and this has been this way forever")
   public static class KeyValueImporter extends TableMapper<ImmutableBytesWritable, KeyValue> {
     private Map<byte[], byte[]> cfRenameMap;
     private Filter filter;
@@ -108,7 +108,7 @@ public class ImportTsv extends Configured implements Tool {
    * If table didn't exist and was created in dry-run mode, this flag is
    * flipped to delete it when MR ends.
    */
-  private static boolean dryRunTableCreated;
+  private static boolean DRY_RUN_TABLE_CREATED;
 
   public static class TsvParser {
     /**
@@ -477,117 +477,119 @@ public class ImportTsv extends Configured implements Tool {
       // See if a non-default Mapper was set
       String mapperClassName = conf.get(MAPPER_CONF_KEY);
       Class mapperClass =
         mapperClassName != null ? Class.forName(mapperClassName) : DEFAULT_MAPPER;
 
       TableName tableName = TableName.valueOf(args[0]);
       Path inputDir = new Path(args[1]);
       String jobName = conf.get(JOB_NAME_CONF_KEY,NAME + "_" + tableName.getNameAsString());
       job = Job.getInstance(conf, jobName);
       job.setJarByClass(mapperClass);
       FileInputFormat.setInputPaths(job, inputDir);
       job.setInputFormatClass(TextInputFormat.class);
       job.setMapperClass(mapperClass);
       job.setMapOutputKeyClass(ImmutableBytesWritable.class);
       String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
       String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
       if(StringUtils.isNotEmpty(conf.get(CREDENTIALS_LOCATION))) {
         String fileLoc = conf.get(CREDENTIALS_LOCATION);
         Credentials cred = Credentials.readTokenStorageFile(new File(fileLoc), conf);
         job.getCredentials().addAll(cred);
       }
 
       if (hfileOutPath != null) {
         if (!admin.tableExists(tableName)) {
           LOG.warn(format("Table '%s' does not exist.", tableName));
           if ("yes".equalsIgnoreCase(conf.get(CREATE_TABLE_CONF_KEY, "yes"))) {
             // TODO: this is backwards. Instead of depending on the existence of a table,
             // create a sane splits file for HFileOutputFormat based on data sampling.
             createTable(admin, tableName, columns);
             if (isDryRun) {
               LOG.warn("Dry run: Table will be deleted at end of dry run.");
-              dryRunTableCreated = true;
+              synchronized (ImportTsv.class) {
+                DRY_RUN_TABLE_CREATED = true;
+              }
             }
           } else {
             String errorMsg =
                 format("Table '%s' does not exist and '%s' is set to no.", tableName,
                     CREATE_TABLE_CONF_KEY);
             LOG.error(errorMsg);
             throw new TableNotFoundException(errorMsg);
           }
         }
         try (Table table = connection.getTable(tableName);
             RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
           boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false);
           // if no.strict is false then check column family
           if(!noStrict) {
             ArrayList<String> unmatchedFamilies = new ArrayList<String>();
             Set<String> cfSet = getColumnFamilies(columns);
             HTableDescriptor tDesc = table.getTableDescriptor();
             for (String cf : cfSet) {
               if(tDesc.getFamily(Bytes.toBytes(cf)) == null) {
                 unmatchedFamilies.add(cf);
               }
             }
             if(unmatchedFamilies.size() > 0) {
               ArrayList<String> familyNames = new ArrayList<String>();
               for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) {
                 familyNames.add(family.getNameAsString());
               }
               String msg =
                   "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY
                   + " does not match with any of the table " + tableName
                   + " column families " + familyNames + ".\n"
                   + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY
                   + "=true.\n";
               usage(msg);
               System.exit(-1);
             }
           }
           if (mapperClass.equals(TsvImporterTextMapper.class)) {
             job.setMapOutputValueClass(Text.class);
             job.setReducerClass(TextSortReducer.class);
           } else {
             job.setMapOutputValueClass(Put.class);
             job.setCombinerClass(PutCombiner.class);
             job.setReducerClass(PutSortReducer.class);
           }
           if (!isDryRun) {
             Path outputDir = new Path(hfileOutPath);
             FileOutputFormat.setOutputPath(job, outputDir);
             HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(),
                 regionLocator);
           }
         }
       } else {
         if (!admin.tableExists(tableName)) {
           String errorMsg = format("Table '%s' does not exist.", tableName);
           LOG.error(errorMsg);
           throw new TableNotFoundException(errorMsg);
         }
         if (mapperClass.equals(TsvImporterTextMapper.class)) {
           usage(TsvImporterTextMapper.class.toString()
               + " should not be used for non bulkloading case. use "
               + TsvImporterMapper.class.toString()
               + " or custom mapper whose value type is Put.");
           System.exit(-1);
         }
         if (!isDryRun) {
           // No reducers. Just write straight to table. Call initTableReducerJob
           // to set up the TableOutputFormat.
           TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
         }
         job.setNumReduceTasks(0);
       }
       if (isDryRun) {
         job.setOutputFormatClass(NullOutputFormat.class);
         job.getConfiguration().setStrings("io.serializations",
             job.getConfiguration().get("io.serializations"),
             MutationSerialization.class.getName(), ResultSerialization.class.getName(),
             KeyValueSerialization.class.getName());
       }
       TableMapReduceUtil.addDependencyJars(job);
       TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
           com.google.common.base.Function.class /* Guava used by TsvParser */);
     }
   }
   return job;
@@ -617,7 +619,8 @@ public class ImportTsv extends Configured implements Tool {
       }
       admin.deleteTable(tableName);
     } catch (IOException e) {
-      LOG.error(format("***Dry run: Failed to delete table '%s'.***\n%s", tableName, e.toString()));
+      LOG.error(format("***Dry run: Failed to delete table '%s'.***%n%s", tableName,
+        e.toString()));
       return;
     }
     LOG.info(format("Dry run: Deleted table '%s'.", tableName));
@@ -659,7 +662,7 @@ public class ImportTsv extends Configured implements Tool {
       "input data. Another special column" + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
       " designates that this column should be\n" +
       "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
-      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
+      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional." + "\n" +
       "You must specify at most one column as timestamp key for each imported record.\n" +
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
       "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
@@ -770,10 +773,16 @@ public class ImportTsv extends Configured implements Tool {
       // system time
       getConf().setLong(TIMESTAMP_CONF_KEY, timstamp);
 
-    dryRunTableCreated = false;
-    Job job = createSubmittableJob(getConf(), otherArgs);
+    synchronized (ImportTsv.class) {
+      DRY_RUN_TABLE_CREATED = false;
+    }
+    Job job = createSubmittableJob(getConf(), args);
     boolean success = job.waitForCompletion(true);
-    if (dryRunTableCreated) {
+    boolean delete = false;
+    synchronized (ImportTsv.class) {
+      delete = DRY_RUN_TABLE_CREATED;
+    }
+    if (delete) {
       deleteTable(getConf(), args);
     }
     return success ? 0 : 1;
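
The ImportTsv hunks turn dryRunTableCreated into the constant-style DRY_RUN_TABLE_CREATED and funnel every read and write of the mutable static flag through a class-level lock, so the dry-run cleanup decision is based on a consistent view of the flag. The same discipline condensed into a sketch (DryRunFlag is a hypothetical illustration, not HBase code):

    class DryRunFlag {
      private static boolean DRY_RUN_TABLE_CREATED;

      static void set(boolean created) {
        synchronized (DryRunFlag.class) {
          DRY_RUN_TABLE_CREATED = created;
        }
      }

      static boolean get() {
        synchronized (DryRunFlag.class) {
          return DRY_RUN_TABLE_CREATED;
        }
      }
    }
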
@@ -110,9 +110,7 @@ public abstract class MultiTableInputFormatBase extends
       @Override
       public void close() throws IOException {
         trr.close();
-        if (connection != null) {
-          connection.close();
-        }
+        connection.close();
       }
 
       @Override
@@ -145,9 +143,7 @@ public abstract class MultiTableInputFormatBase extends
           // If there is an exception make sure that all
           // resources are closed and released.
           trr.close();
-          if (connection != null) {
-            connection.close();
-          }
+          connection.close();
           throw ioe;
         }
       }
@@ -230,6 +230,8 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
       }
     }
 
+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+      justification="Don't understand why FB is complaining about this one. We do throw exception")
     private class MapRunner implements Runnable {
       private Mapper<ImmutableBytesWritable, Result, K2,V2> mapper;
       private Context subcontext;
@@ -280,7 +282,7 @@ public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
         Class<?> wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
         Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class);
         subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc);
-      } catch (Exception ee) {
+      } catch (Exception ee) { // FindBugs: REC_CATCH_EXCEPTION
         // rethrow as IOE
         throw new IOException(e);
       }
@@ -111,6 +111,8 @@ implements Configurable {
    * org.apache.hadoop.conf.Configuration)
    */
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intentional")
   public void setConf(Configuration configuration) {
     this.conf = configuration;
 
@@ -924,6 +924,9 @@ public class AssignmentManager extends ZooKeeperListener {
    * @param coordination coordination for opening region
    * @param ord details about opening region
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+    justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
       OpenRegionCoordination.OpenRegionDetails ord) {
     if (rt == null) {
@@ -1047,9 +1050,11 @@ public class AssignmentManager extends ZooKeeperListener {
       // No need to use putIfAbsent, or extra synchronization since
       // this whole handleRegion block is locked on the encoded region
       // name, and failedOpenTracker is updated only in this block
+      // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
       failedOpenTracker.put(encodedName, failedOpenCount);
     }
     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
+      // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
       regionStates.updateRegionState(rt, State.FAILED_OPEN);
       // remove the tracking info to save memory, also reset
       // the count for next open initiative
@@ -3637,18 +3642,24 @@ public class AssignmentManager extends ZooKeeperListener {
       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
+    value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
+    justification="Modification of Maps not ATOMIC!!!! FIX!!!")
   private void onRegionFailedOpen(
       final HRegionInfo hri, final ServerName sn) {
     String encodedName = hri.getEncodedName();
+    // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
     if (failedOpenCount == null) {
       failedOpenCount = new AtomicInteger();
       // No need to use putIfAbsent, or extra synchronization since
       // this whole handleRegion block is locked on the encoded region
       // name, and failedOpenTracker is updated only in this block
+      // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
       failedOpenTracker.put(encodedName, failedOpenCount);
     }
     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
+      // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
       regionStates.updateRegionState(hri, State.FAILED_OPEN);
       // remove the tracking info to save memory, also reset
       // the count for next open initiative
@@ -94,8 +94,7 @@ public class HMasterCommandLine extends ServerCommandLine {
 
       if (cmd.hasOption("minRegionServers")) {
         String val = cmd.getOptionValue("minRegionServers");
-        getConf().setInt("hbase.regions.server.count.min",
-          Integer.valueOf(val));
+        getConf().setInt("hbase.regions.server.count.min", Integer.parseInt(val));
         LOG.debug("minRegionServers set to " + val);
       }
 
@@ -103,7 +102,7 @@ public class HMasterCommandLine extends ServerCommandLine {
       if (cmd.hasOption("minServers")) {
         String val = cmd.getOptionValue("minServers");
         getConf().setInt("hbase.regions.server.count.min",
-          Integer.valueOf(val));
+          Integer.parseInt(val));
         LOG.debug("minServers set to " + val);
       }
 
@@ -116,13 +115,13 @@ public class HMasterCommandLine extends ServerCommandLine {
       // master when we are in local/standalone mode. Useful testing)
       if (cmd.hasOption("localRegionServers")) {
         String val = cmd.getOptionValue("localRegionServers");
-        getConf().setInt("hbase.regionservers", Integer.valueOf(val));
+        getConf().setInt("hbase.regionservers", Integer.parseInt(val));
         LOG.debug("localRegionServers set to " + val);
       }
       // How many masters to startup inside this process; useful testing
       if (cmd.hasOption("masters")) {
         String val = cmd.getOptionValue("masters");
-        getConf().setInt("hbase.masters", Integer.valueOf(val));
+        getConf().setInt("hbase.masters", Integer.parseInt(val));
         LOG.debug("masters set to " + val);
       }
 
@@ -464,8 +464,6 @@ public class RegionStates {
     if (!serverName.equals(oldServerName)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
-      } else {
-        LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
       }
       addToServerHoldings(serverName, hri);
       addToReplicaMapping(hri);
@@ -643,7 +643,7 @@ public class SplitLogManager {
     public volatile ServerName cur_worker_name;
     public volatile TaskBatch batch;
     public volatile TerminationStatus status;
-    public volatile int incarnation;
+    public volatile AtomicInteger incarnation = new AtomicInteger(0);
     public final AtomicInteger unforcedResubmits = new AtomicInteger();
     public volatile boolean resubmitThresholdReached;
 
@@ -655,7 +655,6 @@ public class SplitLogManager {
     }
 
     public Task() {
-      incarnation = 0;
       last_version = -1;
       status = IN_PROGRESS;
       setUnassigned();
@@ -59,12 +59,15 @@ import com.google.common.collect.Sets;
  * {@link org.apache.hadoop.hbase.ZKNamespaceManager}
  */
 @InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+  justification="TODO: synchronize access on nsTable but it is done in tiers above and this " +
+    "class is going away/shrinking")
 public class TableNamespaceManager {
   private static final Log LOG = LogFactory.getLog(TableNamespaceManager.class);
 
   private Configuration conf;
   private MasterServices masterServices;
-  private Table nsTable = null;
+  private Table nsTable = null; // FindBugs: IS2_INCONSISTENT_SYNC TODO: Access is not synchronized
   private ZKNamespaceManager zkNamespaceManager;
   private boolean initialized;
 
@@ -552,17 +552,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       switch (action.type) {
       case NULL: break;
       case ASSIGN_REGION:
+        // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings
+        assert action instanceof AssignRegionAction: action.getClass();
         AssignRegionAction ar = (AssignRegionAction) action;
         regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region);
         regionMoved(ar.region, -1, ar.server);
         break;
       case MOVE_REGION:
+        assert action instanceof MoveRegionAction: action.getClass();
         MoveRegionAction mra = (MoveRegionAction) action;
         regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region);
         regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region);
         regionMoved(mra.region, mra.fromServer, mra.toServer);
         break;
       case SWAP_REGIONS:
+        assert action instanceof SwapRegionsAction: action.getClass();
         SwapRegionsAction a = (SwapRegionsAction) action;
         regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion);
         regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion);
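Side note (not part of the commit): the asserts added above are the usual way to quieten BC_UNCONFIRMED_CAST. The assert documents the downcast invariant and satisfies FindBugs' cast analysis without a production-cost instanceof branch, since asserts only run under -ea. A sketch with made-up action types:

    abstract class Action { }

    final class MoveAction extends Action {
      int from, to;
    }

    class Dispatcher {
      void apply(Action action) {
        // Documents the invariant; confirms the cast for FindBugs.
        assert action instanceof MoveAction : action.getClass();
        MoveAction m = (MoveAction) action;
        System.out.println("move " + m.from + " -> " + m.to);
      }
    }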
@@ -1080,7 +1084,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
   }

   @Override
-  public void setClusterStatus(ClusterStatus st) {
+  public synchronized void setClusterStatus(ClusterStatus st) {
     this.clusterStatus = st;
     regionFinder.setClusterStatus(st);
   }
@@ -94,6 +94,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  * so that the balancer gets the full picture of all loads on the cluster.</p>
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+  justification="Complaint is about costFunctions not being synchronized; not end of the world")
 public class StochasticLoadBalancer extends BaseLoadBalancer {

   protected static final String STEPS_PER_REGION_KEY =
@@ -119,7 +121,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

   private CandidateGenerator[] candidateGenerators;
   private CostFromRegionLoadFunction[] regionLoadFunctions;
-  private CostFunction[] costFunctions;
+  private CostFunction[] costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

   // to save and report costs to JMX
   private Double curOverallCost = 0d;
@@ -90,7 +90,7 @@ public class HFileLinkCleaner extends BaseHFileCleanerDelegate {
   }

   @Override
-  public void setConf(Configuration conf) {
+  public synchronized void setConf(Configuration conf) {
     super.setConf(conf);

     // setup filesystem
@@ -312,6 +312,8 @@ public class DisableTableProcedure
    * Rollback of table state change in prepareDisable()
    * @param env MasterProcedureEnv
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intended")
   private void undoTableStateChange(final MasterProcedureEnv env) {
     if (!skipTableStateCheck) {
       try {
@@ -322,6 +324,7 @@ public class DisableTableProcedure
         }
       } catch (Exception e) {
         // Ignore exception.
+        LOG.trace(e.getMessage());
       }
     }
   }
@@ -154,6 +154,8 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
    * call should get implemented for each snapshot flavor.
    */
   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intentional")
   public void process() {
     String msg = "Running " + snapshot.getType() + " table snapshot " + snapshot.getName() + " "
         + eventType + " on table " + snapshotTable;
@@ -205,7 +207,7 @@ public abstract class TakeSnapshotHandler extends EventHandler implements Snapsh
       status.markComplete(msg);
       LOG.info(msg);
       metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
-    } catch (Exception e) {
+    } catch (Exception e) { // FindBugs: REC_CATCH_EXCEPTION
       status.abort("Failed to complete snapshot " + snapshot.getName() + " on table " +
           snapshotTable + " because " + e.getMessage());
       String reason = "Failed taking snapshot " + ClientSnapshotDescriptionUtils.toString(snapshot)
@@ -98,7 +98,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
    * by this Handler.
    * @return a string representing the method call without parameters
    */
-  public String getRPC() {
+  public synchronized String getRPC() {
     return getRPC(false);
   }

@@ -166,7 +166,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
    * @return true if the monitored handler is currently servicing an RPC call
    * to a database command.
    */
-  public boolean isOperationRunning() {
+  public synchronized boolean isOperationRunning() {
     if(!isRPCRunning()) {
       return false;
     }
@@ -212,7 +212,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
   }

   @Override
-  public void markComplete(String status) {
+  public synchronized void markComplete(String status) {
     super.markComplete(status);
     this.params = null;
     this.packet = null;
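Side note (not part of the commit): IS2_INCONSISTENT_SYNC fires when a field is written under synchronized on some paths but read bare on others. The fix pattern used throughout this patch is to make every accessor of the field synchronized so all paths share one monitor. An illustrative class, not the HBase one:

    class RpcStatus {
      private String methodName = "";   // always accessed under "this"

      public synchronized void setMethod(String m) { this.methodName = m; }
      public synchronized String getMethod() { return methodName; }

      public synchronized String describe() {
        // Calling the synchronized getter from another synchronized method
        // is fine: Java monitors are reentrant.
        return "rpc=" + getMethod();
      }
    }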
@@ -21,6 +21,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+  justification="FindBugs seems confused; says globalLimiter and lastUpdate " +
+    "are mostly synchronized...but to me it looks like they are totally synchronized")
 public class QuotaState {
   private long lastUpdate = 0;
   private long lastQuery = 0;
@@ -38,6 +38,9 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+  justification="FindBugs seems confused; says limit and tlimit " +
+    "are mostly synchronized...but to me it looks like they are totally synchronized")
 public abstract class RateLimiter {
   public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
   private long tunit = 1000; // Timeunit factor for translating to ms.
@@ -66,7 +69,7 @@ public abstract class RateLimiter {
    * @param limit The max value available resource units can be refilled to.
    * @param timeUnit Timeunit factor for translating to ms.
    */
-  public void set(final long limit, final TimeUnit timeUnit) {
+  public synchronized void set(final long limit, final TimeUnit timeUnit) {
     switch (timeUnit) {
     case MILLISECONDS:
       tunit = 1;
@@ -92,10 +95,11 @@ public abstract class RateLimiter {

   public String toString() {
     String rateLimiter = this.getClass().getSimpleName();
-    if (limit == Long.MAX_VALUE) {
+    if (getLimit() == Long.MAX_VALUE) {
       return rateLimiter + "(Bypass)";
     }
-    return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
+    return rateLimiter + "(avail=" + getAvailable() + " limit=" + getLimit() +
+      " tunit=" + getTimeUnitInMillis() + ")";
   }

   /**
@@ -113,7 +117,7 @@ public abstract class RateLimiter {
   }

   public synchronized boolean isBypass() {
-    return limit == Long.MAX_VALUE;
+    return getLimit() == Long.MAX_VALUE;
   }

   public synchronized long getLimit() {
@@ -124,7 +128,7 @@ public abstract class RateLimiter {
     return avail;
   }

-  protected long getTimeUnitInMillis() {
+  protected synchronized long getTimeUnitInMillis() {
     return tunit;
   }

@@ -188,7 +192,7 @@ public abstract class RateLimiter {
    */
   public synchronized long waitInterval(final long amount) {
     // TODO Handle over quota?
-    return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
+    return (amount <= avail) ? 0 : getWaitInterval(getLimit(), avail, amount);
   }

   // These two method are for strictly testing purpose only
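Side note (not part of the commit): the RateLimiter hunks leave toString() itself unsynchronized but route every field read through a synchronized getter, so each individual read is consistent even if the pair of reads is not; that is acceptable for a log string. A stripped-down illustration with made-up names:

    import java.util.concurrent.TimeUnit;

    class SimpleLimiter {
      private long limit = Long.MAX_VALUE;
      private long avail = Long.MAX_VALUE;

      public synchronized void set(long limit, TimeUnit unit) {
        this.limit = limit;
        this.avail = limit;
      }

      public synchronized long getLimit() { return limit; }
      public synchronized long getAvailable() { return avail; }

      @Override
      public String toString() {  // unsynchronized; reads go through synchronized getters
        if (getLimit() == Long.MAX_VALUE) {
          return "SimpleLimiter(Bypass)";
        }
        return "SimpleLimiter(avail=" + getAvailable() + " limit=" + getLimit() + ")";
      }
    }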
@@ -27,6 +27,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+  justification="FindBugs seems confused; says bypassGlobals, namepaceLimiters, and " +
+    "tableLimiters are mostly synchronized...but to me it looks like they are totally synchronized")
 public class UserQuotaState extends QuotaState {
   private Map<String, QuotaLimiter> namespaceLimiters = null;
   private Map<TableName, QuotaLimiter> tableLimiters = null;
@@ -389,7 +389,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     // Set when a flush has been requested.
     volatile boolean flushRequested = false;
     // Number of compactions running.
-    volatile int compacting = 0;
+    AtomicInteger compacting = new AtomicInteger(0);
     // Gets set in close. If set, cannot compact or flush again.
     volatile boolean writesEnabled = true;
     // Set if region is read-only
@@ -823,7 +823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
     this.writestate.flushRequested = false;
-    this.writestate.compacting = 0;
+    this.writestate.compacting.set(0);

     if (this.writestate.writesEnabled) {
       // Remove temporary data left over from old regions
@@ -1367,6 +1367,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     this.closing.set(closing);
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK_EXCEPTION_PATH",
+    justification="I think FindBugs is confused")
   private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
       throws IOException {
     if (isClosed()) {
@@ -1404,7 +1406,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }

     // block waiting for the lock for closing
-    lock.writeLock().lock();
+    lock.writeLock().lock(); // FindBugs: Complains UL_UNRELEASED_LOCK_EXCEPTION_PATH but seems fine
     this.closing.set(true);
     status.setStatus("Disabling writes for close");
     try {
@@ -1533,7 +1535,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     boolean interrupted = false;
     try {
-      while (writestate.compacting > 0 || writestate.flushing) {
+      while (writestate.compacting.get() > 0 || writestate.flushing) {
         LOG.debug("waiting for " + writestate.compacting + " compactions"
           + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
         try {
@@ -1890,7 +1892,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       synchronized (writestate) {
         if (writestate.writesEnabled) {
           wasStateSet = true;
-          ++writestate.compacting;
+          writestate.compacting.incrementAndGet();
         } else {
           String msg = "NOT compacting region " + this + ". Writes disabled.";
           LOG.info(msg);
@@ -1916,8 +1918,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     } finally {
       if (wasStateSet) {
         synchronized (writestate) {
-          --writestate.compacting;
-          if (writestate.compacting <= 0) {
+          writestate.compacting.decrementAndGet();
+          if (writestate.compacting.get() <= 0) {
            writestate.notifyAll();
          }
        }
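Side note (not part of the commit): writestate.compacting becomes an AtomicInteger so ++/-- are atomic, while waiters still park on the writestate monitor and get a notifyAll() when the count reaches zero. A self-contained sketch of that shape; names are illustrative:

    import java.util.concurrent.atomic.AtomicInteger;

    class WriteState {
      final AtomicInteger compacting = new AtomicInteger(0);
    }

    class RegionLike {
      private final WriteState writestate = new WriteState();

      void startCompaction() {
        writestate.compacting.incrementAndGet();
      }

      void finishCompaction() {
        synchronized (writestate) {
          if (writestate.compacting.decrementAndGet() <= 0) {
            writestate.notifyAll();   // wake threads blocked in awaitQuiesce()
          }
        }
      }

      void awaitQuiesce() throws InterruptedException {
        synchronized (writestate) {
          while (writestate.compacting.get() > 0) {
            writestate.wait();        // releases the monitor while waiting
          }
        }
      }
    }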
@@ -2160,6 +2162,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
+    justification="FindBugs seems confused about trxId")
   protected PrepareFlushResult internalPrepareFlushCache(final WAL wal, final long myseqid,
       final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
           throws IOException {
@@ -2391,6 +2395,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return false;
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+    justification="Intentional; notify is about completed flush")
   protected FlushResult internalFlushCacheAndCommit(
       final WAL wal, MonitoredTask status, final PrepareFlushResult prepareResult,
       final Collection<Store> storesToFlush)
@@ -4460,6 +4466,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   @VisibleForTesting
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+    justification="Intentional; post memstore flush")
   void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
     MonitoredTask status = TaskMonitor.get().createStatus("Committing flush " + this);

@@ -4696,6 +4704,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return prepareFlushResult;
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+    justification="Intentional; cleared the memstore")
   void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
     checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
       "RegionEvent marker from WAL ", regionEvent);
@@ -4926,6 +4936,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return refreshStoreFiles(false);
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
+    justification="Notify is about post replay. Intentional")
   protected boolean refreshStoreFiles(boolean force) throws IOException {
     if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
@@ -7922,6 +7934,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH",
+    justification="Intentional")
   public void startRegionOperation(Operation op) throws IOException {
     switch (op) {
     case GET: // read operations
@@ -1963,7 +1963,6 @@ public class HRegionServer extends HasThread implements
   public boolean reportRegionStateTransition(final RegionStateTransitionContext context) {
     TransitionCode code = context.getCode();
     long openSeqNum = context.getOpenSeqNum();
-    long masterSystemTime = context.getMasterSystemTime();
     HRegionInfo[] hris = context.getHris();

     ReportRegionStateTransitionRequest.Builder builder =
@@ -189,7 +189,7 @@ implements HeapSize, Map<K,V> {
    *
    * @return currently available bytes
    */
-  public long getMemFree() {
+  public synchronized long getMemFree() {
     return memFree;
   }

@@ -208,7 +208,7 @@ implements HeapSize, Map<K,V> {
    * @return currently used memory in bytes
    */
   public long getMemUsed() {
-    return (memTotal - memFree); // FindBugs IS2_INCONSISTENT_SYNC
+    return (memTotal - getMemFree()); // FindBugs IS2_INCONSISTENT_SYNC
   }

   /**
@@ -227,7 +227,7 @@ implements HeapSize, Map<K,V> {
    *
    * @return number of misses
    */
-  public long getMissCount() {
+  public synchronized long getMissCount() {
     return missCount; // FindBugs IS2_INCONSISTENT_SYNC
   }

@@ -239,7 +239,7 @@ implements HeapSize, Map<K,V> {
    */
   public double getHitRatio() {
     return (double)((double)hitCount/
-      ((double)(hitCount+missCount)));
+      ((double)(hitCount + getMissCount())));
   }

   /**
@@ -269,7 +269,7 @@ implements HeapSize, Map<K,V> {
    * @return memory usage of map in bytes
    */
   public long heapSize() {
-    return (memTotal - memFree);
+    return (memTotal - getMemFree());
   }

   //--------------------------------------------------------------------------
@@ -824,6 +824,8 @@ implements HeapSize, Map<K,V> {
    *
    * @return Set of entries in hash
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
+    justification="Unused debugging function that reads only")
   public Set<Entry<K,V>> entryTableSet() {
     Set<Entry<K,V>> entrySet = new HashSet<Entry<K,V>>();
     Entry [] table = entries; // FindBugs IS2_INCONSISTENT_SYNC
@@ -60,7 +60,7 @@ public class MemStoreChunkPool {
   final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f;

   // Static reference to the MemStoreChunkPool
-  private static MemStoreChunkPool globalInstance;
+  private static MemStoreChunkPool GLOBAL_INSTANCE;
   /** Boolean whether we have disabled the memstore chunk pool entirely. */
   static boolean chunkPoolDisabled = false;

@@ -179,12 +179,14 @@ public class MemStoreChunkPool {
    * @param conf
    * @return the global MemStoreChunkPool instance
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DC_DOUBLECHECK",
+    justification="Intentional")
   static MemStoreChunkPool getPool(Configuration conf) {
-    if (globalInstance != null) return globalInstance;
+    if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;

     synchronized (MemStoreChunkPool.class) {
       if (chunkPoolDisabled) return null;
-      if (globalInstance != null) return globalInstance;
+      if (GLOBAL_INSTANCE != null) return GLOBAL_INSTANCE;
       float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
       if (poolSizePercentage <= 0) {
         chunkPoolDisabled = true;
@@ -210,8 +212,8 @@ public class MemStoreChunkPool {
       int initialCount = (int) (initialCountPercentage * maxCount);
       LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
           + ", max count " + maxCount + ", initial count " + initialCount);
-      globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
-      return globalInstance;
+      GLOBAL_INSTANCE = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+      return GLOBAL_INSTANCE;
     }
   }
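Side note (not part of the commit): getPool() above is double-checked locking, which FindBugs flags as DC_DOUBLECHECK; the commit suppresses the warning as intentional. The textbook alternative is a volatile field, which makes the unsynchronized fast path safe under the Java memory model. A sketch with a made-up class:

    class ChunkPool {
      private static volatile ChunkPool INSTANCE;   // volatile publishes the object safely

      private final int chunkSize;

      private ChunkPool(int chunkSize) { this.chunkSize = chunkSize; }

      static ChunkPool getPool(int chunkSize) {
        ChunkPool local = INSTANCE;                 // one volatile read on the fast path
        if (local != null) return local;
        synchronized (ChunkPool.class) {
          if (INSTANCE == null) {
            INSTANCE = new ChunkPool(chunkSize);
          }
          return INSTANCE;
        }
      }
    }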
@@ -277,7 +277,7 @@ public class RegionCoprocessorHost
         continue;
       }
       int priority = matcher.group(3).trim().isEmpty() ?
-          Coprocessor.PRIORITY_USER : Integer.valueOf(matcher.group(3));
+          Coprocessor.PRIORITY_USER : Integer.parseInt(matcher.group(3));
       String cfgSpec = null;
       try {
         cfgSpec = matcher.group(4);
@@ -278,6 +278,8 @@ public class RegionServerCoprocessorHost extends

     private RegionServerServices regionServerServices;

+    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="BC_UNCONFIRMED_CAST",
+      justification="Intentional; FB has trouble detecting isAssignableFrom")
     public RegionServerEnvironment(final Class<?> implClass,
         final Coprocessor impl, final int priority, final int seq,
         final Configuration conf, final RegionServerServices services) {
@@ -285,7 +287,7 @@ public class RegionServerCoprocessorHost extends
       this.regionServerServices = services;
       for (Object itf : ClassUtils.getAllInterfaces(implClass)) {
         Class<?> c = (Class<?>) itf;
-        if (SingletonCoprocessorService.class.isAssignableFrom(c)) {
+        if (SingletonCoprocessorService.class.isAssignableFrom(c)) {// FindBugs: BC_UNCONFIRMED_CAST
           this.regionServerServices.registerService(
             ((SingletonCoprocessorService) impl).getService());
           break;
@@ -570,6 +570,8 @@ public class StoreFile {
     return sb.toString();
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
+    justification="Will not overflow")
   public static class WriterBuilder {
     private final Configuration conf;
     private final CacheConfig cacheConf;
@@ -582,7 +584,6 @@ public class StoreFile {
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
-    private boolean shouldDropCacheBehind = false;

     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
@@ -650,8 +651,8 @@ public class StoreFile {
       return this;
     }

-    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
-      this.shouldDropCacheBehind = shouldDropCacheBehind;
+    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind/*NOT USED!!*/) {
+      // TODO: HAS NO EFFECT!!! FIX!!
       return this;
     }
     /**
@@ -757,9 +758,6 @@ public class StoreFile {
     private Cell lastDeleteFamilyCell = null;
     private long deleteFamilyCnt = 0;

-    /** Bytes per Checksum */
-    protected int bytesPerChecksum;
-
     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
      * When flushing a memstore, we set TimeRange and use this variable to
@@ -91,6 +91,8 @@ public class TimeRangeTracker implements Writable {
    * If required, update the current TimestampRange to include timestamp
    * @param timestamp the timestamp value to include
    */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
+    justification="Intentional")
   void includeTimestamp(final long timestamp) {
     // Do test outside of synchronization block. Synchronization in here can be problematic
     // when many threads writing one Store -- they can all pile up trying to add in here.
@@ -43,18 +43,20 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
  * target cluster is an HBase cluster.
  */
 @InterfaceAudience.Private
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
+  justification="Thinks zkw needs to be synchronized access but should be fine as is.")
 public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   implements Abortable {

   private static final Log LOG = LogFactory.getLog(HBaseReplicationEndpoint.class);

-  private ZooKeeperWatcher zkw = null;
+  private ZooKeeperWatcher zkw = null; // FindBugs: MT_CORRECTNESS

   private List<ServerName> regionServers = new ArrayList<ServerName>(0);
-  private volatile long lastRegionServerUpdate;
+  private long lastRegionServerUpdate;

   protected void disconnect() {
-    if (zkw != null){
+    if (zkw != null) {
       zkw.close();
     }
   }
@@ -181,7 +183,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * Set the list of region servers for that peer
    * @param regionServers list of addresses for the region servers
    */
-  public void setRegionServers(List<ServerName> regionServers) {
+  public synchronized void setRegionServers(List<ServerName> regionServers) {
     this.regionServers = regionServers;
     lastRegionServerUpdate = System.currentTimeMillis();
   }
@@ -116,7 +116,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
    * Skips the entries which has original seqId. Only entries persisted via distributed log replay
    * have their original seq Id fields set.
    */
-  private class SkipReplayedEditsFilter extends BaseWALEntryFilter {
+  private static class SkipReplayedEditsFilter extends BaseWALEntryFilter {
     @Override
     public Entry filter(Entry entry) {
       // if orig seq id is set, skip replaying the entry
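Side note (not part of the commit): making SkipReplayedEditsFilter static drops the hidden reference every non-static inner-class instance keeps to its enclosing object, the concern behind FindBugs' SIC_INNER_SHOULD_BE_STATIC family. Minimal illustration with made-up names:

    class Outer {
      // Carries an implicit Outer.this it never uses; FindBugs flags it.
      private class NonStaticFilter {
        boolean keep(long origSeqId) { return origSeqId <= 0; }
      }

      // No hidden reference; collectable independently of any Outer instance.
      private static class StaticFilter {
        boolean keep(long origSeqId) { return origSeqId <= 0; }
      }
    }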
@@ -279,7 +279,6 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
         Configuration conf = env.getConfiguration();
         fs = FileSystem.get(conf);
         for(Pair<byte[], String> el: familyPaths) {
-          Path p = new Path(el.getSecond());
           Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst()));
           if(!fs.exists(stageFamily)) {
             fs.mkdirs(stageFamily);
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,7 +108,7 @@ public class TableAuthManager implements Closeable {

   private Configuration conf;
   private ZKPermissionWatcher zkperms;
-  private volatile long mtime;
+  private AtomicLong mtime = new AtomicLong(0);

   private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
       throws IOException {
@@ -219,7 +220,7 @@ public class TableAuthManager implements Closeable {
         }
       }
       globalCache = newCache;
-      mtime++;
+      mtime.incrementAndGet();
     } catch (IOException e) {
       // Never happens
       LOG.error("Error occured while updating the global cache", e);
@@ -247,7 +248,7 @@ public class TableAuthManager implements Closeable {
     }

     tableCache.put(table, newTablePerms);
-    mtime++;
+    mtime.incrementAndGet();
   }

   /**
@@ -271,7 +272,7 @@ public class TableAuthManager implements Closeable {
     }

     nsCache.put(namespace, newTablePerms);
-    mtime++;
+    mtime.incrementAndGet();
   }

   private PermissionCache<TablePermission> getTablePermissions(TableName table) {
@@ -335,12 +336,6 @@ public class TableAuthManager implements Closeable {
     return false;
   }

-  private boolean authorize(List<TablePermission> perms,
-      TableName table, byte[] family,
-      Permission.Action action) {
-    return authorize(perms, table, family, null, action);
-  }
-
   private boolean authorize(List<TablePermission> perms,
       TableName table, byte[] family,
       byte[] qualifier, Permission.Action action) {
@@ -740,7 +735,7 @@ public class TableAuthManager implements Closeable {
   }

   public long getMTime() {
-    return mtime;
+    return mtime.get();
   }

   private static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
@@ -124,7 +124,7 @@ public class AuthenticationTokenSecretManager
   }

   @Override
-  protected byte[] createPassword(AuthenticationTokenIdentifier identifier) {
+  protected synchronized byte[] createPassword(AuthenticationTokenIdentifier identifier) {
     long now = EnvironmentEdgeManager.currentTime();
     AuthenticationKey secretKey = currentKey;
     identifier.setKeyId(secretKey.getKeyId());
@@ -229,7 +229,7 @@ public class AuthenticationTokenSecretManager
     return true;
   }

-  AuthenticationKey getCurrentKey() {
+  synchronized AuthenticationKey getCurrentKey() {
     return currentKey;
   }

@@ -338,8 +338,11 @@ public class AuthenticationTokenSecretManager

         // clear any expired
         removeExpiredKeys();
+        long localLastKeyUpdate;
-        if (lastKeyUpdate + keyUpdateInterval < now) {
+        synchronized (this) {
+          localLastKeyUpdate = lastKeyUpdate;
+        }
+        if (localLastKeyUpdate + keyUpdateInterval < now) {
           // roll a new master key
           rollCurrentKey();
         }
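Side note (not part of the commit): the key-roll hunk uses the copy-to-local pattern: snapshot a lock-guarded field inside a tiny synchronized block, then do the decision on the stable local outside the lock. A sketch with made-up names:

    class KeyRoller {
      private long lastKeyUpdate;                       // guarded by "this"
      private final long keyUpdateInterval = 60_000L;

      void maybeRoll(long now) {
        long localLastKeyUpdate;
        synchronized (this) {
          localLastKeyUpdate = lastKeyUpdate;           // consistent snapshot
        }
        if (localLastKeyUpdate + keyUpdateInterval < now) {
          rollCurrentKey(now);                          // slower work stays outside the block
        }
      }

      private synchronized void rollCurrentKey(long now) {
        lastKeyUpdate = now;
      }
    }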
@@ -117,6 +117,8 @@ import com.google.protobuf.Service;
  * visibility labels
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
+  justification="FIX visibilityLabelService; Make Synchronized!!!")
 public class VisibilityController extends BaseMasterAndRegionObserver implements
     VisibilityLabelsService.Interface, CoprocessorService {

@@ -134,7 +136,7 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
   private Map<InternalScanner,String> scannerOwners =
       new MapMaker().weakKeys().makeMap();

-  private VisibilityLabelService visibilityLabelService;
+  private VisibilityLabelService visibilityLabelService; // FindBugs: MT_CORRECTNESS FIX!!!

   /** if we are active, usually true, only not true if "hbase.security.authorization"
    has been set to false in site configuration */
@@ -272,8 +274,10 @@ public class VisibilityController extends BaseMasterAndRegionObserver implements
     // Read the entire labels table and populate the zk
     if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
       this.labelsRegion = true;
-      this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
+      synchronized (this) {
+        this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
           .contains(AccessController.class.getName());
+      }
       // Defer the init of VisibilityLabelService on labels region until it is in recovering state.
       if (!e.getEnvironment().getRegion().isRecovering()) {
         initVisibilityLabelService(e.getEnvironment());
@@ -270,7 +270,7 @@ public class ExportSnapshot extends Configured implements Tool {
       InputStream in = openSourceFile(context, inputInfo);
       int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
       if (Integer.MAX_VALUE != bandwidthMB) {
-        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
       }

       try {
@@ -634,7 +634,6 @@ public class ExportSnapshot extends Configured implements Tool {
     @Override
     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
       Configuration conf = context.getConfiguration();
-      String snapshotName = conf.get(CONF_SNAPSHOT_NAME);
       Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR));
       FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf);

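Side note (not part of the commit): the ThrottledInputStream fix is the classic int-multiplication overflow; bandwidthMB * 1024 * 1024 is computed in 32-bit arithmetic and only widened afterwards. One long operand forces long arithmetic throughout:

    public class OverflowDemo {
      public static void main(String[] args) {
        int bandwidthMB = 4096;
        long wrong = bandwidthMB * 1024 * 1024;    // 4096 * 2^20 overflows int to 0, then widens
        long right = bandwidthMB * 1024L * 1024L;  // long arithmetic from the first multiply
        System.out.println(wrong + " vs " + right);
      }
    }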
@@ -263,6 +263,8 @@ public final class SnapshotInfo extends Configured implements Tool {
   private SnapshotManifest snapshotManifest;

   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intentional")
   public int run(String[] args) throws IOException, InterruptedException {
     final Configuration conf = getConf();
     boolean listSnapshots = false;
@@ -300,7 +302,7 @@ public final class SnapshotInfo extends Configured implements Tool {
           printUsageAndExit();
         }
       } catch (Exception e) {
-        printUsageAndExit();
+        printUsageAndExit(); // FindBugs: REC_CATCH_EXCEPTION
       }
     }

@@ -54,21 +54,10 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.TreeMultimap;
-import com.google.protobuf.ServiceException;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -90,11 +79,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
@@ -147,6 +136,15 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;

+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.TreeMultimap;
+import com.google.protobuf.ServiceException;
+
 /**
  * HBaseFsck (hbck) is a tool for checking and repairing region consistency and
  * table integrity problems in a corrupted HBase.
@@ -264,7 +262,7 @@ public class HBaseFsck extends Configured implements Closeable {
   private Path sidelineDir = null;

   private boolean rerun = false; // if we tried to fix something, rerun hbck
-  private static boolean summary = false; // if we want to print less output
+  private static boolean SUMMARY = false; // if we want to print less output
   private boolean checkMetaOnly = false;
   private boolean checkRegionBoundaries = false;
   private boolean ignorePreCheckPermission = false; // if pre-check permission
@@ -3580,9 +3578,8 @@ public class HBaseFsck extends Configured implements Closeable {
       this.metaEntry = metaEntry;
     }

-    public int getReplicaId() {
-      if (metaEntry != null) return metaEntry.getReplicaId();
-      return deployedReplicaId;
+    public synchronized int getReplicaId() {
+      return metaEntry != null? metaEntry.getReplicaId(): deployedReplicaId;
     }

     public synchronized void addServer(HRegionInfo hri, ServerName server) {
@@ -3887,7 +3884,7 @@ public class HBaseFsck extends Configured implements Closeable {
     }

     errorList.add(errorCode);
-    if (!summary) {
+    if (!getSUMMARY()) {
       System.out.println("ERROR: " + message);
     }
     errorCount++;
@@ -3929,7 +3926,7 @@ public class HBaseFsck extends Configured implements Closeable {
     */
     @Override
     public synchronized void report(String message) {
-      if (! summary) {
+      if (!getSUMMARY()) {
        System.out.println("ERROR: " + message);
      }
      showProgress = 0;
@@ -3955,11 +3952,15 @@ public class HBaseFsck extends Configured implements Closeable {

     @Override
     public synchronized void print(String message) {
-      if (!summary) {
+      if (!getSUMMARY()) {
         System.out.println(message);
       }
     }

+    private synchronized static boolean getSUMMARY() {
+      return SUMMARY;
+    }
+
     @Override
     public boolean tableHasErrors(TableInfo table) {
       return errorTables.contains(table);
@@ -3981,7 +3982,7 @@ public class HBaseFsck extends Configured implements Closeable {
     @Override
     public synchronized void progress() {
       if (showProgress++ == progressThreshold) {
-        if (!summary) {
+        if (!getSUMMARY()) {
          System.out.print(".");
        }
        showProgress = 0;
@@ -4211,8 +4212,8 @@ public class HBaseFsck extends Configured implements Closeable {
    * Set summary mode.
    * Print only summary of the tables and status (OK or INCONSISTENT)
    */
-  void setSummary() {
-    summary = true;
+  synchronized static void setSummary() {
+    SUMMARY = true;
   }

   /**
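Side note (not part of the commit): the hbck hunks rename the static flag to SUMMARY and route every read through a synchronized static getter, so reads and the setter serialize on the same lock, the Class object. A stand-alone rendering with made-up names:

    class ErrorReporter {
      private static boolean SUMMARY = false;

      static synchronized void setSummary() { SUMMARY = true; }

      private static synchronized boolean getSummary() { return SUMMARY; }

      void report(String message) {
        if (!getSummary()) {
          System.out.println("ERROR: " + message);
        }
      }
    }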
@@ -68,12 +68,17 @@ public class IdReadWriteLock {
   /** For testing */
   @VisibleForTesting
   int purgeAndGetEntryPoolSize() {
-    System.gc();
+    gc();
     Threads.sleep(200);
     lockPool.purge();
     return lockPool.size();
   }

+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DM_GC", justification="Intentional")
+  private void gc() {
+    System.gc();
+  }
+
   @VisibleForTesting
   public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
     for (ReentrantReadWriteLock readWriteLock;;) {
@@ -104,11 +104,8 @@ public class MetaUtils {
    * @return HRegion for meta region
    * @throws IOException e
    */
-  public HRegion getMetaRegion() throws IOException {
-    if (this.metaRegion == null) {
-      openMetaRegion();
-    }
-    return this.metaRegion;
+  public synchronized HRegion getMetaRegion() throws IOException {
+    return this.metaRegion == null? openMetaRegion(): this.metaRegion;
   }

   /**
@@ -46,6 +46,8 @@ import java.util.RandomAccess;
 * <p>
 * Iterators are read-only. They cannot be used to remove elements.
 */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UG_SYNC_SET_UNSYNC_GET",
+  justification="TODO: synchronization in here needs review!!!")
 public class SortedList<E> implements List<E>, RandomAccess {
   private volatile List<E> list;
   private final Comparator<? super E> comparator;
@@ -80,7 +82,7 @@ public class SortedList<E> implements List<E>, RandomAccess {
    * method to get a reference for iterating over using the RandomAccess
    * pattern.
    */
-  public List<E> get() {
+  public List<E> get() { // FindBugs: UG_SYNC_SET_UNSYNC_GET complaint. Fix!!
     return list;
   }

@@ -185,7 +187,7 @@ public class SortedList<E> implements List<E>, RandomAccess {
   }

   @Override
-  public E get(int index) {
+  public synchronized E get(int index) {
     return list.get(index);
   }

@@ -47,6 +47,8 @@ public class ZKDataMigrator extends Configured implements Tool {
   private static final Log LOG = LogFactory.getLog(ZKDataMigrator.class);

   @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
+    justification="Intended")
   public int run(String[] as) throws Exception {
     Configuration conf = getConf();
     ZooKeeperWatcher zkw = null;
@@ -2276,9 +2276,7 @@ public class WALSplitter {
           } else {
             ((Put) m).add(cell);
           }
-          if (m != null) {
-            m.setDurability(durability);
-          }
+          m.setDurability(durability);
           previousCell = cell;
         }

@@ -338,7 +338,7 @@ public class TestSplitLogManager {
     assertTrue(task == task2);
     LOG.debug("task = " + task);
     assertEquals(1L, tot_mgr_resubmit.get());
-    assertEquals(1, task.incarnation);
+    assertEquals(1, task.incarnation.get());
     assertEquals(0, task.unforcedResubmits.get());
     assertTrue(task.isOrphan());
     assertTrue(task.isUnassigned());