HBASE-17844 Subset of HBASE-14614, Procedure v2: Core Assignment Manager (non-critical changes)
Minor changes related to HBASE-14614. Added comments. Changed logging. Added toString formatting. Removed imports. Removed unused code.
This commit is contained in:
parent 752b258b7c
commit d033cbb715
@@ -19,27 +19,15 @@
package org.apache.hadoop.hbase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Map;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionInTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos.HBaseVersionFileContent;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.VersionedWritable;
@@ -2049,7 +2049,7 @@ public class MetaTableAccessor {
+ Bytes.toStringBinary(HConstants.MERGEB_QUALIFIER));
}

-private static Put addRegionInfo(final Put p, final HRegionInfo hri)
+public static Put addRegionInfo(final Put p, final HRegionInfo hri)
throws IOException {
p.addImmutable(getCatalogFamily(), HConstants.REGIONINFO_QUALIFIER,
hri.toByteArray());
@@ -207,12 +207,12 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
long closeBeforeTime = EnvironmentEdgeManager.currentTime() - minIdleTimeBeforeClose;
synchronized (connections) {
for (T conn : connections.values()) {
-// remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
-// connection itself has already shutdown. The latter check is because that we may still
+// Remove connection if it has not been chosen by anyone for more than maxIdleTime, and the
+// connection itself has already shutdown. The latter check is because we may still
// have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
-LOG.info("Cleanup idle connection to " + conn.remoteId().address);
+if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address);
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
@@ -25,14 +25,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
-* Throw this in rpc call if there are too many pending requests for one region server
+* Throw this in RPC call if there are too many pending requests for one region server
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerTooBusyException extends DoNotRetryIOException {

public ServerTooBusyException(InetSocketAddress address, long count) {
-super("There are " + count + " concurrent rpc requests for " + address);
+super("Busy Server! " + count + " concurrent RPCs against " + address);
}

}
@@ -166,7 +166,7 @@ public class RegionState {
state = MERGING_NEW;
break;
default:
-throw new IllegalStateException("");
+throw new IllegalStateException("Unhandled state " + protoState);
}
return state;
}
@@ -1803,7 +1803,7 @@ public final class ProtobufUtil {
* has a serialized {@link ServerName} in it.
* @return Returns null if <code>data</code> is null else converts passed data
* to a ServerName instance.
-* @throws DeserializationException
+* @throws DeserializationException
*/
public static ServerName toServerName(final byte [] data) throws DeserializationException {
if (data == null || data.length <= 0) return null;
@@ -149,7 +149,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@@ -248,7 +248,7 @@ public class ChoreService implements ChoreServicer {
*/
static class ChoreServiceThreadFactory implements ThreadFactory {
private final String threadPrefix;
-private final static String THREAD_NAME_SUFFIX = "_ChoreService_";
+private final static String THREAD_NAME_SUFFIX = "_Chore_";
private AtomicInteger threadNumber = new AtomicInteger(1);

/**
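The factory above derives worker names from a fixed prefix, the shortened suffix and a counter. A minimal sketch of that naming pattern, with illustrative names rather than HBase's exact fields:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a prefix/suffix/counter ThreadFactory, the pattern the
// ChoreService factory above uses; class and field names are illustrative.
class NamedThreadFactory implements ThreadFactory {
  private final String prefix;                       // e.g. the service name
  private static final String SUFFIX = "_Chore_";
  private final AtomicInteger number = new AtomicInteger(1);

  NamedThreadFactory(String prefix) {
    this.prefix = prefix;
  }

  @Override
  public Thread newThread(Runnable r) {
    // Yields names like "master_Chore_1", "master_Chore_2", ...
    Thread t = new Thread(r, prefix + SUFFIX + number.getAndIncrement());
    t.setDaemon(true);
    return t;
  }
}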
@@ -146,7 +146,7 @@ public final class HConstants {
public static final int DEFAULT_HBASE_BALANCER_PERIOD = 300000;

/** The name of the ensemble table */
-public static final String ENSEMBLE_TABLE_NAME = "hbase:ensemble";
+public static final TableName ENSEMBLE_TABLE_NAME = TableName.valueOf("hbase:ensemble");

/** Config for pluggable region normalizer */
public static final String HBASE_MASTER_NORMALIZER_CLASS =
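Typing the constant as TableName rather than String pushes the TableName.valueOf conversion out of every call site; a sketch of the before/after at a hypothetical caller:

// Before the change, ENSEMBLE_TABLE_NAME was a String, so call sites wrapped it:
//   TableName tn = TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
// With the constant typed as TableName, the wrapping disappears, as the
// RSGroupBasedLoadBalancer, RegionStates and test hunks below show:
//   TableName tn = HConstants.ENSEMBLE_TABLE_NAME;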
@@ -89,7 +89,12 @@ public final class TableName implements Comparable<TableName> {
public static final String OLD_META_STR = ".META.";
public static final String OLD_ROOT_STR = "-ROOT-";

+/**
+* @return True if <code>tn</code> is the hbase:meta table name.
+*/
+public static boolean isMetaTableName(final TableName tn) {
+return tn.equals(TableName.META_TABLE_NAME);
+}

/**
* TableName for old -ROOT- table. It is used to read/process old WALs which have
@@ -25,13 +25,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
private static final Log LOG = LogFactory.getLog(AbstractProcedureScheduler.class);

private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
private boolean running = false;
@@ -19,24 +19,25 @@
package org.apache.hadoop.hbase.procedure2;

/**
-* Locking for mutual exclusion between procedures. Only by procedure framework internally.
+* Locking for mutual exclusion between procedures. Used only by procedure framework internally.
* {@link LockAndQueue} has two purposes:
* <ol>
-* <li>Acquire/release exclusive/shared locks</li>
-* <li>Maintain a list of procedures waiting for this lock<br>
-* To do so, {@link LockAndQueue} extends {@link ProcedureDeque} class. Using inheritance over
-* composition for this need is unusual, but the choice is motivated by million regions
-* assignment case as it will reduce memory footprint and number of objects to be GCed.
+* <li>Acquire/release exclusive/shared locks.</li>
+* <li>Maintains a list of procedures waiting on this lock.
+* {@link LockAndQueue} extends {@link ProcedureDeque} class. Blocked Procedures are added
+* to our super Deque. Using inheritance over composition to keep the Deque of waiting
+* Procedures is unusual, but we do it this way because in certain cases, there will be
+* millions of regions. This layout uses less memory.
* </ol>
*
-* NOT thread-safe. Needs external concurrency control. For eg. Uses in MasterProcedureScheduler are
+* <p>NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
* guarded by schedLock().
* <br>
* There is no need of 'volatile' keyword for member variables because of memory synchronization
* guarantees of locks (see 'Memory Synchronization',
* http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
* <br>
-* We do not implement Lock interface because we need exclusive + shared locking, and also
+* We do not implement Lock interface because we need exclusive and shared locking, and also
* because try-lock functions require procedure id.
* <br>
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
@@ -104,6 +105,9 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
return true;
}

+/**
+* @return True if we released a lock.
+*/
public boolean releaseExclusiveLock(final Procedure proc) {
if (isLockOwner(proc.getProcId())) {
exclusiveLockProcIdOwner = Long.MIN_VALUE;
@@ -111,4 +115,11 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
}
return false;
}
+
+@Override
+public String toString() {
+return "exclusiveLockOwner=" + (hasExclusiveLock()? getExclusiveLockProcIdOwner(): "NONE") +
+", sharedLockCount=" + getSharedLockCount() +
+", waitingProcCount=" + size();
+}
}
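The class comment above defends inheriting from the deque instead of holding one as a field. A minimal sketch of that layout under simplified types (an illustration, not the HBase class):

import java.util.ArrayDeque;

// Sketch: the lock state and its wait-queue share one object, so a million
// region locks cost one allocation each instead of lock + separate queue.
// Long.MIN_VALUE mirrors the "no exclusive owner" sentinel in the hunk above.
class SketchLockAndQueue extends ArrayDeque<Long> {  // deque of waiting proc ids
  private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
  private int sharedLockCount = 0;

  boolean tryExclusiveLock(long procId) {
    if (exclusiveLockProcIdOwner != Long.MIN_VALUE || sharedLockCount > 0) {
      return false;             // held; caller would addLast(procId) to wait
    }
    exclusiveLockProcIdOwner = procId;
    return true;
  }

  boolean releaseExclusiveLock(long procId) {
    if (exclusiveLockProcIdOwner == procId) {
      exclusiveLockProcIdOwner = Long.MIN_VALUE;
      return true;              // true if we released a lock
    }
    return false;
  }

  @Override
  public String toString() {
    return "exclusiveLockOwner=" + (exclusiveLockProcIdOwner == Long.MIN_VALUE
        ? "NONE" : exclusiveLockProcIdOwner)
        + ", sharedLockCount=" + sharedLockCount
        + ", waitingProcCount=" + size();
  }
}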
@@ -253,13 +253,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
protected StringBuilder toStringSimpleSB() {
final StringBuilder sb = new StringBuilder();
-toStringClassDetails(sb);

-sb.append(", procId=");
+sb.append("procId=");
sb.append(getProcId());

if (hasParent()) {
-sb.append(", parent=");
+sb.append(", parentProcId=");
sb.append(getParentProcId());
}
@@ -275,6 +274,9 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
sb.append(", failed=" + getException());
}

+sb.append(", ");
+toStringClassDetails(sb);
+
return sb;
}
@@ -631,7 +633,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
@InterfaceAudience.Private
protected synchronized boolean childrenCountDown() {
-assert childrenLatch > 0;
+assert childrenLatch > 0: this;
return --childrenLatch == 0;
}
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.procedure2;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
* Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
@@ -50,6 +49,7 @@ public class ProcedureEvent<T> {

@Override
public String toString() {
-return getClass().getSimpleName() + "(" + object + ")";
+return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() +
+", suspended procedures count=" + getSuspendedProcedures().size();
}
}
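The richer toString makes event readiness and suspended-procedure counts visible in logs. A small stand-in class showing the shape of the new output (illustrative fields, not the real ProcedureEvent):

import java.util.ArrayDeque;

// Stand-in for ProcedureEvent with the toString style the hunk above adds.
class SketchEvent<T> {
  private final T object;
  private boolean ready = false;
  private final ArrayDeque<Object> suspended = new ArrayDeque<>();

  SketchEvent(T object) { this.object = object; }

  @Override
  public String toString() {
    // Prints e.g. "SketchEvent for hbase:meta, ready=false, suspended procedures count=0"
    return getClass().getSimpleName() + " for " + object + ", ready=" + ready
        + ", suspended procedures count=" + suspended.size();
  }

  public static void main(String[] args) {
    System.out.println(new SketchEvent<>("hbase:meta"));
  }
}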
@@ -360,8 +360,7 @@ public class ProcedureExecutor<TEnvironment> {
assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;

if (debugEnabled) {
-LOG.debug(String.format("Loading state=%s isFailed=%s: %s",
-proc.getState(), proc.hasException(), proc));
+LOG.debug(String.format("Loading %s", proc));
}

Long rootProcId = getRootProcedureId(proc);
@@ -483,7 +482,7 @@ public class ProcedureExecutor<TEnvironment> {
// We have numThreads executor + one timer thread used for timing out
// procedures and triggering periodic procedures.
this.corePoolSize = numThreads;
-LOG.info("Starting executor threads=" + corePoolSize);
+LOG.info("Starting executor worker threads=" + corePoolSize);

// Create the Thread Group for the executors
threadGroup = new ThreadGroup("ProcedureExecutor");
@@ -522,7 +521,9 @@ public class ProcedureExecutor<TEnvironment> {
store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));

// Start the executors. Here we must have the lastProcId set.
-LOG.debug("Start workers " + workerThreads.size());
+if (LOG.isTraceEnabled()) {
+LOG.trace("Start workers " + workerThreads.size());
+}
timeoutExecutor.start();
for (WorkerThread worker: workerThreads) {
worker.start();
@@ -1147,8 +1148,7 @@ public class ProcedureExecutor<TEnvironment> {

if (proc.isSuccess()) {
if (LOG.isDebugEnabled()) {
-LOG.debug("Completed in " +
-StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc);
+LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime()));
}
// Finalize the procedure state
if (proc.getProcId() == rootProcId) {
@@ -1242,8 +1242,7 @@ public class ProcedureExecutor<TEnvironment> {

// Finalize the procedure state
LOG.info("Rolled back " + rootProc +
-" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
-" exception=" + exception.getMessage());
+" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()));
procedureFinished(rootProc);
return LockState.LOCK_ACQUIRED;
}
@@ -1342,7 +1341,7 @@ public class ProcedureExecutor<TEnvironment> {
return;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
-String msg = "CODE-BUG: Uncatched runtime exception for procedure: " + procedure;
+String msg = "CODE-BUG: Uncaught runtime exception: " + procedure;
LOG.error(msg, e);
procedure.setFailure(new RemoteProcedureException(msg, e));
}
@@ -1558,7 +1557,7 @@ public class ProcedureExecutor<TEnvironment> {
private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE);

public WorkerThread(final ThreadGroup group) {
-super(group, "ProcedureExecutorWorker-" + workerId.incrementAndGet());
+super(group, "ProcExecWorker-" + workerId.incrementAndGet());
}

@Override
@@ -1674,7 +1673,7 @@ public class ProcedureExecutor<TEnvironment> {
// if the procedure is in a waiting state again, put it back in the queue
procedure.updateTimestamp();
if (procedure.isWaiting()) {
-delayed.setTimeoutTimestamp(procedure.getTimeoutTimestamp());
+delayed.setTimeout(procedure.getTimeoutTimestamp());
queue.add(delayed);
}
} else {
@@ -1752,7 +1751,7 @@ public class ProcedureExecutor<TEnvironment> {
}

@Override
-public long getTimeoutTimestamp() {
+public long getTimeout() {
return timeout;
}
}
@@ -23,13 +23,11 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
* Keep track of the runnable procedures
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ProcedureScheduler {
/**
* Start the scheduler
@@ -93,7 +91,7 @@ public interface ProcedureScheduler {
Procedure poll(long timeout, TimeUnit unit);

/**
-* Mark the event has not ready.
+* Mark the event as not ready.
* procedures calling waitEvent() will be suspended.
* @param event the event to mark as suspended/not ready
*/
@@ -108,6 +108,9 @@ public abstract class StateMachineProcedure<TEnvironment, TState>
if (aborted.get() && isRollbackSupported(getCurrentState())) {
setAbortFailure(getClass().getSimpleName(), "abort requested");
} else {
+if (aborted.get()) {
+LOG.warn("ignoring abort request " + state);
+}
setNextState(getStateId(state));
}
}
@@ -32,13 +32,19 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public final class DelayedUtil {
private DelayedUtil() { }

+/**
+* Add a timeout to a Delay
+*/
public interface DelayedWithTimeout extends Delayed {
-long getTimeoutTimestamp();
+long getTimeout();
}

+/**
+* POISON implementation; used to mark special state: e.g. shutdown.
+*/
public static final DelayedWithTimeout DELAYED_POISON = new DelayedWithTimeout() {
@Override
-public long getTimeoutTimestamp() {
+public long getTimeout() {
return 0;
}
@@ -49,7 +55,7 @@ public final class DelayedUtil {

@Override
public int compareTo(final Delayed o) {
-return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o));
+return Long.compare(0, DelayedUtil.getTimeout(o));
}

@Override
@@ -63,6 +69,9 @@ public final class DelayedUtil {
}
};

+/**
+* @return null (if an interrupt) or an instance of E; resets interrupt on calling thread.
+*/
public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) {
try {
return queue.take();
@@ -72,33 +81,42 @@ public final class DelayedUtil {
}
}

-public static long getRemainingTime(final TimeUnit resultUnit, final long timeoutTime) {
+/**
+* @return Time remaining as milliseconds.
+*/
+public static long getRemainingTime(final TimeUnit resultUnit, final long timeout) {
final long currentTime = EnvironmentEdgeManager.currentTime();
-if (currentTime >= timeoutTime) {
+if (currentTime >= timeout) {
return 0;
}
-return resultUnit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS);
+return resultUnit.convert(timeout - currentTime, TimeUnit.MILLISECONDS);
}

public static int compareDelayed(final Delayed o1, final Delayed o2) {
-return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2));
+return Long.compare(getTimeout(o1), getTimeout(o2));
}

-private static long getTimeoutTimestamp(final Delayed o) {
+private static long getTimeout(final Delayed o) {
assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout instance, got " + o;
-return ((DelayedWithTimeout)o).getTimeoutTimestamp();
+return ((DelayedWithTimeout)o).getTimeout();
}

public static abstract class DelayedObject implements DelayedWithTimeout {
@Override
public long getDelay(final TimeUnit unit) {
-return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp());
+return DelayedUtil.getRemainingTime(unit, getTimeout());
}

@Override
public int compareTo(final Delayed other) {
return DelayedUtil.compareDelayed(this, other);
}

+@Override
+public String toString() {
+long timeout = getTimeout();
+return "timeout=" + timeout + ", delay=" + getDelay(TimeUnit.MILLISECONDS);
+}
}

public static abstract class DelayedContainer<T> extends DelayedObject {
@@ -126,25 +144,25 @@ public final class DelayedUtil {

@Override
public String toString() {
-return getClass().getSimpleName() + "(" + getObject() + ")";
+return "containedObject=" + getObject() + ", " + super.toString();
}
}

public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> {
-private long timeoutTimestamp;
+private long timeout;

-public DelayedContainerWithTimestamp(final T object, final long timeoutTimestamp) {
+public DelayedContainerWithTimestamp(final T object, final long timeout) {
super(object);
-setTimeoutTimestamp(timeoutTimestamp);
+setTimeout(timeout);
}

@Override
-public long getTimeoutTimestamp() {
-return timeoutTimestamp;
+public long getTimeout() {
+return timeout;
}

-public void setTimeoutTimestamp(final long timeoutTimestamp) {
-this.timeoutTimestamp = timeoutTimestamp;
+public void setTimeout(final long timeout) {
+this.timeout = timeout;
}
}
}
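The getTimeoutTimestamp-to-getTimeout rename is mechanical, but the Delayed plumbing it touches is worth a self-contained look. A simplified sketch of a timestamp-carrying container on a java.util.concurrent.DelayQueue, using System.currentTimeMillis() where HBase uses EnvironmentEdgeManager:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for DelayedContainerWithTimestamp: holds an object plus
// an absolute deadline (epoch millis); getDelay() converts that to "time left".
class SketchDelayed<T> implements Delayed {
  final T object;
  final long timeout;  // absolute deadline in ms, like the renamed field above

  SketchDelayed(T object, long timeout) {
    this.object = object;
    this.timeout = timeout;
  }

  @Override
  public long getDelay(TimeUnit unit) {
    long remainingMs = Math.max(0, timeout - System.currentTimeMillis());
    return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed other) {
    return Long.compare(getDelay(TimeUnit.MILLISECONDS),
        other.getDelay(TimeUnit.MILLISECONDS));
  }
}

public class DelayQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    DelayQueue<SketchDelayed<String>> queue = new DelayQueue<>();
    queue.put(new SketchDelayed<>("wake-me", System.currentTimeMillis() + 100));
    // take() blocks until the deadline passes, which is how the procedure
    // timeout thread sleeps until the next procedure is due.
    System.out.println(queue.take().object);
  }
}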
@@ -18,12 +18,16 @@

package org.apache.hadoop.hbase.procedure2;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.concurrent.Callable;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,18 +39,14 @@ import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;

-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
public class ProcedureTestingUtility {
private static final Log LOG = LogFactory.getLog(ProcedureTestingUtility.class);
@@ -80,7 +80,7 @@ public class TestDelayedUtil {
}

@Override
-public long getTimeoutTimestamp() {
+public long getTimeout() {
return 0;
}
}
@@ -138,7 +138,7 @@ public class RSGroupBasedLoadBalancer implements RSGroupableBalancer {
}
}
}
-groupClusterLoad.put(TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME), groupClusterState);
+groupClusterLoad.put(HConstants.ENSEMBLE_TABLE_NAME, groupClusterState);
this.internalBalancer.setClusterLoad(groupClusterLoad);
List<RegionPlan> groupPlans = this.internalBalancer
.balanceCluster(groupClusterState);
@@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;

-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.ipc;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -37,8 +35,6 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
@InterfaceStability.Evolving
public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
-private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
-
private int port;
private final PriorityFunction priority;
private final RpcExecutor callExecutor;
@@ -82,14 +78,14 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs

if (callqReadShare > 0) {
// at least 1 read handler and 1 write handler
-callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount),
+callExecutor = new RWQueueRpcExecutor("default.RWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
-callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,
+callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount,
maxQueueLength, priority, conf, server);
} else {
-callExecutor = new BalancedQueueRpcExecutor("deafult.BQ", handlerCount, maxQueueLength,
+callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength,
priority, conf, server);
}
}
@@ -1980,8 +1980,8 @@ public class SimpleRpcServer extends RpcServer {
if (!running) {
return;
}
-if (LOG.isDebugEnabled()) {
-LOG.debug(Thread.currentThread().getName()+": task running");
+if (LOG.isTraceEnabled()) {
+LOG.trace("running");
}
try {
closeIdle(false);
@@ -44,7 +44,9 @@ import edu.umd.cs.findbugs.annotations.Nullable;
* <p>On cluster startup, bulk assignment can be used to determine
* locations for all Regions in a cluster.
*
-* <p>This classes produces plans for the {@link AssignmentManager} to execute.
+* <p>This classes produces plans for the
+* {@link org.apache.hadoop.hbase.master.AssignmentManager}
+* to execute.
*/
@InterfaceAudience.Private
public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -218,9 +219,7 @@ public class MasterWalManager {
}

public void splitLog(final ServerName serverName) throws IOException {
-Set<ServerName> serverNames = new HashSet<>();
-serverNames.add(serverName);
-splitLog(serverNames);
+splitLog(Collections.<ServerName>singleton(serverName));
}

/**
@@ -228,9 +227,7 @@ public class MasterWalManager {
* @param serverName logs belonging to this server will be split
*/
public void splitMetaLog(final ServerName serverName) throws IOException {
-Set<ServerName> serverNames = new HashSet<>();
-serverNames.add(serverName);
-splitMetaLog(serverNames);
+splitMetaLog(Collections.<ServerName>singleton(serverName));
}

/**
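Collections.singleton returns an immutable one-element Set without the HashMap that backs a HashSet, which is all these single-server call sites need. A quick demonstration:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SingletonSetDemo {
  public static void main(String[] args) {
    // Before: three lines and a full HashSet allocation for one element.
    Set<String> viaHashSet = new HashSet<>();
    viaHashSet.add("server-1");

    // After: one line, immutable, and cheaper to allocate.
    Set<String> viaSingleton = Collections.singleton("server-1");

    System.out.println(viaHashSet.equals(viaSingleton));  // true
  }
}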
@@ -347,4 +344,4 @@ public class MasterWalManager {
public RecoveryMode getLogRecoveryMode() {
return this.splitLogManager.getRecoveryMode();
}
}
@@ -1033,7 +1033,7 @@ public class RegionStates {
for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
for (HRegionInfo hri: e.getValue()) {
if (hri.isMetaRegion()) continue;
-TableName tablename = bytable ? hri.getTable() : TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
+TableName tablename = bytable ? hri.getTable() : HConstants.ENSEMBLE_TABLE_NAME;
Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
if (svrToRegions == null) {
svrToRegions = new HashMap<>(serverHoldings.size());
@@ -30,7 +30,7 @@ import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -156,23 +157,16 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
@Override
public synchronized void setConf(Configuration conf) {
super.setConf(conf);
-LOG.info("loading config");

maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);

stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);

numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);

minCostNeedBalance = conf.getFloat(MIN_COST_NEED_BALANCE_KEY, minCostNeedBalance);

if (localityCandidateGenerator == null) {
localityCandidateGenerator = new LocalityBasedCandidateGenerator(services);
}
localityCost = new LocalityCostFunction(conf, services);

if (candidateGenerators == null) {
candidateGenerators = new CandidateGenerator[] {
new RandomCandidateGenerator(),
@@ -181,17 +175,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
new RegionReplicaRackCandidateGenerator(),
};
}

regionLoadFunctions = new CostFromRegionLoadFunction[] {
new ReadRequestCostFunction(conf),
new WriteRequestCostFunction(conf),
new MemstoreSizeCostFunction(conf),
new StoreFileCostFunction(conf)
};

regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);

costFunctions = new CostFunction[]{
new RegionCountSkewCostFunction(conf),
new PrimaryRegionCountSkewCostFunction(conf),
@@ -205,10 +196,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
regionLoadFunctions[2],
regionLoadFunctions[3],
};

curFunctionCosts= new Double[costFunctions.length];
tempFunctionCosts= new Double[costFunctions.length];

+LOG.info("Loaded config; maxSteps=" + maxSteps + ", stepsPerRegion=" + stepsPerRegion +
+", maxRunningTime=" + maxRunningTime + ", isByTable=" + isByTable + ", etc.");
}

@Override
@@ -43,11 +43,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
-* Procedure to allow clients and external admin tools to take locks on table/namespace/regions.
-* This procedure when scheduled, acquires specified locks, suspends itself and waits for :
-* - call to unlock: if lock request came from the process itself, say master chore.
-* - Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
-* the lock or not based on last heartbeat timestamp.
+* Procedure to allow blessed clients and external admin tools to take our internal Schema locks
+* used by the procedure framework isolating procedures doing creates/deletes etc. on
+* table/namespace/regions.
+* This procedure when scheduled, acquires specified locks, suspends itself and waits for:
+* <ul>
+* <li>Call to unlock: if lock request came from the process itself, say master chore.</li>
+* <li>Timeout : if lock request came from RPC. On timeout, evaluates if it should continue holding
+* the lock or not based on last heartbeat timestamp.</li>
+* </ul>
*/
@InterfaceAudience.Private
public final class LockProcedure extends Procedure<MasterProcedureEnv>
@@ -191,7 +195,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
public void updateHeartBeat() {
lastHeartBeat.set(System.currentTimeMillis());
if (LOG.isDebugEnabled()) {
-LOG.debug("Update heartbeat. Proc: " + toString());
+LOG.debug("Heartbeat " + toString());
}
}
@@ -202,8 +206,10 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
*/
protected boolean setTimeoutFailure(final MasterProcedureEnv env) {
synchronized (event) {
-if (!event.isReady()) { // maybe unlock() awakened the event.
+if (LOG.isDebugEnabled()) LOG.debug("Timeout failure " + this.event);
+if (!event.isReady()) { // Maybe unlock() awakened the event.
setState(ProcedureProtos.ProcedureState.RUNNABLE);
+if (LOG.isDebugEnabled()) LOG.debug("Calling wake on " + this.event);
env.getProcedureScheduler().wakeEvent(event);
}
}
@@ -234,7 +240,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
}
if (unlock.get() || hasHeartbeatExpired()) {
locked.set(false);
-LOG.debug((unlock.get() ? "UNLOCKED - " : "TIMED OUT - ") + toString());
+LOG.debug((unlock.get()? "UNLOCKED " : "TIMED OUT ") + toString());
return null;
}
synchronized (event) {
@@ -302,7 +308,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
hasLock = ret;
if (ret) {
if (LOG.isDebugEnabled()) {
-LOG.debug("LOCKED - " + toString());
+LOG.debug("LOCKED " + toString());
}
lastHeartBeat.set(System.currentTimeMillis());
return LockState.LOCK_ACQUIRED;
@@ -352,7 +358,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
} else if (tableName != null) {
return setupTableLock();
} else {
-LOG.error("Unknown level specified in proc - " + toString());
+LOG.error("Unknown level specified in " + toString());
throw new IllegalArgumentException("no namespace/table/region provided");
}
}
@@ -364,10 +370,10 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
this.opType = TableOperationType.EDIT;
return new NamespaceExclusiveLock();
case SHARED:
-LOG.error("Shared lock on namespace not supported. Proc - " + toString());
+LOG.error("Shared lock on namespace not supported for " + toString());
throw new IllegalArgumentException("Shared lock on namespace not supported");
default:
-LOG.error("Unexpected lock type in proc - " + toString());
+LOG.error("Unexpected lock type " + toString());
throw new IllegalArgumentException("Wrong lock type: " + type.toString());
}
}
@@ -381,7 +387,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
this.opType = TableOperationType.READ;
return new TableSharedLock();
default:
-LOG.error("Unexpected lock type in proc - " + toString());
+LOG.error("Unexpected lock type " + toString());
throw new IllegalArgumentException("Wrong lock type:" + type.toString());
}
}
@@ -393,7 +399,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
this.opType = TableOperationType.REGION_EDIT;
return new RegionExclusiveLock();
default:
-LOG.error("Only exclusive lock supported on regions. Proc - " + toString());
+LOG.error("Only exclusive lock supported on regions for " + toString());
throw new IllegalArgumentException("Only exclusive lock supported on regions.");
}
}
@@ -69,9 +69,8 @@ public abstract class AbstractStateMachineTableProcedure<TState>
@Override
public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName());
-sb.append(" (table=");
+sb.append(" table=");
sb.append(getTableName());
-sb.append(")");
}

@Override
@@ -111,4 +110,4 @@ public abstract class AbstractStateMachineTableProcedure<TState>
throw new TableNotFoundException(getTableName());
}
}
}
@@ -62,7 +62,6 @@ public class CreateNamespaceProcedure
if (isTraceEnabled()) {
LOG.trace(this + " execute state=" + state);
}

try {
switch (state) {
case CREATE_NAMESPACE_PREPARE:
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;

import com.google.common.annotations.VisibleForTesting;

+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockStatus;
@@ -51,52 +51,51 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
* This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if they
-* are operating on different tables (e.g. two create table can be performed
-* at the same, time assuming table A and table B) or against two different servers; say
-* two servers that crashed at about the same time.
+* are operating on different tables (e.g. two create table procedures can be performed
+* at the same time) or against two different servers; say two servers that crashed at
+* about the same time.
*
-* <p>Each procedure should implement an interface providing information for this queue.
-* for example table related procedures should implement TableProcedureInterface.
-* each procedure will be pushed in its own queue, and based on the operation type
-* we may take smarter decision. e.g. we can abort all the operations preceding
+* <p>Each procedure should implement an Interface providing information for this queue.
+* For example table related procedures should implement TableProcedureInterface.
+* Each procedure will be pushed in its own queue, and based on the operation type
+* we may make smarter decisions: e.g. we can abort all the operations preceding
* a delete table, or similar.
*
* <h4>Concurrency control</h4>
* Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
-* serverBuckets) is controlled by schedLock(). That mainly includes:<br>
+* serverBuckets) is controlled by schedLock(). This mainly includes:<br>
* <ul>
* <li>
-* {@link #push(Procedure, boolean, boolean)} : A push will add a Queue back to run-queue
+* {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
* when:
* <ol>
-* <li>queue was empty before push (so must have been out of run-queue)</li>
-* <li>child procedure is added (which means parent procedure holds exclusive lock, and it
+* <li>Queue was empty before push (so must have been out of run-queue)</li>
+* <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
* must have moved Queue out of run-queue)</li>
* </ol>
* </li>
* <li>
-* {@link #poll(long)} : A poll will remove a Queue from run-queue when:
+* {@link #poll(long)}: A poll will remove a Queue from run-queue when:
* <ol>
-* <li>queue becomes empty after poll</li>
-* <li>exclusive lock is requested by polled procedure and lock is available (returns the
+* <li>Queue becomes empty after poll</li>
+* <li>Exclusive lock is requested by polled procedure and lock is available (returns the
* procedure)</li>
-* <li>exclusive lock is requested but lock is not available (returns null)</li>
-* <li>Polled procedure is child of parent holding exclusive lock, and the next procedure is
+* <li>Exclusive lock is requested but lock is not available (returns null)</li>
+* <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
* not a child</li>
* </ol>
* </li>
* <li>
-* namespace/table/region locks: Queue is added back to run-queue when lock being released is:
+* Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
* <ol>
-* <li>exclusive lock</li>
-* <li>last shared lock (in case queue was removed because next procedure in queue required
+* <li>Exclusive lock</li>
+* <li>Last shared lock (in case queue was removed because next procedure in queue required
* exclusive lock)</li>
* </ol>
* </li>
* </ul>
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class);
@@ -118,16 +117,16 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* TableQueue with priority 1.
*/
private static class TablePriorities {
-final int metaTablePriority;
-final int userTablePriority;
-final int sysTablePriority;
-
TablePriorities(Configuration conf) {
metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
}

+final int metaTablePriority;
+final int userTablePriority;
+final int sysTablePriority;
+
int getPriority(TableName tableName) {
if (tableName.equals(TableName.META_TABLE_NAME)) {
return metaTablePriority;
@@ -773,7 +772,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
namespaceLock.releaseExclusiveLock(procedure);
int waitingCount = 0;
-if(systemNamespaceTableLock.releaseSharedLock()) {
+if (systemNamespaceTableLock.releaseSharedLock()) {
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME));
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
}
@@ -924,6 +923,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
* locks.
*/
private static class SchemaLocking {
+final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
+final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
+final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
+// Single map for all regions irrespective of tables. Key is encoded region name.
+final Map<String, LockAndQueue> regionLocks = new HashMap<>();
+
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
LockAndQueue lock = map.get(key);
if (lock == null) {
@@ -969,11 +974,29 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
regionLocks.clear();
}

-final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
-final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
-final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
-// Single map for all regions irrespective of tables. Key is encoded region name.
-final Map<String, LockAndQueue> regionLocks = new HashMap<>();
+@Override
+public String toString() {
+return "serverLocks=" + filterUnlocked(this.serverLocks) +
+", namespaceLocks=" + filterUnlocked(this.namespaceLocks) +
+", tableLocks=" + filterUnlocked(this.tableLocks) +
+", regionLocks=" + filterUnlocked(this.regionLocks);
+}
+
+private String filterUnlocked(Map<?, LockAndQueue> locks) {
+StringBuilder sb = new StringBuilder("{");
+int initialLength = sb.length();
+for (Map.Entry<?, LockAndQueue> entry: locks.entrySet()) {
+if (!entry.getValue().isLocked()) continue;
+if (sb.length() > initialLength) sb.append(", ");
+sb.append("{");
+sb.append(entry.getKey());
+sb.append("=");
+sb.append(entry.getValue());
+sb.append("}");
+}
+sb.append("}");
+return sb.toString();
+}
}

// ======================================================================
@@ -1057,4 +1080,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return Math.max(1, queue.getPriority() * quantum); // TODO
}
}

+/**
+* For debugging. Expensive.
+* @throws IOException
+*/
+@VisibleForTesting
+public String dumpLocks() throws IOException {
+// TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
+return this.locking.toString();
+}
}
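Assuming the toString() implementations added above, dumpLocks() output would look roughly like the following; the names and procedure ids here are invented for illustration:

// Hypothetical dumpLocks() output; only held locks survive filterUnlocked():
// serverLocks={}, namespaceLocks={{ns1=exclusiveLockOwner=101, sharedLockCount=0,
//   waitingProcCount=2}}, tableLocks={{ns1:t1=exclusiveLockOwner=NONE,
//   sharedLockCount=1, waitingProcCount=0}}, regionLocks={}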
@@ -287,7 +287,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected final Configuration conf;
private final Configuration baseConf;
private final int rowLockWaitDuration;
-private CompactedHFilesDischarger compactedFileDischarger;
static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;

// The internal wait duration to acquire a lock before read/update
@@ -1703,8 +1702,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.metricsRegionWrapper != null) {
Closeables.closeQuietly(this.metricsRegionWrapper);
}
-// stop the Compacted hfile discharger
-if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@@ -7612,7 +7609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
-50 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+49 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN);
@@ -1298,10 +1298,9 @@ public class HStore implements Store {
}

// Ready to go. Have list of files to compact.
-LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
-+ this + " of " + this.getRegionInfo().getRegionNameAsString()
-+ " into tmpdir=" + fs.getTempDir() + ", totalSize="
-+ TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
+LOG.info("Starting compaction of " + filesToCompact +
+" into tmpdir=" + fs.getTempDir() + ", totalSize=" +
+TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));

// Commence the compaction.
List<Path> newFiles = compaction.compact(throughputController, user);
@@ -123,8 +123,7 @@ public class OpenRegionHandler extends EventHandler {
openSuccessful = true;

// Done! Successful region open
-LOG.debug("Opened " + regionName + " on " +
-this.server.getServerName());
+LOG.debug("Opened " + regionName + " on " + this.server.getServerName());
} finally {
// Do all clean up here
if (!openSuccessful) {
@@ -99,8 +99,10 @@ public class PressureAwareCompactionThroughputController extends PressureAwareTh
maxThroughputLowerBound + (maxThroughputUpperBound - maxThroughputLowerBound)
* compactionPressure;
}
-if (LOG.isDebugEnabled()) {
-LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+if (LOG.isTraceEnabled()) {
+// TODO: FIX!!! Don't log unless some activity or a change in config. Making TRACE
+// in the meantime.
+LOG.trace("CompactionPressure is " + compactionPressure + ", tune throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.setMaxThroughput(maxThroughputToSet);
@@ -695,7 +695,8 @@ public class WALSplitter {
*/
public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
long newSeqId, long saftyBumper) throws IOException {

+// TODO: Why are we using a method in here as part of our normal region open where
+// there is no splitting involved? Fix. St.Ack 01/20/2017.
Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
long maxSeqId = 0;
FileStatus[] files = null;
@@ -732,7 +733,7 @@ public class WALSplitter {
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
}
if (LOG.isDebugEnabled()) {
-LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
+LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId
+ ", maxSeqId=" + maxSeqId);
}
} catch (FileAlreadyExistsException ignored) {
@@ -127,7 +127,7 @@ public class TestStochasticBalancerJmxMetrics extends BalancerTestBase {
conf.setBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, false);
loadBalancer.setConf(conf);

-TableName tableName = TableName.valueOf(HConstants.ENSEMBLE_TABLE_NAME);
+TableName tableName = HConstants.ENSEMBLE_TABLE_NAME;
Map<ServerName, List<HRegionInfo>> clusterState = mockClusterServers(mockCluster_ensemble);
loadBalancer.balanceCluster(tableName, clusterState);
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.HBaseFsckRepair;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -150,6 +150,9 @@ public class TestChangingEncoding {
Result result = table.get(get);
for (int j = 0; j < NUM_COLS_PER_ROW; ++j) {
Cell kv = result.getColumnLatestCell(CF_BYTES, getQualifier(j));
+if (kv == null) {
+continue;
+}
assertTrue(CellUtil.matchingValue(kv, getValue(batchId, i, j)));
}
}
@@ -238,7 +241,7 @@ public class TestChangingEncoding {
public void testCrazyRandomChanges() throws Exception {
prepareTest("RandomChanges");
Random rand = new Random(2934298742974297L);
-for (int i = 0; i < 20; ++i) {
+for (int i = 0; i < 10; ++i) {
int encodingOrdinal = rand.nextInt(DataBlockEncoding.values().length);
DataBlockEncoding encoding = DataBlockEncoding.values()[encodingOrdinal];
setEncodingConf(encoding, rand.nextBoolean());
@@ -246,5 +249,4 @@ public class TestChangingEncoding {
verifyAllData();
}
}
-
}
@@ -434,7 +434,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testCoDelScheduling() throws Exception {
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
-envEdge.threadNamePrefixs.add("RpcServer.deafult.FPBQ.Codel.handler");
+envEdge.threadNamePrefixs.add("RpcServer.default.FPBQ.Codel.handler");
Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
@@ -456,7 +456,6 @@ public class TestSimpleRpcScheduler {/*
for (int i = 0; i < 100; i++) {
long time = System.currentTimeMillis();
envEdge.timeQ.put(time);
-long now = System.currentTimeMillis();
CallRunner cr = getMockedCallRunner(time, 2);
// LOG.info("" + i + " " + (System.currentTimeMillis() - now) + " cr=" + cr);
scheduler.dispatch(cr);
@@ -76,7 +76,6 @@ public class TestLockProcedure {
// crank this up if this test turns out to be flaky.
private static final int HEARTBEAT_TIMEOUT = 1000;
private static final int LOCAL_LOCKS_TIMEOUT = 2000;
-private static final int ZK_EXPIRATION = 2 * HEARTBEAT_TIMEOUT;

private static final Log LOG = LogFactory.getLog(TestLockProcedure.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -207,10 +207,9 @@ public abstract class AbstractTestWALReplay {

// move region to another regionserver
Region destRegion = regions.get(0);
-int originServerNum = hbaseCluster
-.getServerWith(destRegion.getRegionInfo().getRegionName());
-assertTrue("Please start more than 1 regionserver", hbaseCluster
-.getRegionServerThreads().size() > 1);
+int originServerNum = hbaseCluster.getServerWith(destRegion.getRegionInfo().getRegionName());
+assertTrue("Please start more than 1 regionserver",
+hbaseCluster.getRegionServerThreads().size() > 1);
int destServerNum = 0;
while (destServerNum == originServerNum) {
destServerNum++;
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;

import java.io.IOException;

+import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Table;
@@ -31,11 +32,15 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;

@Category({ VerySlowRegionServerTests.class, LargeTests.class })
public class TestAsyncLogRolling extends AbstractTestLogRolling {
+@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+withLookingForStuckThread(true).build();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
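CategoryBasedTimeout is HBase test infrastructure; the idiom it builds on is a JUnit 4 TestRule supplied through @Rule. A generic sketch with the stock Timeout rule (not the HBase helper):

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class TimeoutRuleDemo {
  // Applies to every @Test in the class; the test fails instead of hanging.
  @Rule
  public Timeout timeout = Timeout.seconds(30);

  @Test
  public void finishesQuickly() {
    // ... test body must complete within 30 seconds ...
  }
}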
@@ -62,4 +67,4 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
doPut(table, 2);
assertEquals(numRolledLogFiles + 1, AsyncFSWALProvider.getNumRolledLogFiles(wal));
}
}