HBASE-19811 Fix findbugs and error-prone warnings in hbase-server (branch-2)

Signed-off-by: Michael Stack <stack@apache.org>

parent 1e5fc1ed63
commit b1269ec57f

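Most of the hunks below repeat a small set of mechanical fixes flagged by error-prone and findbugs: replacing `Class.newInstance()` with `getDeclaredConstructor().newInstance()`, adding missing `@Override` annotations, making effectively-final fields `final`, and collapsing redundant catch blocks. A minimal sketch of the instantiation pattern, using an illustrative helper rather than a class from this diff:

```java
import java.io.IOException;

public final class ReflectiveFactory {
  private ReflectiveFactory() {
  }

  // Hypothetical helper showing the migration the hunks below apply:
  // clazz.newInstance() -> clazz.getDeclaredConstructor().newInstance().
  public static <T> T instantiate(Class<T> clazz) throws IOException {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      // The reflective exceptions are wrapped together, mirroring how the diff
      // collapses several catch blocks into a single catch (Exception e).
      throw new IOException("Cannot instantiate " + clazz.getName(), e);
    }
  }
}
```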
@@ -33,6 +33,7 @@ public class SslRMIServerSocketFactorySecure extends SslRMIServerSocketFactory {
  @Override
  public ServerSocket createServerSocket(int port) throws IOException {
    return new ServerSocket(port) {
      @Override
      public Socket accept() throws IOException {
        Socket socket = super.accept();
        SSLSocketFactory sslSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();

@@ -227,6 +227,7 @@ public class EntityLock {
      return this;
    }

    @Override
    public void run() {
      final LockHeartbeatRequest lockHeartbeatRequest =
          LockHeartbeatRequest.newBuilder().setProcId(procId).build();

@@ -79,7 +79,7 @@ public class ConfigurationManager {
  // notified when the configuration is reloaded from disk. This is a set
  // constructed from a WeakHashMap, whose entries would be removed if the
  // observer classes go out of scope.
  private Set<ConfigurationObserver> configurationObservers =
  private final Set<ConfigurationObserver> configurationObservers =
      Collections.newSetFromMap(new WeakHashMap<ConfigurationObserver,
          Boolean>());

@@ -74,7 +74,7 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements

  private TaskExecutor splitTaskExecutor;

  private AtomicInteger taskReadySeq = new AtomicInteger(0);
  private final AtomicInteger taskReadySeq = new AtomicInteger(0);
  private volatile String currentTask = null;
  private int currentVersion;
  private volatile boolean shouldStop = false;

@@ -118,7 +118,7 @@ extends RowProcessorService implements RegionCoprocessor {
    Class<?> cls;
    try {
      cls = Class.forName(className);
      RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
      RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.getDeclaredConstructor().newInstance();
      if (request.hasRowProcessorInitializerMessageName()) {
        Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
            .asSubclass(Message.class);

@@ -141,11 +141,7 @@ extends RowProcessorService implements RegionCoprocessor {
        ci.initialize(s);
      }
      return ci;
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    } catch (InstantiationException e) {
      throw new IOException(e);
    } catch (IllegalAccessException e) {
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

@@ -572,6 +572,7 @@ public abstract class CoprocessorHost<C extends Coprocessor, E extends Coprocess
      return this.result;
    }

    @Override
    void callObserver() throws IOException {
      Optional<O> observer = observerGetter.apply(getEnvironment().getInstance());
      if (observer.isPresent()) {

@@ -48,6 +48,7 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
    this.bypassable = bypassable;
  }

  @Override
  public E getEnvironment() {
    return env;
  }

@@ -60,6 +61,7 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
    return this.bypassable;
  };

  @Override
  public void bypass() {
    if (!this.bypassable) {
      throw new UnsupportedOperationException("This method does not support 'bypass'.");

@@ -82,6 +84,7 @@ public class ObserverContextImpl<E extends CoprocessorEnvironment> implements Ob
    return false;
  }

  @Override
  public Optional<User> getCaller() {
    return Optional.ofNullable(caller);
  }

@@ -87,7 +87,7 @@ class ReadOnlyConfiguration extends Configuration {
  }

  @Override
  public void reloadConfiguration() {
  public synchronized void reloadConfiguration() {
    // This is a write operation. We need to allow it though because if any Configuration in
    // current JVM context calls addDefaultResource, this forces a reload of all Configurations
    // (all Configurations are 'registered' by the default constructor. Rather than turn

@@ -100,10 +100,12 @@ class ReadOnlyConfiguration extends Configuration {
    return conf.get(name);
  }

  // Do not add @Override because it is not in Hadoop 2.6.5
  public void setAllowNullValueProperties(boolean val) {
    throw new UnsupportedOperationException("Read-only Configuration");
  }

  @Override
  public String getTrimmed(String name) {
    return conf.getTrimmed(name);
  }

@@ -129,12 +131,12 @@ class ReadOnlyConfiguration extends Configuration {
  }

  @Override
  public void unset(String name) {
  public synchronized void unset(String name) {
    throw new UnsupportedOperationException("Read-only Configuration");
  }

  @Override
  public void setIfUnset(String name, String value) {
  public synchronized void setIfUnset(String name, String value) {
    throw new UnsupportedOperationException("Read-only Configuration");
  }

@@ -239,7 +241,7 @@ class ReadOnlyConfiguration extends Configuration {
  }

  @Override
  public String[] getPropertySources(String name) {
  public synchronized String[] getPropertySources(String name) {
    return conf.getPropertySources(name);
  }

@@ -326,7 +328,7 @@ class ReadOnlyConfiguration extends Configuration {
  }

  @Override
  public Class<?>[] getClasses(String name, Class<?>[] defaultValue) {
  public Class<?>[] getClasses(String name, Class<?>... defaultValue) {
    return conf.getClasses(name, defaultValue);
  }

@@ -422,7 +424,7 @@ class ReadOnlyConfiguration extends Configuration {
  }

  @Override
  public void setQuietMode(boolean quietmode) {
  public synchronized void setQuietMode(boolean quietmode) {
    throw new UnsupportedOperationException("Read-only Configuration");
  }

@@ -119,6 +119,7 @@ final public class FilterWrapper extends Filter {
    return filterCell(c);
  }

  @Override
  public ReturnCode filterCell(final Cell c) throws IOException {
    return this.filter.filterCell(c);
  }

@@ -235,16 +235,15 @@ public class HFileSystem extends FilterFileSystem {
    }
  }

  /**
  /**
   * Returns a brand new instance of the FileSystem. It does not use
   * the FileSystem.Cache. In newer versions of HDFS, we can directly
   * invoke FileSystem.newInstance(Configuration).
   *
   *
   * @param conf Configuration
   * @return A new instance of the filesystem
   */
  private static FileSystem newInstanceFileSystem(Configuration conf)
    throws IOException {
  private static FileSystem newInstanceFileSystem(Configuration conf) throws IOException {
    URI uri = FileSystem.getDefaultUri(conf);
    FileSystem fs = null;
    Class<?> clazz = conf.getClass("fs." + uri.getScheme() + ".impl", null);

@@ -361,47 +360,43 @@ public class HFileSystem extends FilterFileSystem {

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance
        (cp.getClass().getClassLoader(),
         new Class[]{ClientProtocol.class, Closeable.class},
         new InvocationHandler() {
           public Object invoke(Object proxy, Method method,
                                Object[] args) throws Throwable {
             try {
               if ((args == null || args.length == 0)
                   && "close".equals(method.getName())) {
                 RPC.stopProxy(cp);
                 return null;
               } else {
                 Object res = method.invoke(cp, args);
                 if (res != null && args != null && args.length == 3
                     && "getBlockLocations".equals(method.getName())
                     && res instanceof LocatedBlocks
                     && args[0] instanceof String
                     && args[0] != null) {
                   lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                 }
                 return res;
               }
             } catch (InvocationTargetException ite) {
               // We will have this for all the exception, checked on not, sent
               // by any layer, including the functional exception
               Throwable cause = ite.getCause();
               if (cause == null){
                 throw new RuntimeException(
                     "Proxy invocation failed and getCause is null", ite);
               }
               if (cause instanceof UndeclaredThrowableException) {
                 Throwable causeCause = cause.getCause();
                 if (causeCause == null) {
                   throw new RuntimeException("UndeclaredThrowableException had null cause!");
                 }
                 cause = cause.getCause();
               }
               throw cause;
    return (ClientProtocol) Proxy.newProxyInstance(cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class, Closeable.class}, new InvocationHandler() {
          @Override
          public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            try {
              if ((args == null || args.length == 0) && "close".equals(method.getName())) {
                RPC.stopProxy(cp);
                return null;
              } else {
                Object res = method.invoke(cp, args);
                if (res != null && args != null && args.length == 3
                    && "getBlockLocations".equals(method.getName())
                    && res instanceof LocatedBlocks
                    && args[0] instanceof String
                    && args[0] != null) {
                  lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
                }
                return res;
              }
            });
            } catch (InvocationTargetException ite) {
              // We will have this for all the exception, checked on not, sent
              // by any layer, including the functional exception
              Throwable cause = ite.getCause();
              if (cause == null){
                throw new RuntimeException("Proxy invocation failed and getCause is null", ite);
              }
              if (cause instanceof UndeclaredThrowableException) {
                Throwable causeCause = cause.getCause();
                if (causeCause == null) {
                  throw new RuntimeException("UndeclaredThrowableException had null cause!");
                }
                cause = cause.getCause();
              }
              throw cause;
            }
          }
        });
  }

  /**

@@ -424,6 +419,7 @@ public class HFileSystem extends FilterFileSystem {
   * datanode is actually dead, so if we use it it will timeout.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    @Override
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

@@ -481,6 +477,7 @@ public class HFileSystem extends FilterFileSystem {
   * createNonRecursive. This is a hadoop bug and when it is fixed in Hadoop,
   * this definition will go away.
   */
  @Override
  @SuppressWarnings("deprecation")
  public FSDataOutputStream createNonRecursive(Path f,
      boolean overwrite,

@@ -22,15 +22,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.fs.HFileSystem;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

@@ -73,7 +74,7 @@ public class FSDataInputStreamWrapper implements Closeable {
   */
  private volatile FSDataInputStream stream = null;
  private volatile FSDataInputStream streamNoFsChecksum = null;
  private Object streamNoFsChecksumFirstCreateLock = new Object();
  private final Object streamNoFsChecksumFirstCreateLock = new Object();

  // The configuration states that we should validate hbase checksums
  private boolean useHBaseChecksumConfigured;

@@ -86,7 +87,7 @@ public class FSDataInputStreamWrapper implements Closeable {

  // In the case of a checksum failure, do these many succeeding
  // reads without hbase checksum verification.
  private volatile int hbaseChecksumOffCount = -1;
  private AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  private Boolean instanceOfCanUnbuffer = null;
  // Using reflection to get org.apache.hadoop.fs.CanUnbuffer#unbuffer method to avoid compilation

@@ -216,7 +217,7 @@ public class FSDataInputStreamWrapper implements Closeable {
    }
    if (!partOfConvoy) {
      this.useHBaseChecksum = false;
      this.hbaseChecksumOffCount = offCount;
      this.hbaseChecksumOffCount.set(offCount);
    }
    return this.stream;
  }

@@ -224,7 +225,7 @@ public class FSDataInputStreamWrapper implements Closeable {
  /** Report that checksum was ok, so we may ponder going back to HBase checksum. */
  public void checksumOk() {
    if (this.useHBaseChecksumConfigured && !this.useHBaseChecksum
        && (this.hbaseChecksumOffCount-- < 0)) {
        && (this.hbaseChecksumOffCount.getAndDecrement() < 0)) {
      // The stream we need is already open (because we were using HBase checksum in the past).
      assert this.streamNoFsChecksum != null;
      this.useHBaseChecksum = true;

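The `hbaseChecksumOffCount` change above is representative of several hunks in this commit: `volatile` only guarantees visibility of single reads and writes, so a compound update such as `count--` is still a lost-update hazard under contention. A small illustrative sketch (the field name is reused only as an example, not taken verbatim from the class):

```java
import java.util.concurrent.atomic.AtomicInteger;

class ChecksumSwitch {
  // Not thread-safe: "hbaseChecksumOffCount--" is a read-modify-write on a volatile field.
  // private volatile int hbaseChecksumOffCount = -1;

  // Thread-safe replacement, mirroring the hunk above.
  private final AtomicInteger hbaseChecksumOffCount = new AtomicInteger(-1);

  boolean shouldSwitchBack() {
    // getAndDecrement() performs the whole read-modify-write atomically.
    return hbaseChecksumOffCount.getAndDecrement() < 0;
  }

  void reset(int offCount) {
    hbaseChecksumOffCount.set(offCount);
  }
}
```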
@@ -129,35 +129,41 @@ public class HalfStoreFileReader extends StoreFileReader {
      final HFileScanner delegate = s;
      public boolean atEnd = false;

      @Override
      public Cell getKey() {
        if (atEnd) return null;
        return delegate.getKey();
      }

      @Override
      public String getKeyString() {
        if (atEnd) return null;

        return delegate.getKeyString();
      }

      @Override
      public ByteBuffer getValue() {
        if (atEnd) return null;

        return delegate.getValue();
      }

      @Override
      public String getValueString() {
        if (atEnd) return null;

        return delegate.getValueString();
      }

      @Override
      public Cell getCell() {
        if (atEnd) return null;

        return delegate.getCell();
      }

      @Override
      public boolean next() throws IOException {
        if (atEnd) return false;

@@ -200,10 +206,12 @@ public class HalfStoreFileReader extends StoreFileReader {
        return (this.delegate.getReader().getComparator().compare(splitCell, getKey())) > 0;
      }

      @Override
      public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() {
        return this.delegate.getReader();
      }

      @Override
      public boolean isSeeked() {
        return this.delegate.isSeeked();
      }

@@ -222,6 +222,7 @@ public class Reference {
    return Arrays.hashCode(splitkey) + region.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null) return false;

@@ -507,6 +507,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   * @param syncBlock will call hsync if true, otherwise hflush.
   * @return A CompletableFuture that hold the acked length after flushing.
   */
  @Override
  public CompletableFuture<Long> flush(boolean syncBlock) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    flush0(future, syncBlock);

@@ -126,6 +126,7 @@ public interface BlockCache extends Iterable<CachedBlock> {
  /**
   * @return Iterator over the blocks in the cache.
   */
  @Override
  Iterator<CachedBlock> iterator();

  /**

@@ -156,6 +156,7 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase
    return result;
  }

  @Override
  public boolean supportsAutoLoading() {
    return true;
  }

@@ -545,7 +545,7 @@ public class FixedFileTrailer {
    try {
      // If null, it should be the Bytes.BYTES_RAWCOMPARATOR
      if (klass != null) {
        CellComparator comp = klass.newInstance();
        CellComparator comp = klass.getDeclaredConstructor().newInstance();
        // if the name wasn't one of the legacy names, maybe its a legit new
        // kind of comparator.
        comparatorClassName = klass.getName();

@@ -589,12 +589,8 @@ public class FixedFileTrailer {
  public static CellComparator createComparator(
      String comparatorClassName) throws IOException {
    try {
      Class<? extends CellComparator> comparatorClass = getComparatorClass(comparatorClassName);
      return comparatorClass != null ? comparatorClass.newInstance() : null;
    } catch (InstantiationException e) {
      throw new IOException("Comparator class " + comparatorClassName +
        " is not instantiable", e);
    } catch (IllegalAccessException e) {
      return getComparatorClass(comparatorClassName).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Comparator class " + comparatorClassName +
        " is not instantiable", e);
    }

@@ -255,42 +255,43 @@ public class HFileBlock implements Cacheable {
   */
  static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
      new CacheableDeserializer<Cacheable>() {
        public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
        @Override
        public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
            throws IOException {
          // The buf has the file block followed by block metadata.
          // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
          buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
          // Get a new buffer to pass the HFileBlock for it to 'own'.
          ByteBuff newByteBuff;
          if (reuse) {
            newByteBuff = buf.slice();
          } else {
            int len = buf.limit();
            newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
            newByteBuff.put(0, buf, buf.position(), len);
          }
          // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
          buf.position(buf.limit());
          buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
          boolean usesChecksum = buf.get() == (byte)1;
          long offset = buf.getLong();
          int nextBlockOnDiskSize = buf.getInt();
          HFileBlock hFileBlock =
              new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
          return hFileBlock;
        }
          // The buf has the file block followed by block metadata.
          // Set limit to just before the BLOCK_METADATA_SPACE then rewind.
          buf.limit(buf.limit() - BLOCK_METADATA_SPACE).rewind();
          // Get a new buffer to pass the HFileBlock for it to 'own'.
          ByteBuff newByteBuff;
          if (reuse) {
            newByteBuff = buf.slice();
          } else {
            int len = buf.limit();
            newByteBuff = new SingleByteBuff(ByteBuffer.allocate(len));
            newByteBuff.put(0, buf, buf.position(), len);
          }
          // Read out the BLOCK_METADATA_SPACE content and shove into our HFileBlock.
          buf.position(buf.limit());
          buf.limit(buf.limit() + HFileBlock.BLOCK_METADATA_SPACE);
          boolean usesChecksum = buf.get() == (byte) 1;
          long offset = buf.getLong();
          int nextBlockOnDiskSize = buf.getInt();
          HFileBlock hFileBlock =
              new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null);
          return hFileBlock;
        }

        @Override
        public int getDeserialiserIdentifier() {
          return DESERIALIZER_IDENTIFIER;
        }
        @Override
        public int getDeserialiserIdentifier() {
          return DESERIALIZER_IDENTIFIER;
        }

        @Override
        public HFileBlock deserialize(ByteBuff b) throws IOException {
          // Used only in tests
          return deserialize(b, false, MemoryType.EXCLUSIVE);
        }
      };
        @Override
        public HFileBlock deserialize(ByteBuff b) throws IOException {
          // Used only in tests
          return deserialize(b, false, MemoryType.EXCLUSIVE);
        }
      };

  private static final int DESERIALIZER_IDENTIFIER;
  static {

@@ -1480,6 +1481,7 @@ public class HFileBlock implements Cacheable {
      this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext);
    }

    @Override
    public BlockIterator blockRange(final long startOffset, final long endOffset) {
      final FSReader owner = this; // handle for inner class
      return new BlockIterator() {

@@ -947,7 +947,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {

      Cell ret;
      int cellBufSize = getKVBufSize();
      long seqId = 0l;
      long seqId = 0L;
      if (this.reader.shouldIncludeMemStoreTS()) {
        seqId = currMemstoreTS;
      }

@@ -166,5 +166,6 @@ public interface HFileScanner extends Shipper, Closeable {
  /**
   * Close this HFile scanner and do necessary cleanup.
   */
  @Override
  void close();
}

@@ -448,6 +448,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   * @param cacheKey block's cache key
   * @param buf block buffer
   */
  @Override
  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
    cacheBlock(cacheKey, buf, false);
  }

@@ -794,6 +795,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
      return totalSize;
    }

    @Override
    public int compareTo(BlockBucket that) {
      return Long.compare(this.overflow(), that.overflow());
    }

@@ -970,6 +972,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   * <p>Includes: total accesses, hits, misses, evicted blocks, and runs
   * of the eviction processes.
   */
  @Override
  public CacheStats getStats() {
    return this.stats;
  }

@@ -1096,6 +1099,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
    return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor);
  }

  @Override
  public void shutdown() {
    if (victimHandler != null) {
      victimHandler.shutdown();

@@ -90,6 +90,7 @@ public class LruCachedBlock implements HeapSize, Comparable<LruCachedBlock> {
    return this.cachedTime;
  }

  @Override
  public long heapSize() {
    return size;
  }

@@ -103,6 +103,7 @@ public class LruCachedBlockQueue implements HeapSize {
   * Total size of all elements in this queue.
   * @return size of all elements currently in queue, in bytes
   */
  @Override
  public long heapSize() {
    return heapSize;
  }

@@ -414,6 +414,7 @@ public final class BucketAllocator {
    }
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder(1024);
    for (int i = 0; i < buckets.length; ++i) {

@@ -36,6 +36,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

@@ -904,6 +905,7 @@ public class BucketCache implements BlockCache, HeapSize {
      this.writerEnabled = false;
    }

    @Override
    public void run() {
      List<RAMQueueEntry> entries = new ArrayList<>();
      try {

@@ -1395,10 +1397,22 @@ public class BucketCache implements BlockCache, HeapSize {
    }

    @Override
    public boolean equals(Object that) {
      return this == that;
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      BucketEntryGroup that = (BucketEntryGroup) o;
      return totalSize == that.totalSize && bucketSize == that.bucketSize
          && Objects.equals(queue, that.queue);
    }

    @Override
    public int hashCode() {
      return Objects.hash(queue, totalSize, bucketSize);
    }
  }

  /**

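The `BucketEntryGroup` hunk above also illustrates the equals/hashCode contract these warnings enforce: once `equals` compares field values, `hashCode` must be derived from the same fields, or equal objects can end up in different buckets of a hash-based collection. A condensed sketch of the pattern with illustrative fields:

```java
import java.util.Objects;

final class Group {
  private final long totalSize;
  private final long bucketSize;

  Group(long totalSize, long bucketSize) {
    this.totalSize = totalSize;
    this.bucketSize = bucketSize;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    Group that = (Group) o;
    return totalSize == that.totalSize && bucketSize == that.bucketSize;
  }

  @Override
  public int hashCode() {
    // Must be computed from the same fields as equals().
    return Objects.hash(totalSize, bucketSize);
  }
}
```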
@@ -59,6 +59,7 @@ public class CachedEntryQueue {
    }
    queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {

      @Override
      public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
          Entry<BlockCacheKey, BucketEntry> entry2) {
        return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());

@@ -101,7 +101,7 @@ class BufferChain {
    try {
      long ret = channel.write(buffers, bufferOffset, bufCount);
      if (ret > 0) {
        remaining -= ret;
        remaining = (int) (remaining - ret);
      }
      return ret;
    } finally {

@@ -91,6 +91,7 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
    this.fastPathHandlerStack = fastPathHandlerStack;
  }

  @Override
  protected CallRunner getCallRunner() throws InterruptedException {
    // Get a callrunner if one in the Q.
    CallRunner cr = this.q.poll();

@@ -193,14 +193,15 @@ abstract class ServerRpcConnection implements Closeable {
    String className = header.getCellBlockCodecClass();
    if (className == null || className.length() == 0) return;
    try {
      this.codec = (Codec)Class.forName(className).newInstance();
      this.codec = (Codec)Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCellCodecException(className, e);
    }
    if (!header.hasCellBlockCompressorClass()) return;
    className = header.getCellBlockCompressorClass();
    try {
      this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
      this.compressionCodec =
          (CompressionCodec)Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new UnsupportedCompressionCodecException(className, e);
    }

@@ -548,6 +548,7 @@ public class SimpleRpcServer extends RpcServer {
   * The number of open RPC conections
   * @return the number of open rpc connections
   */
  @Override
  public int getNumOpenConnections() {
    return connectionManager.size();
  }

@@ -118,10 +118,8 @@ public class ClusterStatusPublisher extends ScheduledChore {
    this.master = master;
    this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
    try {
      this.publisher = publisherClass.newInstance();
    } catch (InstantiationException e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    } catch (IllegalAccessException e) {
      this.publisher = publisherClass.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IOException("Can't create publisher " + publisherClass.getName(), e);
    }
    this.publisher.connect(conf);

@@ -166,7 +164,8 @@ public class ClusterStatusPublisher extends ScheduledChore {
        .build());
  }

  protected void cleanup() {
  @Override
  protected synchronized void cleanup() {
    connected = false;
    publisher.close();
  }

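Here and in the chore classes further down, `cleanup()` overrides gain a `synchronized` modifier; the underlying warning is about a subclass overriding a `synchronized` method without keeping the modifier, so callers relying on the superclass monitor silently lose it. A minimal sketch with hypothetical class names, not taken from this diff:

```java
class BaseChore {
  protected synchronized void cleanup() {
    // release shared state under the chore's monitor
  }
}

class PoolChore extends BaseChore {
  @Override
  protected synchronized void cleanup() { // keep the synchronization contract of the base class
    super.cleanup();
    // shut down this chore's pool while still holding the same lock
  }
}
```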
@@ -155,6 +155,7 @@ public class DeadServer {
    }
  }

  @Override
  public synchronized String toString() {
    StringBuilder sb = new StringBuilder();
    for (ServerName sn : deadServers.keySet()) {

@@ -69,11 +69,12 @@ public class HMasterCommandLine extends ServerCommandLine {
    this.masterClass = masterClass;
  }

  @Override
  protected String getUsage() {
    return USAGE;
  }


  @Override
  public int run(String args[]) throws Exception {
    Options opt = new Options();
    opt.addOption("localRegionServers", true,

@@ -156,6 +156,7 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse
   * Notification that config has changed
   * @param conf
   */
  @Override
  void onConfigurationChange(Configuration conf);

  /**

@@ -52,6 +52,7 @@ public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPr
    super(rpcServices, clz);
  }

  @Override
  public int getPriority(RPCProtos.RequestHeader header, Message param, User user) {
    // Yes this is copy pasted from the base class but it keeps from having to look in the
    // annotatedQos table twice something that could get costly since this is called for

@@ -133,6 +133,7 @@ public class MasterCoprocessorHost
   * @return An instance of MasterServices, an object NOT for general user-space Coprocessor
   * consumption.
   */
  @Override
  public MasterServices getMasterServices() {
    return this.masterServices;
  }

@@ -89,7 +89,7 @@ public class MobCompactionChore extends ScheduledChore {
  }

  @Override
  protected void cleanup() {
  protected synchronized void cleanup() {
    super.cleanup();
    pool.shutdown();
  }

@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class RegionServerTracker extends ZKListener {
  private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);
  private NavigableMap<ServerName, RegionServerInfo> regionServers = new TreeMap<>();
  private final NavigableMap<ServerName, RegionServerInfo> regionServers = new TreeMap<>();
  private ServerManager serverManager;
  private MasterServices server;

@@ -637,7 +637,7 @@ public class SplitLogManager {
  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    String statusMsg;
    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;

@@ -612,7 +612,7 @@ public class MergeTableRegionsProcedure
    final TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());

    for (String family: regionFs.getFamilies()) {
      final ColumnFamilyDescriptor hcd = htd.getColumnFamily(family.getBytes());
      final ColumnFamilyDescriptor hcd = htd.getColumnFamily(Bytes.toBytes(family));
      final Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(family);

      if (storeFiles != null && storeFiles.size() > 0) {

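The `family.getBytes()` to `Bytes.toBytes(family)` change above is the usual fix for error-prone's DefaultCharset warning: `String.getBytes()` with no argument uses the platform default charset, which can differ between JVMs. A small sketch of the equivalent explicit-charset form (the helper name is illustrative):

```java
import java.nio.charset.StandardCharsets;

final class FamilyKeys {
  private FamilyKeys() {
  }

  // Deterministic encoding regardless of the JVM's default charset; HBase's
  // Bytes.toBytes(String) encodes with UTF-8 in the same way.
  static byte[] encode(String family) {
    return family.getBytes(StandardCharsets.UTF_8);
  }
}
```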
@@ -33,6 +33,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;

@@ -864,7 +865,7 @@ public class RegionStates {
    private final RegionStateNode regionNode;

    private volatile Exception exception = null;
    private volatile int retries = 0;
    private AtomicInteger retries = new AtomicInteger();

    public RegionFailedOpen(final RegionStateNode regionNode) {
      this.regionNode = regionNode;

@@ -879,11 +880,11 @@ public class RegionStates {
    }

    public int incrementAndGetRetries() {
      return ++this.retries;
      return this.retries.incrementAndGet();
    }

    public int getRetries() {
      return retries;
      return retries.get();
    }

    public void setException(final Exception exception) {

@@ -86,7 +86,7 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
  }

  @Override
  public void setMasterServices(MasterServices masterServices) {
  public synchronized void setMasterServices(MasterServices masterServices) {
    super.setMasterServices(masterServices);
    fnm = masterServices.getFavoredNodesManager();
  }

@@ -692,7 +692,8 @@ public class FavoredStochasticBalancer extends StochasticLoadBalancer implements
   * implementation. For the misplaced regions, we assign a bogus server to it and AM takes care.
   */
  @Override
  public List<RegionPlan> balanceCluster(Map<ServerName, List<RegionInfo>> clusterState) {
  public synchronized List<RegionPlan> balanceCluster(Map<ServerName,
      List<RegionInfo>> clusterState) {

    if (this.services != null) {

@@ -72,21 +72,22 @@ class RegionLocationFinder {
  private CacheLoader<RegionInfo, HDFSBlocksDistribution> loader =
      new CacheLoader<RegionInfo, HDFSBlocksDistribution>() {

        public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
            HDFSBlocksDistribution oldValue) throws Exception {
          return executor.submit(new Callable<HDFSBlocksDistribution>() {
            @Override
            public HDFSBlocksDistribution call() throws Exception {
              return internalGetTopBlockLocation(hri);
            }
          });
        }

        @Override
        public ListenableFuture<HDFSBlocksDistribution> reload(final RegionInfo hri,
            HDFSBlocksDistribution oldValue) throws Exception {
          return executor.submit(new Callable<HDFSBlocksDistribution>() {
            @Override
            public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
              return internalGetTopBlockLocation(key);
            public HDFSBlocksDistribution call() throws Exception {
              return internalGetTopBlockLocation(hri);
            }
          };
          });
        }

        @Override
        public HDFSBlocksDistribution load(RegionInfo key) throws Exception {
          return internalGetTopBlockLocation(key);
        }
      };

  // The cache for where regions are located.
  private LoadingCache<RegionInfo, HDFSBlocksDistribution> cache = null;

@@ -106,6 +106,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer {

  }

  @Override
  public void setClusterLoad(Map<TableName, Map<ServerName, List<RegionInfo>>> clusterLoad){
    serverLoadList = new ArrayList<>();
    float sum = 0;

@@ -349,8 +349,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
    // Allow turning this feature off if the locality cost is not going to
    // be used in any computations.
    RegionLocationFinder finder = null;
    if (this.localityCost != null && this.localityCost.getMultiplier() > 0
        || this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0) {
    if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
        || (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
      finder = this.regionFinder;
    }

@@ -1401,7 +1401,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

      // Now if we found a region load get the type of cost that was requested.
      if (regionLoadList != null) {
        cost += getRegionLoadCost(regionLoadList);
        cost = (long) (cost + getRegionLoadCost(regionLoadList));
      }
    }

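The locality-cost condition above is a precedence cleanup: `&&` binds tighter than `||`, so the old expression already grouped the way the author intended, but error-prone's OperatorPrecedence check asks for explicit parentheses so the grouping is obvious to readers. A tiny illustration with made-up operands:

```java
final class Precedence {
  private Precedence() {
  }

  static boolean check(boolean a, boolean b, boolean c) {
    // "a && b || c" already parses as "(a && b) || c"; the parentheses only
    // make that grouping explicit, as in the hunk above.
    return (a && b) || c;
  }
}
```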
@@ -205,7 +205,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
      @SuppressWarnings("unchecked")
      T cleaner = (T) c.newInstance();
      T cleaner = (T) c.getDeclaredConstructor().newInstance();
      cleaner.setConf(conf);
      cleaner.init(this.params);
      return cleaner;

@@ -360,7 +360,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
  }

  @Override
  public void cleanup() {
  public synchronized void cleanup() {
    for (T lc : this.cleanersChain) {
      try {
        lc.stop("Exiting");

@@ -182,7 +182,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> {
  }

  @Override
  public void cleanup() {
  public synchronized void cleanup() {
    super.cleanup();
    stopHFileDeleteThreads();
  }

@@ -108,7 +108,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
  }

  @Override
  public void cleanup() {
  public synchronized void cleanup() {
    super.cleanup();
    interruptOldWALsCleaner();
  }

@@ -202,7 +202,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
   * @return false, so procedure framework doesn't mark this procedure as failure.
   */
  @Override
  protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
  protected synchronized boolean setTimeoutFailure(final MasterProcedureEnv env) {
    synchronized (event) {
      if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event);
      if (!event.isReady()) { // Maybe unlock() awakened the event.

@@ -71,7 +71,9 @@ public abstract class ProcedurePrepareLatch {
  }

  private static class NoopLatch extends ProcedurePrepareLatch {
    @Override
    protected void countDown(final Procedure proc) {}
    @Override
    public void await() throws IOException {}
  }

@@ -80,6 +82,7 @@ public abstract class ProcedurePrepareLatch {

    private IOException exception = null;

    @Override
    protected void countDown(final Procedure proc) {
      if (proc.hasException()) {
        exception = proc.getException().unwrapRemoteIOException();

@@ -87,6 +90,7 @@ public abstract class ProcedurePrepareLatch {
      latch.countDown();
    }

    @Override
    public void await() throws IOException {
      try {
        latch.await();

@ -117,6 +117,7 @@ public class RSProcedureDispatcher
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void abortPendingOperations(final ServerName serverName,
|
||||
final Set<RemoteProcedure> operations) {
|
||||
// TODO: Replace with a ServerNotOnlineException()
|
||||
|
@ -126,10 +127,12 @@ public class RSProcedureDispatcher
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverAdded(final ServerName serverName) {
|
||||
addNode(serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serverRemoved(final ServerName serverName) {
|
||||
removeNode(serverName);
|
||||
}
|
||||
|
@ -138,6 +141,7 @@ public class RSProcedureDispatcher
|
|||
* Base remote call
|
||||
*/
|
||||
protected abstract class AbstractRSRemoteCall implements Callable<Void> {
|
||||
@Override
|
||||
public abstract Void call();
|
||||
|
||||
private final ServerName serverName;
|
||||
|
@ -269,6 +273,7 @@ public class RSProcedureDispatcher
|
|||
this.remoteProcedures = remoteProcedures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void call() {
|
||||
request = ExecuteProceduresRequest.newBuilder();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
|
@ -290,11 +295,13 @@ public class RSProcedureDispatcher
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchOpenRequests(final MasterProcedureEnv env,
|
||||
final List<RegionOpenOperation> operations) {
|
||||
request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchCloseRequests(final MasterProcedureEnv env,
|
||||
final List<RegionCloseOperation> operations) {
|
||||
for (RegionCloseOperation op: operations) {
|
||||
|
@ -471,11 +478,13 @@ public class RSProcedureDispatcher
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchOpenRequests(final MasterProcedureEnv env,
|
||||
final List<RegionOpenOperation> operations) {
|
||||
submitTask(new OpenRegionRemoteCall(serverName, operations));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispatchCloseRequests(final MasterProcedureEnv env,
|
||||
final List<RegionCloseOperation> operations) {
|
||||
for (RegionCloseOperation op: operations) {
|
||||
|
|
|
@@ -86,6 +86,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
    return false;
  }

  @Override
  public void setConf(final Configuration conf) {
    super.setConf(conf);
    try {

@@ -95,6 +96,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
      Path rootDir = FSUtils.getRootDir(conf);
      cache = new SnapshotFileCache(fs, rootDir, cacheRefreshPeriod, cacheRefreshPeriod,
          "snapshot-hfile-cleaner-cache-refresher", new SnapshotFileCache.SnapshotFileInspector() {
            @Override
            public Collection<String> filesUnderSnapshot(final Path snapshotDir)
                throws IOException {
              return SnapshotReferenceUtil.getHFileNames(conf, fs, snapshotDir);

@@ -54,6 +54,7 @@ public class CachedMobFile extends MobFile implements Comparable<CachedMobFile>
    this.accessCount = accessCount;
  }

  @Override
  public int compareTo(CachedMobFile that) {
    if (this.accessCount == that.accessCount) return 0;
    return this.accessCount < that.accessCount ? 1 : -1;

@@ -86,6 +86,7 @@ public class ExpiredMobFileCleaner extends Configured implements Tool {

  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="REC_CATCH_EXCEPTION",
      justification="Intentional")
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      printUsage();

@@ -294,6 +294,7 @@ public class PartitionedMobCompactionRequest extends MobCompactionRequest {
      this.endKey = endKey;
    }

    @Override
    public int compareTo(CompactionDelPartitionId o) {
      /*
       * 1). Compare the start key, if the k1 < k2, then k1 is less

@ -74,6 +74,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* monitored Handler.
|
||||
* @return the queue timestamp or -1 if there is no RPC currently running.
|
||||
*/
|
||||
@Override
|
||||
public long getRPCQueueTime() {
|
||||
if (getState() != State.RUNNING) {
|
||||
return -1;
|
||||
|
@ -86,6 +87,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* monitored Handler.
|
||||
* @return the start timestamp or -1 if there is no RPC currently running.
|
||||
*/
|
||||
@Override
|
||||
public long getRPCStartTime() {
|
||||
if (getState() != State.RUNNING) {
|
||||
return -1;
|
||||
|
@ -98,6 +100,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* by this Handler.
|
||||
* @return a string representing the method call without parameters
|
||||
*/
|
||||
@Override
|
||||
public synchronized String getRPC() {
|
||||
return getRPC(false);
|
||||
}
|
||||
|
@ -108,6 +111,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @param withParams toggle inclusion of parameters in the RPC String
|
||||
* @return A human-readable string representation of the method call.
|
||||
*/
|
||||
@Override
|
||||
public synchronized String getRPC(boolean withParams) {
|
||||
if (getState() != State.RUNNING) {
|
||||
// no RPC is currently running
|
||||
|
@ -132,6 +136,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* by this Handler.
|
||||
* @return A human-readable string representation of the method call.
|
||||
*/
|
||||
@Override
|
||||
public long getRPCPacketLength() {
|
||||
if (getState() != State.RUNNING || packet == null) {
|
||||
// no RPC is currently running, or we don't have an RPC's packet info
|
||||
|
@ -146,6 +151,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @return A human-readable string representation of the address and port
|
||||
* of the client.
|
||||
*/
|
||||
@Override
|
||||
public String getClient() {
|
||||
return clientAddress + ":" + remotePort;
|
||||
}
|
||||
|
@ -155,6 +161,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* RPC call.
|
||||
* @return true if the monitored handler is currently servicing an RPC call.
|
||||
*/
|
||||
@Override
|
||||
public boolean isRPCRunning() {
|
||||
return getState() == State.RUNNING;
|
||||
}
|
||||
|
@ -166,6 +173,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @return true if the monitored handler is currently servicing an RPC call
|
||||
* to a database command.
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean isOperationRunning() {
|
||||
if(!isRPCRunning()) {
|
||||
return false;
|
||||
|
@ -183,6 +191,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @param methodName The name of the method that will be called by the RPC.
|
||||
* @param params The parameters that will be passed to the indicated method.
|
||||
*/
|
||||
@Override
|
||||
public synchronized void setRPC(String methodName, Object [] params,
|
||||
long queueTime) {
|
||||
this.methodName = methodName;
|
||||
|
@ -197,6 +206,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* that it can later compute its size if asked for it.
|
||||
* @param param The protobuf received by the RPC for this call
|
||||
*/
|
||||
@Override
|
||||
public void setRPCPacket(Message param) {
|
||||
this.packet = param;
|
||||
}
|
||||
|
@ -206,6 +216,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
* @param clientAddress the address of the current client
|
||||
* @param remotePort the port from which the client connected
|
||||
*/
|
||||
@Override
|
||||
public void setConnection(String clientAddress, int remotePort) {
|
||||
this.clientAddress = clientAddress;
|
||||
this.remotePort = remotePort;
|
||||
|
@ -218,6 +229,7 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
|
|||
this.packet = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Map<String, Object> toMap() {
|
||||
// only include RPC info if the Handler is actively servicing an RPC call
|
||||
Map<String, Object> map = super.toMap();
|
||||
|
|
|
@@ -154,6 +154,7 @@ class MonitoredTaskImpl implements MonitoredTask {
   * Force the completion timestamp backwards so that
   * it expires now.
   */
  @Override
  public void expireNow() {
    stateTime -= 180 * 1000;
  }

@@ -319,7 +319,7 @@ public class TaskMonitor {
    OPERATION("operation"),
    ALL("all");

    private String type;
    private final String type;

    private TaskType(String type) {
      this.type = type.toLowerCase();

@@ -103,7 +103,7 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
  //

  /** lock to prevent nodes from acquiring and then releasing before we can track them */
  private Object joinBarrierLock = new Object();
  private final Object joinBarrierLock = new Object();
  private final List<String> acquiringMembers;
  private final List<String> inBarrierMembers;
  private final HashMap<String, byte[]> dataFromFinishedMembers;

@@ -88,11 +88,9 @@ public abstract class ProcedureManagerHost<E extends ProcedureManager> {
    E impl;
    Object o = null;
    try {
      o = implClass.newInstance();
      o = implClass.getDeclaredConstructor().newInstance();
      impl = (E)o;
    } catch (InstantiationException e) {
      throw new IOException(e);
    } catch (IllegalAccessException e) {
    } catch (Exception e) {
      throw new IOException(e);
    }

@@ -153,6 +153,7 @@ abstract public class Subprocedure implements Callable<Void> {
   * Subprocedure, ForeignException)}.
   */
  @SuppressWarnings("finally")
  @Override
  final public Void call() {
    LOG.debug("Starting subprocedure '" + barrierName + "' with timeout " +
        executionTimeoutTimer.getMaxTime() + "ms");

@@ -19,19 +19,21 @@ package org.apache.hadoop.hbase.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * ZooKeeper based {@link ProcedureCoordinatorRpcs} for a {@link ProcedureCoordinator}
 */

@@ -218,8 +220,8 @@ public class ZKProcedureCoordinator implements ProcedureCoordinatorRpcs {
          } else {
            dataFromMember = Arrays.copyOfRange(dataFromMember, ProtobufUtil.lengthOfPBMagic(),
                dataFromMember.length);
            LOG.debug("Finished data from procedure '" + procName
                + "' member '" + member + "': " + new String(dataFromMember));
            LOG.debug("Finished data from procedure '{}' member '{}': {}", procName, member,
                new String(dataFromMember, StandardCharsets.UTF_8));
            coordinator.memberFinishedBarrier(procName, member, dataFromMember);
          }
        } else {

@@ -348,6 +348,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
    }
  }

  @Override
  public void start(final String memberName, final ProcedureMember listener) {
    LOG.debug("Starting procedure member '" + memberName + "'");
    this.member = listener;

@@ -157,6 +157,7 @@ public abstract class ZKProcedureUtil
    return ZNodePaths.joinZNode(controller.abortZnode, opInstanceName);
  }

  @Override
  public ZKWatcher getWatcher() {
    return watcher;
  }

@@ -455,7 +455,7 @@ public class MasterQuotaManager implements RegionStateListener {
  }

  private static class NamedLock<T> {
    private HashSet<T> locks = new HashSet<>();
    private final HashSet<T> locks = new HashSet<>();

    public void lock(final T name) throws InterruptedException {
      synchronized (locks) {

@@ -501,6 +501,7 @@ public class MasterQuotaManager implements RegionStateListener {
      return time;
    }

    @Override
    public boolean equals(Object o) {
      if (o instanceof SizeSnapshotWithTimestamp) {
        SizeSnapshotWithTimestamp other = (SizeSnapshotWithTimestamp) o;

@@ -509,6 +510,7 @@ public class MasterQuotaManager implements RegionStateListener {
      return false;
    }

    @Override
    public int hashCode() {
      HashCodeBuilder hcb = new HashCodeBuilder();
      return hcb.append(size).append(time).toHashCode();

@@ -386,7 +386,8 @@ public class QuotaObserverChore extends ScheduledChore {
      for (TableName tableInNS : tablesByNamespace.get(namespace)) {
        final SpaceQuotaSnapshot tableQuotaSnapshot =
            tableSnapshotStore.getCurrentState(tableInNS);
        final boolean hasTableQuota = QuotaSnapshotStore.NO_QUOTA != tableQuotaSnapshot;
        final boolean hasTableQuota =
            !Objects.equals(QuotaSnapshotStore.NO_QUOTA, tableQuotaSnapshot);
        if (hasTableQuota && tableQuotaSnapshot.getQuotaStatus().isInViolation()) {
          // Table-level quota violation policy is being applied here.
          if (LOG.isTraceEnabled()) {

@@ -100,6 +100,7 @@ public abstract class RateLimiter {
    this.avail = limit;
  }

  @Override
  public String toString() {
    String rateLimiter = this.getClass().getSimpleName();
    if (getLimit() == Long.MAX_VALUE) {

@@ -53,7 +53,7 @@ public class RegionServerSpaceQuotaManager {
  private SpaceQuotaRefresherChore spaceQuotaRefresher;
  private AtomicReference<Map<TableName, SpaceQuotaSnapshot>> currentQuotaSnapshots;
  private boolean started = false;
  private ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
  private final ConcurrentHashMap<TableName,SpaceViolationPolicyEnforcement> enforcedPolicies;
  private SpaceViolationPolicyEnforcementFactory factory;

  public RegionServerSpaceQuotaManager(RegionServerServices rsServices) {

@@ -54,8 +54,8 @@ public class SpaceQuotaSnapshotNotifierFactory {
        .getClass(SNAPSHOT_NOTIFIER_KEY, SNAPSHOT_NOTIFIER_DEFAULT,
            SpaceQuotaSnapshotNotifier.class);
    try {
      return clz.newInstance();
    } catch (InstantiationException | IllegalAccessException e) {
      return clz.getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Failed to instantiate the implementation", e);
    }
  }

@@ -54,7 +54,7 @@ public abstract class AbstractMemStore implements MemStore {
  // Used to track when to flush
  private volatile long timeOfOldestEdit;

  public final static long FIXED_OVERHEAD = ClassSize.OBJECT
  public final static long FIXED_OVERHEAD = (long) ClassSize.OBJECT
      + (4 * ClassSize.REFERENCE)
      + (2 * Bytes.SIZEOF_LONG); // snapshotId, timeOfOldestEdit

@@ -101,10 +101,13 @@ public class AdaptiveMemStoreCompactionStrategy extends MemStoreCompactionStrate
  public void resetStats() {
    compactionProbability = initialCompactionProbability;
  }

  @Override
  protected Action getMergingAction() {
    return Action.MERGE_COUNT_UNIQUE_KEYS;
  }

  @Override
  protected Action getFlattenAction() {
    return Action.FLATTEN;
  }

@@ -82,7 +82,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment {

  @Override
  protected long indexEntrySize() {
    return (ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
    return ((long) ClassSize.CELL_CHUNK_MAP_ENTRY - KeyValue.FIXED_OVERHEAD);
  }

  @Override

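The casts added in `CellChunkImmutableSegment` and `AbstractMemStore`, like the `0l` to `0L` literal fix earlier, address integer-overflow warnings: when every operand of an arithmetic expression is an `int`, the arithmetic happens in 32 bits and only the final result is widened to `long`. Widening one operand first makes the whole expression 64-bit. A tiny illustration with made-up values:

```java
final class Widening {
  private Widening() {
  }

  static final int BIG = 1 << 30;

  // Overflows before the assignment: the multiplication is evaluated as int, giving 0.
  static final long WRONG = BIG * 4;

  // Correct: casting one operand promotes the whole expression to long (4294967296).
  static final long RIGHT = (long) BIG * 4;
}
```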
@ -73,113 +73,140 @@ public class CellSet implements NavigableSet<Cell> {
|
|||
return delegatee;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell ceiling(Cell e) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Cell> descendingIterator() {
|
||||
return this.delegatee.descendingMap().values().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NavigableSet<Cell> descendingSet() {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell floor(Cell e) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedSet<Cell> headSet(final Cell toElement) {
|
||||
return headSet(toElement, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NavigableSet<Cell> headSet(final Cell toElement,
|
||||
boolean inclusive) {
|
||||
return new CellSet(this.delegatee.headMap(toElement, inclusive), UNKNOWN_NUM_UNIQUES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell higher(Cell e) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<Cell> iterator() {
|
||||
return this.delegatee.values().iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell lower(Cell e) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell pollFirst() {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell pollLast() {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedSet<Cell> subSet(Cell fromElement, Cell toElement) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NavigableSet<Cell> subSet(Cell fromElement,
|
||||
boolean fromInclusive, Cell toElement, boolean toInclusive) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SortedSet<Cell> tailSet(Cell fromElement) {
|
||||
return tailSet(fromElement, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NavigableSet<Cell> tailSet(Cell fromElement, boolean inclusive) {
|
||||
return new CellSet(this.delegatee.tailMap(fromElement, inclusive), UNKNOWN_NUM_UNIQUES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<? super Cell> comparator() {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell first() {
|
||||
return this.delegatee.firstEntry().getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cell last() {
|
||||
return this.delegatee.lastEntry().getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(Cell e) {
|
||||
return this.delegatee.put(e, e) == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean addAll(Collection<? extends Cell> c) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.delegatee.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
//noinspection SuspiciousMethodCalls
|
||||
return this.delegatee.containsKey(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return this.delegatee.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return this.delegatee.remove(o) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean retainAll(Collection<?> c) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
@ -188,14 +215,17 @@ public class CellSet implements NavigableSet<Cell> {
|
|||
return this.delegatee.get(kv);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return this.delegatee.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object[] toArray() {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T[] toArray(T[] a) {
|
||||
throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED);
|
||||
}
|
||||
|
|
|

@@ -143,7 +143,7 @@ public class CompactingMemStore extends AbstractMemStore {
factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
}
inmemoryFlushSize *= factor;
inmemoryFlushSize = (long) (inmemoryFlushSize * factor);
LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize
+ " and immutable segments index to be of type " + indexType);
}
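
The compound assignment removed above hides a narrowing conversion: with a double factor, `inmemoryFlushSize *= factor` is compiled as `inmemoryFlushSize = (long) (inmemoryFlushSize * factor)`, so the patch simply makes that cast explicit. A runnable sketch with made-up values:

class NarrowingSketch {
  public static void main(String[] args) {
    long flushSize = 128L * 1024 * 1024;
    double factor = 0.25;

    // flushSize *= factor;                  // legal, but hides a (long) cast of a double result
    flushSize = (long) (flushSize * factor); // equivalent, with the narrowing spelled out

    System.out.println(flushSize);           // 33554432
  }
}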

@@ -365,7 +365,7 @@ public class CompactingMemStore extends AbstractMemStore {
MutableSegment activeTmp = active;
List<? extends Segment> pipelineList = pipeline.getSegments();
List<? extends Segment> snapshotList = snapshot.getAllSegments();
long order = 1 + pipelineList.size() + snapshotList.size();
long order = 1L + pipelineList.size() + snapshotList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// The order is the Segment ordinal
List<KeyValueScanner> list = createList((int) order);

@@ -279,6 +279,7 @@ public class CompositeImmutableSegment extends ImmutableSegment {
/**
* Dumps all cells of the segment into the given log
*/
@Override
void dump(Logger log) {
for (ImmutableSegment s : segments) {
s.dump(log);

@@ -89,6 +89,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
super.forceSelect(request);
}

@Override
public List<Path> compact(ThroughputController throughputController, User user)
throws IOException {
if (request instanceof DateTieredCompactionRequest) {

@@ -61,11 +61,9 @@ public class DefaultStoreFlusher extends StoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(),
/* isCompaction = */ false,
/* includeMVCCReadpoint = */ true,
/* includesTags = */ snapshot.isTagsPresent(),
/* shouldDropBehind = */ false);
writer = store.createWriterInTmp(cellsCount,
store.getColumnFamilyDescriptor().getCompressionType(), false, true,
snapshot.isTagsPresent(), false);
IOException e = null;
try {
performFlush(scanner, writer, smallestReadPoint, throughputController);

@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

@@ -82,15 +83,15 @@ public class HMobStore extends HStore {
private MobCacheConfig mobCacheConfig;
private Path homePath;
private Path mobFamilyPath;
private volatile long cellsCountCompactedToMob = 0;
private volatile long cellsCountCompactedFromMob = 0;
private volatile long cellsSizeCompactedToMob = 0;
private volatile long cellsSizeCompactedFromMob = 0;
private volatile long mobFlushCount = 0;
private volatile long mobFlushedCellsCount = 0;
private volatile long mobFlushedCellsSize = 0;
private volatile long mobScanCellsCount = 0;
private volatile long mobScanCellsSize = 0;
private AtomicLong cellsCountCompactedToMob = new AtomicLong();
private AtomicLong cellsCountCompactedFromMob = new AtomicLong();
private AtomicLong cellsSizeCompactedToMob = new AtomicLong();
private AtomicLong cellsSizeCompactedFromMob = new AtomicLong();
private AtomicLong mobFlushCount = new AtomicLong();
private AtomicLong mobFlushedCellsCount = new AtomicLong();
private AtomicLong mobFlushedCellsSize = new AtomicLong();
private AtomicLong mobScanCellsCount = new AtomicLong();
private AtomicLong mobScanCellsSize = new AtomicLong();
private ColumnFamilyDescriptor family;
private Map<String, List<Path>> map = new ConcurrentHashMap<>();
private final IdLock keyLock = new IdLock();

@@ -453,76 +454,75 @@ public class HMobStore extends HStore {
}

public void updateCellsCountCompactedToMob(long count) {
cellsCountCompactedToMob += count;
cellsCountCompactedToMob.addAndGet(count);
}

public long getCellsCountCompactedToMob() {
return cellsCountCompactedToMob;
return cellsCountCompactedToMob.get();
}

public void updateCellsCountCompactedFromMob(long count) {
cellsCountCompactedFromMob += count;
cellsCountCompactedFromMob.addAndGet(count);
}

public long getCellsCountCompactedFromMob() {
return cellsCountCompactedFromMob;
return cellsCountCompactedFromMob.get();
}

public void updateCellsSizeCompactedToMob(long size) {
cellsSizeCompactedToMob += size;
cellsSizeCompactedToMob.addAndGet(size);
}

public long getCellsSizeCompactedToMob() {
return cellsSizeCompactedToMob;
return cellsSizeCompactedToMob.get();
}

public void updateCellsSizeCompactedFromMob(long size) {
cellsSizeCompactedFromMob += size;
cellsSizeCompactedFromMob.addAndGet(size);
}

public long getCellsSizeCompactedFromMob() {
return cellsSizeCompactedFromMob;
return cellsSizeCompactedFromMob.get();
}

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT")
public void updateMobFlushCount() {
mobFlushCount++;
mobFlushCount.incrementAndGet();
}

public long getMobFlushCount() {
return mobFlushCount;
return mobFlushCount.get();
}

public void updateMobFlushedCellsCount(long count) {
mobFlushedCellsCount += count;
mobFlushedCellsCount.addAndGet(count);
}

public long getMobFlushedCellsCount() {
return mobFlushedCellsCount;
return mobFlushedCellsCount.get();
}

public void updateMobFlushedCellsSize(long size) {
mobFlushedCellsSize += size;
mobFlushedCellsSize.addAndGet(size);
}

public long getMobFlushedCellsSize() {
return mobFlushedCellsSize;
return mobFlushedCellsSize.get();
}

public void updateMobScanCellsCount(long count) {
mobScanCellsCount += count;
mobScanCellsCount.addAndGet(count);
}

public long getMobScanCellsCount() {
return mobScanCellsCount;
return mobScanCellsCount.get();
}

public void updateMobScanCellsSize(long size) {
mobScanCellsSize += size;
mobScanCellsSize.addAndGet(size);
}

public long getMobScanCellsSize() {
return mobScanCellsSize;
return mobScanCellsSize.get();
}

public byte[] getRefCellTags() {
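
These HMobStore counters illustrate the findbugs VO_VOLATILE_INCREMENT warning the commit removes: volatile guarantees visibility, but `count++` and `count += n` are still non-atomic read-modify-write operations, which is why the old code carried a @SuppressWarnings and the new code uses AtomicLong. A minimal sketch of the pattern (class and method names are hypothetical):

import java.util.concurrent.atomic.AtomicLong;

class CounterSketch {
  // private volatile long flushCount;   // flushCount++ can lose updates under contention
  private final AtomicLong flushCount = new AtomicLong();

  void onFlush() {
    flushCount.incrementAndGet();        // atomic ++
  }

  void onFlushedCells(long n) {
    flushCount.addAndGet(n);             // atomic +=
  }

  long snapshot() {
    return flushCount.get();             // plain volatile-style read
  }
}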

@@ -26,6 +26,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.AbstractList;
import java.util.ArrayList;

@@ -1015,7 +1016,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}

long storeMaxSequenceId = store.getMaxSequenceId().orElse(0L);
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()),
storeMaxSequenceId);
if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
maxSeqId = storeMaxSequenceId;

@@ -5524,7 +5525,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HStore store = this.stores.get(column);
if (store == null) {
throw new IllegalArgumentException(
"No column family : " + new String(column) + " available");
"No column family : " + new String(column, StandardCharsets.UTF_8) + " available");
}
Collection<HStoreFile> storeFiles = store.getStorefiles();
if (storeFiles == null) {
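
Both HRegion hunks above address the same findbugs complaint about relying on the platform default charset: `String.getBytes()` and `new String(byte[])` can yield different results on differently configured JVMs, so conversions go through Bytes.toBytes (UTF-8 in HBase) or an explicit StandardCharsets.UTF_8. A small sketch of the deterministic round trip (class and method names are hypothetical):

import java.nio.charset.StandardCharsets;

class CharsetSketch {
  static byte[] familyToBytes(String family) {
    // family.getBytes() would depend on the JVM's default charset;
    // an explicit charset (what Bytes.toBytes does) is deterministic.
    return family.getBytes(StandardCharsets.UTF_8);
  }

  static String bytesToFamily(byte[] column) {
    return new String(column, StandardCharsets.UTF_8);
  }
}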

@@ -904,7 +904,7 @@ public class HRegionServer extends HasThread implements
*/
private boolean isClusterUp() {
return this.masterless ||
this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp();
(this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
}

/**
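
The parentheses added to isClusterUp() do not change behavior, since && already binds tighter than ||; they only make the grouping explicit, which is what the error-prone operator-precedence check asks for. A minimal illustration (names are hypothetical):

class PrecedenceSketch {
  static boolean isUp(boolean masterless, Object tracker, boolean trackerUp) {
    // Parsed the same with or without the parentheses: a || (b && c).
    return masterless || (tracker != null && trackerUp);
  }
}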

@@ -1745,7 +1745,7 @@ public class HRegionServer extends HasThread implements
if (r.shouldFlush(whyFlush)) {
FlushRequester requester = server.getFlushRequester();
if (requester != null) {
long randomDelay = RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
long randomDelay = (long) RandomUtils.nextInt(0, RANGE_OF_DELAY) + MIN_DELAY_TIME;
LOG.info(getName() + " requesting flush of " +
r.getRegionInfo().getRegionNameAsString() + " because " +
whyFlush.toString() +

@@ -3111,13 +3111,13 @@ public class HRegionServer extends HasThread implements
}
}

final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(encodedName.getBytes(),
final Boolean previous = this.regionsInTransitionInRS.putIfAbsent(Bytes.toBytes(encodedName),
Boolean.FALSE);

if (Boolean.TRUE.equals(previous)) {
LOG.info("Received CLOSE for the region:" + encodedName + " , which we are already " +
"trying to OPEN. Cancelling OPENING.");
if (!regionsInTransitionInRS.replace(encodedName.getBytes(), previous, Boolean.FALSE)){
if (!regionsInTransitionInRS.replace(Bytes.toBytes(encodedName), previous, Boolean.FALSE)) {
// The replace failed. That should be an exceptional case, but theoretically it can happen.
// We're going to try to do a standard close then.
LOG.warn("The opening for region " + encodedName + " was done before we could cancel it." +

@@ -3140,7 +3140,7 @@ public class HRegionServer extends HasThread implements

if (actualRegion == null) {
LOG.debug("Received CLOSE for a region which is not online, and we're not opening.");
this.regionsInTransitionInRS.remove(encodedName.getBytes());
this.regionsInTransitionInRS.remove(Bytes.toBytes(encodedName));
// The master deletes the znode when it receives this exception.
throw new NotServingRegionException("The region " + encodedName +
" is not online, and is not opening.");

@@ -44,6 +44,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
this.regionServerClass = clazz;
}

@Override
protected String getUsage() {
return USAGE;
}

@@ -73,6 +74,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
return 0;
}

@Override
public int run(String args[]) throws Exception {
if (args.length != 1) {
usage(null);

@@ -42,6 +42,7 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

@@ -149,8 +150,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
private AtomicLong storeSize = new AtomicLong();
private AtomicLong totalUncompressedBytes = new AtomicLong();

/**
* RWLock for store operations.

@@ -209,13 +210,13 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private int compactionCheckMultiplier;
protected Encryption.Context cryptoContext = Encryption.Context.NONE;

private volatile long flushedCellsCount = 0;
private volatile long compactedCellsCount = 0;
private volatile long majorCompactedCellsCount = 0;
private volatile long flushedCellsSize = 0;
private volatile long flushedOutputFileSize = 0;
private volatile long compactedCellsSize = 0;
private volatile long majorCompactedCellsSize = 0;
private AtomicLong flushedCellsCount = new AtomicLong();
private AtomicLong compactedCellsCount = new AtomicLong();
private AtomicLong majorCompactedCellsCount = new AtomicLong();
private AtomicLong flushedCellsSize = new AtomicLong();
private AtomicLong flushedOutputFileSize = new AtomicLong();
private AtomicLong compactedCellsSize = new AtomicLong();
private AtomicLong majorCompactedCellsSize = new AtomicLong();

/**
* Constructor

@@ -544,8 +545,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
HStoreFile storeFile = completionService.take().get();
if (storeFile != null) {
long length = storeFile.getReader().length();
this.storeSize += length;
this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
this.storeSize.addAndGet(length);
this.totalUncompressedBytes
.addAndGet(storeFile.getReader().getTotalUncompressedBytes());
LOG.debug("loaded {}", storeFile);
results.add(storeFile);
}

@@ -844,8 +846,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat

private void bulkLoadHFile(HStoreFile sf) throws IOException {
StoreFileReader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

// Append the new storefile into the list
this.lock.writeLock().lock();

@@ -1021,8 +1023,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
HStoreFile sf = createStoreFileAndReader(dstPath);

StoreFileReader r = sf.getReader();
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());

if (LOG.isInfoEnabled()) {
LOG.info("Added " + sf + ", entries=" + r.getEntries() +

@@ -1373,11 +1375,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
writeCompactionWalRecord(filesToCompact, sfs);
replaceStoreFiles(filesToCompact, sfs);
if (cr.isMajor()) {
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
majorCompactedCellsCount.addAndGet(getCompactionProgress().totalCompactingKVs);
majorCompactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
} else {
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
compactedCellsSize += getCompactionProgress().totalCompactedSize;
compactedCellsCount.addAndGet(getCompactionProgress().totalCompactingKVs);
compactedCellsSize.addAndGet(getCompactionProgress().totalCompactedSize);
}
long outputBytes = getTotalSize(sfs);

@@ -1478,7 +1480,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
}
message.append("total size for store is ")
.append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize, "", 1))
.append(StringUtils.TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1))
.append(". This selection was in queue for ")
.append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
.append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))

@@ -1772,7 +1774,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
completeCompaction(delSfs);
LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString()
+ "; total size for store is " + TraditionalBinaryPrefix.long2String(storeSize, "", 1));
+ "; total size for store is "
+ TraditionalBinaryPrefix.long2String(storeSize.get(), "", 1));
}

public void cancelRequestedCompaction(CompactionContext compaction) {

@@ -1826,16 +1829,16 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
@VisibleForTesting
protected void completeCompaction(Collection<HStoreFile> compactedFiles)
throws IOException {
this.storeSize = 0L;
this.totalUncompressedBytes = 0L;
this.storeSize.set(0L);
this.totalUncompressedBytes.set(0L);
for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) {
StoreFileReader r = hsf.getReader();
if (r == null) {
LOG.warn("StoreFile {} has a null Reader", hsf);
continue;
}
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
this.storeSize.addAndGet(r.length());
this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
}
}

@@ -1896,7 +1899,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat

@Override
public long getSize() {
return storeSize;
return storeSize.get();
}

public void triggerMajorCompaction() {

@@ -2043,7 +2046,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat

@Override
public long getStoreSizeUncompressed() {
return this.totalUncompressedBytes;
return this.totalUncompressedBytes.get();
}

@Override

@@ -2235,9 +2238,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
committedFiles.add(sf.getPath());
}

HStore.this.flushedCellsCount += cacheFlushCount;
HStore.this.flushedCellsSize += cacheFlushSize;
HStore.this.flushedOutputFileSize += outputFileSize;
HStore.this.flushedCellsCount.addAndGet(cacheFlushCount);
HStore.this.flushedCellsSize.addAndGet(cacheFlushSize);
HStore.this.flushedOutputFileSize.addAndGet(outputFileSize);

// Add new file to store files. Clear snapshot too while we have the Store write lock.
return HStore.this.updateStorefiles(storeFiles, snapshot.getId());

@@ -2270,8 +2273,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file);
HStoreFile storeFile = createStoreFileAndReader(storeFileInfo);
storeFiles.add(storeFile);
HStore.this.storeSize += storeFile.getReader().length();
HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes();
HStore.this.storeSize.addAndGet(storeFile.getReader().length());
HStore.this.totalUncompressedBytes
.addAndGet(storeFile.getReader().getTotalUncompressedBytes());
if (LOG.isInfoEnabled()) {
LOG.info("Region: " + HStore.this.getRegionInfo().getEncodedName() +
" added " + storeFile + ", entries=" + storeFile.getReader().getEntries() +

@@ -2315,7 +2319,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}

public static final long FIXED_OVERHEAD =
ClassSize.align(ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (11 * Bytes.SIZEOF_LONG)
ClassSize.align(ClassSize.OBJECT + (26 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG)
+ (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));

public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD

@@ -2354,37 +2358,37 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat

@Override
public long getFlushedCellsCount() {
return flushedCellsCount;
return flushedCellsCount.get();
}

@Override
public long getFlushedCellsSize() {
return flushedCellsSize;
return flushedCellsSize.get();
}

@Override
public long getFlushedOutputFileSize() {
return flushedOutputFileSize;
return flushedOutputFileSize.get();
}

@Override
public long getCompactedCellsCount() {
return compactedCellsCount;
return compactedCellsCount.get();
}

@Override
public long getCompactedCellsSize() {
return compactedCellsSize;
return compactedCellsSize.get();
}

@Override
public long getMajorCompactedCellsCount() {
return majorCompactedCellsCount;
return majorCompactedCellsCount.get();
}

@Override
public long getMajorCompactedCellsSize() {
return majorCompactedCellsSize;
return majorCompactedCellsSize.get();
}

/**

@@ -95,7 +95,7 @@ public class IncreasingToUpperBoundRegionSplitPolicy extends ConstantSizeRegionS
}
}

return foundABigStore | force;
return foundABigStore || force;
}

/**
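
On booleans the single `|` is a non-short-circuiting OR: it compiles, but it always evaluates both operands and usually signals a typo, which is why findbugs/error-prone flag it and the patch switches to `||`. A small sketch:

class ShortCircuitSketch {
  static boolean shouldSplit(boolean foundABigStore, boolean force) {
    // 'foundABigStore | force' would give the same result here, but '||'
    // short-circuits and states the logical intent unambiguously.
    return foundABigStore || force;
  }
}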

@@ -67,5 +67,6 @@ public interface InternalScanner extends Closeable {
* Closes the scanner and releases any resources it has allocated
* @throws IOException
*/
@Override
void close() throws IOException;
}

@@ -104,6 +104,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
}
}

@Override
public Cell peek() {
if (this.current == null) {
return null;

@@ -111,6 +112,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return this.current.peek();
}

@Override
public Cell next() throws IOException {
if(this.current == null) {
return null;

@@ -182,6 +184,8 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
public KVScannerComparator(CellComparator kvComparator) {
this.kvComparator = kvComparator;
}

@Override
public int compare(KeyValueScanner left, KeyValueScanner right) {
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {

@@ -210,6 +214,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
}
}

@Override
public void close() {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close();

@@ -84,6 +84,7 @@ public interface KeyValueScanner extends Shipper, Closeable {
/**
* Close the KeyValue scanner.
*/
@Override
void close();

/**

@@ -167,8 +167,8 @@ public class MemStoreCompactor {

// Substitute the pipeline with one segment
if (!isInterrupted.get()) {
if (resultSwapped = compactingMemStore.swapCompactedSegments(
versionedList, result, merge)) {
resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
if (resultSwapped) {
// update compaction strategy
strategy.updateStats(result);
// update the wal so it can be truncated and not get too long
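
The MemStoreCompactor change splits an assignment out of the if condition: `if (x = f())` is legal Java but reads like a mistyped `==`, so the checkers ask for the assignment and the test to be separate statements. Minimal sketch (names are hypothetical):

import java.util.function.BooleanSupplier;

class AssignmentInConditionSketch {
  static boolean swapThenUpdate(BooleanSupplier swap, Runnable onSuccess) {
    // if (swapped = swap.getAsBoolean()) { ... }   // legal, but easy to misread as '=='
    boolean swapped = swap.getAsBoolean();
    if (swapped) {
      onSuccess.run();                              // run only after a successful swap
    }
    return swapped;
  }
}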

@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;

@@ -713,9 +714,14 @@ class MemStoreFlusher implements FlushRequester {
return -1;
}

@Override
public int hashCode() {
return System.identityHashCode(this);
}

@Override
public boolean equals(Object obj) {
return (this == obj);
return Objects.equals(this, obj);
}
}
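
The MemStoreFlusher entry gains a hashCode() because findbugs insists that equals() and hashCode() be overridden as a pair; for identity-based equality the matching hash is System.identityHashCode. A minimal sketch of that contract (class name is hypothetical):

class IdentityEqualitySketch {
  @Override
  public boolean equals(Object obj) {
    return this == obj;                   // identity equality: only the same instance matches
  }

  @Override
  public int hashCode() {
    return System.identityHashCode(this); // consistent with the identity-based equals above
  }
}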

@@ -76,6 +76,7 @@ public class MemStoreMergerSegmentsIterator extends MemStoreSegmentsIterator {
return null;
}

@Override
public void close() {
if (closed) {
return;

@@ -788,7 +788,8 @@ class MetricsRegionServerWrapperImpl

OptionalDouble storeAvgStoreFileAge = store.getAvgStoreFileAge();
if (storeAvgStoreFileAge.isPresent()) {
avgAgeNumerator += storeAvgStoreFileAge.getAsDouble() * storeHFiles;
avgAgeNumerator =
(long) (avgAgeNumerator + storeAvgStoreFileAge.getAsDouble() * storeHFiles);
}

tempStorefileIndexSize += store.getStorefilesRootLevelIndexSize();

@@ -931,6 +932,7 @@ class MetricsRegionServerWrapperImpl
return averageRegionSize;
}

@Override
public long getDataMissCount() {
if (this.cacheStats == null) {
return 0;

@@ -249,7 +249,7 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable

OptionalDouble storeAvgStoreFileAge = store.getAvgStoreFileAge();
if (storeAvgStoreFileAge.isPresent()) {
avgAgeNumerator += storeAvgStoreFileAge.getAsDouble() * storeHFiles;
avgAgeNumerator += (long) storeAvgStoreFileAge.getAsDouble() * storeHFiles;
}
}
}

@@ -74,8 +74,12 @@ public class MultiVersionConcurrencyControl {
public void advanceTo(long newStartPoint) {
while (true) {
long seqId = this.getWritePoint();
if (seqId >= newStartPoint) break;
if (this.tryAdvanceTo(/* newSeqId = */ newStartPoint, /* expected = */ seqId)) break;
if (seqId >= newStartPoint) {
break;
}
if (this.tryAdvanceTo(newStartPoint, seqId)) {
break;
}
}
}

@@ -239,6 +243,7 @@ public class MultiVersionConcurrencyControl {
}

@VisibleForTesting
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("readPoint", readPoint)

@@ -2807,7 +2807,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private static final long serialVersionUID = -4305297078988180130L;

@Override
public Throwable fillInStackTrace() {
public synchronized Throwable fillInStackTrace() {
return this;
}
};
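
Throwable.fillInStackTrace() is declared synchronized in the JDK, so an override that drops the modifier triggers the "unsynchronized override of a synchronized method" style warning; keeping synchronized on this no-op override is the whole fix. The pattern, as a sketch (class name is hypothetical):

// A pre-allocated exception that skips stack-trace capture on hot paths.
class FastPathException extends RuntimeException {
  private static final long serialVersionUID = 1L;

  @Override
  public synchronized Throwable fillInStackTrace() {
    return this; // no stack trace collected; matches Throwable's own synchronized declaration
  }
}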

@@ -279,6 +279,7 @@ public class RegionServerCoprocessorHost extends
* @return An instance of RegionServerServices, an object NOT for general user-space Coprocessor
* consumption.
*/
@Override
public RegionServerServices getRegionServerServices() {
return this.regionServerServices;
}

@@ -435,8 +435,8 @@ public class ScannerContext {
TIME_LIMIT_REACHED_MID_ROW(true, true),
BATCH_LIMIT_REACHED(true, true);

private boolean moreValues;
private boolean limitReached;
private final boolean moreValues;
private final boolean limitReached;

private NextState(boolean moreValues, boolean limitReached) {
this.moreValues = moreValues;

@@ -492,13 +492,13 @@ public class ScannerContext {
* limits, the checker must know their own scope (i.e. are they checking the limits between
* rows, between cells, etc...)
*/
int depth;
final int depth;

LimitScope(int depth) {
this.depth = depth;
}

int depth() {
final int depth() {
return depth;
}

@@ -97,7 +97,7 @@ public class ServerNonceManager {
}

public boolean isExpired(long minRelevantTime) {
return getActivityTime() < (minRelevantTime & (~0l >>> 3));
return getActivityTime() < (minRelevantTime & (~0L >>> 3));
}

public void setMvcc(long mvcc) {
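
The only change in ServerNonceManager is the long-literal suffix: a lowercase l is easily misread as the digit 1, so checkstyle/error-prone require the uppercase form. Both spellings denote the same value, as this sketch shows:

class LongLiteralSketch {
  static final long MASK_LOWER = ~0l >>> 3;  // 'l' suffix reads dangerously like "~01 >>> 3"
  static final long MASK_UPPER = ~0L >>> 3;  // same value, 0x1FFFFFFFFFFFFFFF, clearly a long
}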

@@ -24,6 +24,7 @@ public class SteppingSplitPolicy extends IncreasingToUpperBoundRegionSplitPolicy
* This allows a table to spread quickly across servers, while avoiding creating
* too many regions.
*/
@Override
protected long getSizeToCheck(final int tableRegionsCount) {
return tableRegionsCount == 1 ? this.initialSize : getDesiredMaxFileSize();
}

@@ -187,14 +187,17 @@ public class StoreFileScanner implements KeyValueScanner {
return scanners;
}

@Override
public String toString() {
return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
}

@Override
public Cell peek() {
return cur;
}

@Override
public Cell next() throws IOException {
Cell retKey = cur;

@@ -215,6 +218,7 @@ public class StoreFileScanner implements KeyValueScanner {
return retKey;
}

@Override
public boolean seek(Cell key) throws IOException {
if (seekCount != null) seekCount.increment();

@@ -242,6 +246,7 @@ public class StoreFileScanner implements KeyValueScanner {
}
}

@Override
public boolean reseek(Cell key) throws IOException {
if (seekCount != null) seekCount.increment();

@@ -298,6 +303,7 @@ public class StoreFileScanner implements KeyValueScanner {
return true;
}

@Override
public void close() {
if (closed) return;
cur = null;

@@ -104,12 +104,8 @@ public class StripeStoreFlusher extends StoreFlusher {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public StoreFileWriter createWriter() throws IOException {
StoreFileWriter writer = store.createWriterInTmp(
kvCount, store.getColumnFamilyDescriptor().getCompressionType(),
/* isCompaction = */ false,
/* includeMVCCReadpoint = */ true,
/* includesTags = */ true,
/* shouldDropBehind = */ false);
StoreFileWriter writer = store.createWriterInTmp(kvCount,
store.getColumnFamilyDescriptor().getCompressionType(), false, true, true, false);
return writer;
}
};

@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;

@@ -261,10 +260,8 @@ public abstract class Compactor<T extends CellSink> {
throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression,
/* isCompaction = */true,
/* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0,
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, shouldDropBehind);
}

private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,