This commit is contained in:
anoopsamjohn 2016-09-15 18:07:12 +05:30
commit e6f8f6dbd0
33 changed files with 677 additions and 606 deletions

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
@ -1085,9 +1086,11 @@ public interface Admin extends Abortable, Closeable {
*
* @param name name of namespace descriptor
* @return A descriptor
* @throws org.apache.hadoop.hbase.NamespaceNotFoundException
* @throws IOException if a remote or network exception occurs
*/
NamespaceDescriptor getNamespaceDescriptor(final String name)
throws IOException;
throws NamespaceNotFoundException, IOException;
/**
* List available namespace descriptors

View File

@ -756,7 +756,7 @@ class AsyncProcess {
@Override
public void run() {
AbstractResponse res;
AbstractResponse res = null;
CancellableRegionServerCallable callable = currentCallable;
try {
// setup the callable based on the actions, if we don't have one already from the request
@ -802,7 +802,7 @@ class AsyncProcess {
throw new RuntimeException(t);
} finally {
decTaskCounters(multiAction.getRegions(), server);
if (callsInProgress != null && callable != null) {
if (callsInProgress != null && callable != null && res != null) {
callsInProgress.remove(callable);
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.ProcedureUtil;
@ -1981,7 +1982,8 @@ public class HBaseAdmin implements Admin {
}
@Override
public NamespaceDescriptor getNamespaceDescriptor(final String name) throws IOException {
public NamespaceDescriptor getNamespaceDescriptor(final String name)
throws NamespaceNotFoundException, IOException {
return executeCallable(new MasterCallable<NamespaceDescriptor>(getConnection(),
getRpcControllerFactory()) {
@Override

View File

@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
@ -137,6 +140,16 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private final ScheduledFuture<?> cleanupIdleConnectionTask;
private int maxConcurrentCallsPerServer;
private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
@Override public AtomicInteger load(InetSocketAddress key) throws Exception {
return new AtomicInteger(0);
}
});
/**
* Construct an IPC client for the cluster <code>clusterId</code>
* @param conf configuration
@ -167,6 +180,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics;
this.maxConcurrentCallsPerServer = conf.getInt(
HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
@ -382,16 +398,22 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {

View File

@ -0,0 +1,38 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Throw this in rpc call if there are too many pending requests for one region server
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ServerTooBusyException extends DoNotRetryIOException {
public ServerTooBusyException(InetSocketAddress address, long count) {
super("There are " + count + " concurrent rpc requests for " + address);
}
}

View File

@ -737,6 +737,18 @@ public final class HConstants {
*/
public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1;
/**
* The maximum number of concurrent pending RPC requests for one server in process level.
*/
public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD =
"hbase.client.perserver.requests.threshold";
/**
* Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}.
*/
public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE;
/**
* Parameter name for server pause value, used mostly as value to wait before
* running a retry of a failed operation.

View File

@ -252,6 +252,14 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
return this.code;
}
private static Type[] codeArray = new Type[256];
static {
for (Type t : Type.values()) {
codeArray[t.code & 0xff] = t;
}
}
/**
* Cannot rely on enum ordinals . They change if item is removed or moved.
* Do our own codes.
@ -259,11 +267,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
* @return Type associated with passed code.
*/
public static Type codeToType(final byte b) {
for (Type t : Type.values()) {
if (t.getCode() == b) {
Type t = codeArray[b & 0xff];
if (t != null) {
return t;
}
}
throw new RuntimeException("Unknown code " + b);
}
}

View File

@ -487,23 +487,33 @@ possible configurations would overwhelm and obscure the important.
<property>
<name>hbase.client.max.total.tasks</name>
<value>100</value>
<description>The maximum number of concurrent tasks a single HTable instance will
<description>The maximum number of concurrent mutation tasks a single HTable instance will
send to the cluster.</description>
</property>
<property>
<name>hbase.client.max.perserver.tasks</name>
<value>5</value>
<description>The maximum number of concurrent tasks a single HTable instance will
<description>The maximum number of concurrent mutation tasks a single HTable instance will
send to a single region server.</description>
</property>
<property>
<name>hbase.client.max.perregion.tasks</name>
<value>1</value>
<description>The maximum number of concurrent connections the client will
<description>The maximum number of concurrent mutation tasks the client will
maintain to a single Region. That is, if there is already
hbase.client.max.perregion.tasks writes in progress for this region, new puts
won't be sent to this region until some writes finishes.</description>
</property>
<property>
<name>hbase.client.perserver.requests.threshold</name>
<value>2147483647</value>
<description>The max number of concurrent pending requests for one server in all client threads
(process level). Exceeding requests will be thrown ServerTooBusyException immediately to prevent
user's threads being occupied and blocked by only one slow region server. If you use a fix
number of threads to access HBase in a synchronous way, set this to a suitable value which is
related to the number of threads will help you. See
https://issues.apache.org/jira/browse/HBASE-16388 for details.</description>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>2147483647</value>

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
/**
* Base class for all the Namespace procedures that want to use a StateMachineProcedure.
* It provide some basic helpers like basic locking and basic toStringClassDetails().
*/
@InterfaceAudience.Private
public abstract class AbstractStateMachineNamespaceProcedure<TState>
extends StateMachineProcedure<MasterProcedureEnv, TState>
implements TableProcedureInterface {
protected AbstractStateMachineNamespaceProcedure() {
// Required by the Procedure framework to create the procedure on replay
}
protected AbstractStateMachineNamespaceProcedure(final MasterProcedureEnv env) {
this.setOwner(env.getRequestUser().getShortName());
}
protected abstract String getNamespaceName();
@Override
public TableName getTableName() {
return TableName.NAMESPACE_TABLE_NAME;
}
@Override
public abstract TableOperationType getTableOperationType();
@Override
public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (namespace=");
sb.append(getNamespaceName());
sb.append(")");
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
/**
* Base class for all the Table procedures that want to use a StateMachineProcedure.
* It provide some basic helpers like basic locking, sync latch, and basic toStringClassDetails().
*/
@InterfaceAudience.Private
public abstract class AbstractStateMachineTableProcedure<TState>
extends StateMachineProcedure<MasterProcedureEnv, TState>
implements TableProcedureInterface {
// used for compatibility with old clients
private final ProcedurePrepareLatch syncLatch;
private User user;
protected AbstractStateMachineTableProcedure() {
// Required by the Procedure framework to create the procedure on replay
syncLatch = null;
}
protected AbstractStateMachineTableProcedure(final MasterProcedureEnv env) {
this(env, null);
}
protected AbstractStateMachineTableProcedure(final MasterProcedureEnv env,
final ProcedurePrepareLatch latch) {
this.user = env.getRequestUser();
this.setOwner(user.getShortName());
// used for compatibility with clients without procedures
// they need a sync TableExistsException, TableNotFoundException, TableNotDisabledException, ...
this.syncLatch = latch;
}
@Override
public abstract TableName getTableName();
@Override
public abstract TableOperationType getTableOperationType();
@Override
public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(getTableName());
sb.append(")");
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
protected User getUser() {
return user;
}
protected void setUser(final User user) {
this.user = user;
}
protected void releaseSyncLatch() {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
}
/**
* Check whether a table is modifiable - exists and either offline or online with config set
* @param env MasterProcedureEnv
* @throws IOException
*/
protected void checkTableModifiable(final MasterProcedureEnv env) throws IOException {
// Checks whether the table exists
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) {
throw new TableNotFoundException(getTableName());
}
}
}

View File

@ -33,37 +33,30 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.AddColumnFamilyState;
import org.apache.hadoop.hbase.security.User;
/**
* The procedure to add a column family to an existing table.
*/
@InterfaceAudience.Private
public class AddColumnFamilyProcedure
extends StateMachineProcedure<MasterProcedureEnv, AddColumnFamilyState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<AddColumnFamilyState> {
private static final Log LOG = LogFactory.getLog(AddColumnFamilyProcedure.class);
private TableName tableName;
private HTableDescriptor unmodifiedHTableDescriptor;
private HColumnDescriptor cfDescriptor;
private User user;
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled;
// used for compatibility with old clients, until 2.0 the client had a sync behavior
private final ProcedurePrepareLatch syncLatch;
public AddColumnFamilyProcedure() {
super();
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
this.syncLatch = null;
}
public AddColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
@ -73,14 +66,12 @@ public class AddColumnFamilyProcedure
public AddColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final HColumnDescriptor cfDescriptor, final ProcedurePrepareLatch latch) {
super(env, latch);
this.tableName = tableName;
this.cfDescriptor = cfDescriptor;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
this.syncLatch = latch;
}
@Override
@ -153,7 +144,7 @@ public class AddColumnFamilyProcedure
@Override
protected void completionCleanup(final MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
}
@Override
@ -171,24 +162,13 @@ public class AddColumnFamilyProcedure
return AddColumnFamilyState.ADD_COLUMN_FAMILY_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.AddColumnFamilyStateData.Builder addCFMsg =
MasterProcedureProtos.AddColumnFamilyStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setColumnfamilySchema(ProtobufUtil.convertToColumnFamilySchema(cfDescriptor));
if (unmodifiedHTableDescriptor != null) {
@ -205,7 +185,7 @@ public class AddColumnFamilyProcedure
MasterProcedureProtos.AddColumnFamilyStateData addCFMsg =
MasterProcedureProtos.AddColumnFamilyStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(addCFMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(addCFMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(addCFMsg.getTableName());
cfDescriptor = ProtobufUtil.convertToHColumnDesc(addCFMsg.getColumnfamilySchema());
if (addCFMsg.hasUnmodifiedTableSchema()) {
@ -244,7 +224,7 @@ public class AddColumnFamilyProcedure
*/
private void prepareAdd(final MasterProcedureEnv env) throws IOException {
// Checks whether the table is allowed to be modified.
MasterDDLOperationHelper.checkTableModifiable(env, tableName);
checkTableModifiable(env);
// In order to update the descriptor, we need to retrieve the old descriptor for comparison.
unmodifiedHTableDescriptor = env.getMasterServices().getTableDescriptors().get(tableName);
@ -369,10 +349,10 @@ public class AddColumnFamilyProcedure
if (cpHost != null) {
switch (state) {
case ADD_COLUMN_FAMILY_PRE_OPERATION:
cpHost.preAddColumnFamilyAction(tableName, cfDescriptor, user);
cpHost.preAddColumnFamilyAction(tableName, cfDescriptor, getUser());
break;
case ADD_COLUMN_FAMILY_POST_OPERATION:
cpHost.postCompletedAddColumnFamilyAction(tableName, cfDescriptor, user);
cpHost.postCompletedAddColumnFamilyAction(tableName, cfDescriptor, getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -47,12 +47,10 @@ import org.apache.hadoop.hbase.master.MetricsSnapshot;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure.CreateHdfsRegions;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CloneSnapshotState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@ -67,11 +65,9 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
public class CloneSnapshotProcedure
extends StateMachineProcedure<MasterProcedureEnv, CloneSnapshotState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<CloneSnapshotState> {
private static final Log LOG = LogFactory.getLog(CloneSnapshotProcedure.class);
private User user;
private HTableDescriptor hTableDescriptor;
private SnapshotDescription snapshot;
private List<HRegionInfo> newRegions = null;
@ -97,10 +93,9 @@ public class CloneSnapshotProcedure
*/
public CloneSnapshotProcedure(final MasterProcedureEnv env,
final HTableDescriptor hTableDescriptor, final SnapshotDescription snapshot) {
super(env);
this.hTableDescriptor = hTableDescriptor;
this.snapshot = snapshot;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
getMonitorStatus();
}
@ -233,7 +228,7 @@ public class CloneSnapshotProcedure
MasterProcedureProtos.CloneSnapshotStateData.Builder cloneSnapshotMsg =
MasterProcedureProtos.CloneSnapshotStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setSnapshot(this.snapshot)
.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDescriptor));
if (newRegions != null) {
@ -264,7 +259,7 @@ public class CloneSnapshotProcedure
MasterProcedureProtos.CloneSnapshotStateData cloneSnapshotMsg =
MasterProcedureProtos.CloneSnapshotStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(cloneSnapshotMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(cloneSnapshotMsg.getUserInfo()));
snapshot = cloneSnapshotMsg.getSnapshot();
hTableDescriptor = ProtobufUtil.convertToHTableDesc(cloneSnapshotMsg.getTableSchema());
if (cloneSnapshotMsg.getRegionInfoCount() == 0) {
@ -290,19 +285,6 @@ public class CloneSnapshotProcedure
getMonitorStatus();
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) {
return false;
}
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
/**
* Action before any real action of cloning from snapshot.
* @param env MasterProcedureEnv
@ -339,7 +321,7 @@ public class CloneSnapshotProcedure
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preCreateTableAction(hTableDescriptor, null, user);
cpHost.preCreateTableAction(hTableDescriptor, null, getUser());
}
}
@ -355,7 +337,7 @@ public class CloneSnapshotProcedure
if (cpHost != null) {
final HRegionInfo[] regions = (newRegions == null) ? null :
newRegions.toArray(new HRegionInfo[newRegions.size()]);
cpHost.postCompletedCreateTableAction(hTableDescriptor, regions, user);
cpHost.postCompletedCreateTableAction(hTableDescriptor, regions, getUser());
}
}

View File

@ -26,11 +26,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateNamespaceState;
@ -41,8 +39,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
*/
@InterfaceAudience.Private
public class CreateNamespaceProcedure
extends StateMachineProcedure<MasterProcedureEnv, CreateNamespaceState>
implements TableProcedureInterface {
extends AbstractStateMachineNamespaceProcedure<CreateNamespaceState> {
private static final Log LOG = LogFactory.getLog(CreateNamespaceProcedure.class);
private NamespaceDescriptor nsDescriptor;
@ -54,9 +51,9 @@ public class CreateNamespaceProcedure
public CreateNamespaceProcedure(final MasterProcedureEnv env,
final NamespaceDescriptor nsDescriptor) {
super(env);
this.nsDescriptor = nsDescriptor;
this.traceEnabled = null;
this.setOwner(env.getRequestUser().getUGI().getShortUserName());
}
@Override
@ -157,14 +154,6 @@ public class CreateNamespaceProcedure
nsDescriptor = ProtobufUtil.toNamespaceDescriptor(createNamespaceMsg.getNamespaceDescriptor());
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (Namespace=");
sb.append(nsDescriptor.getName());
sb.append(")");
}
private boolean isBootstrapNamespace() {
return nsDescriptor.equals(NamespaceDescriptor.DEFAULT_NAMESPACE) ||
nsDescriptor.equals(NamespaceDescriptor.SYSTEM_NAMESPACE);
@ -183,22 +172,13 @@ public class CreateNamespaceProcedure
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
public TableName getTableName() {
return TableName.NAMESPACE_TABLE_NAME;
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.EDIT;
}
private String getNamespaceName() {
@Override
protected String getNamespaceName() {
return nsDescriptor.getName();
}

View File

@ -41,12 +41,10 @@ import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@ -56,20 +54,15 @@ import com.google.common.collect.Lists;
@InterfaceAudience.Private
public class CreateTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, CreateTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<CreateTableState> {
private static final Log LOG = LogFactory.getLog(CreateTableProcedure.class);
// used for compatibility with old clients
private final ProcedurePrepareLatch syncLatch;
private HTableDescriptor hTableDescriptor;
private List<HRegionInfo> newRegions;
private User user;
public CreateTableProcedure() {
// Required by the Procedure framework to create the procedure on replay
syncLatch = null;
super();
}
public CreateTableProcedure(final MasterProcedureEnv env,
@ -80,14 +73,9 @@ public class CreateTableProcedure
public CreateTableProcedure(final MasterProcedureEnv env,
final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions,
final ProcedurePrepareLatch syncLatch) {
super(env, syncLatch);
this.hTableDescriptor = hTableDescriptor;
this.newRegions = newRegions != null ? Lists.newArrayList(newRegions) : null;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
// used for compatibility with clients without procedures
// they need a sync TableExistsException
this.syncLatch = syncLatch;
}
@Override
@ -101,7 +89,7 @@ public class CreateTableProcedure
case CREATE_TABLE_PRE_OPERATION:
// Verify if we can create the table
boolean exists = !prepareCreate(env);
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
if (exists) {
assert isFailed() : "the delete should have an exception here";
@ -151,7 +139,7 @@ public class CreateTableProcedure
// We can fail if the table does exist or the descriptor is malformed.
// TODO: coprocessor rollback semantic is still undefined.
DeleteTableProcedure.deleteTableStates(env, getTableName());
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return;
}
@ -194,21 +182,13 @@ public class CreateTableProcedure
return TableOperationType.CREATE;
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(getTableName());
sb.append(")");
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.CreateTableStateData.Builder state =
MasterProcedureProtos.CreateTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDescriptor));
if (newRegions != null) {
for (HRegionInfo hri: newRegions) {
@ -224,7 +204,7 @@ public class CreateTableProcedure
MasterProcedureProtos.CreateTableStateData state =
MasterProcedureProtos.CreateTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(state.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(state.getUserInfo()));
hTableDescriptor = ProtobufUtil.convertToHTableDesc(state.getTableSchema());
if (state.getRegionInfoCount() == 0) {
newRegions = null;
@ -244,11 +224,6 @@ public class CreateTableProcedure
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {
final TableName tableName = getTableName();
if (MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
@ -278,7 +253,7 @@ public class CreateTableProcedure
if (cpHost != null) {
final HRegionInfo[] regions = newRegions == null ? null :
newRegions.toArray(new HRegionInfo[newRegions.size()]);
cpHost.preCreateTableAction(hTableDescriptor, regions, user);
cpHost.preCreateTableAction(hTableDescriptor, regions, getUser());
}
}
@ -288,7 +263,7 @@ public class CreateTableProcedure
if (cpHost != null) {
final HRegionInfo[] regions = (newRegions == null) ? null :
newRegions.toArray(new HRegionInfo[newRegions.size()]);
cpHost.postCompletedCreateTableAction(hTableDescriptor, regions, user);
cpHost.postCompletedCreateTableAction(hTableDescriptor, regions, getUser());
}
}

View File

@ -33,11 +33,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteColumnFamilyState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@ -46,27 +44,22 @@ import org.apache.hadoop.hbase.util.Bytes;
*/
@InterfaceAudience.Private
public class DeleteColumnFamilyProcedure
extends StateMachineProcedure<MasterProcedureEnv, DeleteColumnFamilyState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<DeleteColumnFamilyState> {
private static final Log LOG = LogFactory.getLog(DeleteColumnFamilyProcedure.class);
private HTableDescriptor unmodifiedHTableDescriptor;
private TableName tableName;
private byte [] familyName;
private boolean hasMob;
private User user;
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled;
// used for compatibility with old clients, until 2.0 the client had a sync behavior
private final ProcedurePrepareLatch syncLatch;
public DeleteColumnFamilyProcedure() {
super();
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
this.syncLatch = null;
}
public DeleteColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
@ -76,14 +69,12 @@ public class DeleteColumnFamilyProcedure
public DeleteColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final byte[] familyName, final ProcedurePrepareLatch latch) {
super(env, latch);
this.tableName = tableName;
this.familyName = familyName;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.unmodifiedHTableDescriptor = null;
this.regionInfoList = null;
this.traceEnabled = null;
this.syncLatch = latch;
}
@Override
@ -160,7 +151,7 @@ public class DeleteColumnFamilyProcedure
@Override
protected void completionCleanup(final MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
}
@Override
@ -178,24 +169,13 @@ public class DeleteColumnFamilyProcedure
return DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.DeleteColumnFamilyStateData.Builder deleteCFMsg =
MasterProcedureProtos.DeleteColumnFamilyStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setColumnfamilyName(ByteStringer.wrap(familyName));
if (unmodifiedHTableDescriptor != null) {
@ -211,7 +191,7 @@ public class DeleteColumnFamilyProcedure
super.deserializeStateData(stream);
MasterProcedureProtos.DeleteColumnFamilyStateData deleteCFMsg =
MasterProcedureProtos.DeleteColumnFamilyStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(deleteCFMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(deleteCFMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(deleteCFMsg.getTableName());
familyName = deleteCFMsg.getColumnfamilyName().toByteArray();
@ -251,7 +231,7 @@ public class DeleteColumnFamilyProcedure
*/
private void prepareDelete(final MasterProcedureEnv env) throws IOException {
// Checks whether the table is allowed to be modified.
MasterDDLOperationHelper.checkTableModifiable(env, tableName);
checkTableModifiable(env);
// In order to update the descriptor, we need to retrieve the old descriptor for comparison.
unmodifiedHTableDescriptor = env.getMasterServices().getTableDescriptors().get(tableName);
@ -384,10 +364,10 @@ public class DeleteColumnFamilyProcedure
if (cpHost != null) {
switch (state) {
case DELETE_COLUMN_FAMILY_PRE_OPERATION:
cpHost.preDeleteColumnFamilyAction(tableName, familyName, user);
cpHost.preDeleteColumnFamilyAction(tableName, familyName, getUser());
break;
case DELETE_COLUMN_FAMILY_POST_OPERATION:
cpHost.postCompletedDeleteColumnFamilyAction(tableName, familyName, user);
cpHost.postCompletedDeleteColumnFamilyAction(tableName, familyName, getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -31,12 +31,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteNamespaceState;
@ -47,8 +45,7 @@ import org.apache.hadoop.hbase.util.FSUtils;
*/
@InterfaceAudience.Private
public class DeleteNamespaceProcedure
extends StateMachineProcedure<MasterProcedureEnv, DeleteNamespaceState>
implements TableProcedureInterface {
extends AbstractStateMachineNamespaceProcedure<DeleteNamespaceState> {
private static final Log LOG = LogFactory.getLog(DeleteNamespaceProcedure.class);
private NamespaceDescriptor nsDescriptor;
@ -61,10 +58,10 @@ public class DeleteNamespaceProcedure
}
public DeleteNamespaceProcedure(final MasterProcedureEnv env, final String namespaceName) {
super(env);
this.namespaceName = namespaceName;
this.nsDescriptor = null;
this.traceEnabled = null;
this.setOwner(env.getRequestUser().getUGI().getShortUserName());
}
@Override
@ -175,36 +172,13 @@ public class DeleteNamespaceProcedure
}
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (Namespace=");
sb.append(namespaceName);
sb.append(")");
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
public TableName getTableName() {
return TableName.NAMESPACE_TABLE_NAME;
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.EDIT;
}
private String getNamespaceName() {
@Override
protected String getNamespaceName() {
return namespaceName;
}

View File

@ -49,31 +49,24 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
@InterfaceAudience.Private
public class DeleteTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, DeleteTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<DeleteTableState> {
private static final Log LOG = LogFactory.getLog(DeleteTableProcedure.class);
private List<HRegionInfo> regions;
private User user;
private TableName tableName;
// used for compatibility with old clients
private final ProcedurePrepareLatch syncLatch;
public DeleteTableProcedure() {
// Required by the Procedure framework to create the procedure on replay
syncLatch = null;
super();
}
public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName) {
@ -82,13 +75,8 @@ public class DeleteTableProcedure
public DeleteTableProcedure(final MasterProcedureEnv env, final TableName tableName,
final ProcedurePrepareLatch syncLatch) {
super(env, syncLatch);
this.tableName = tableName;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
// used for compatibility with clients without procedures
// they need a sync TableNotFoundException, TableNotDisabledException, ...
this.syncLatch = syncLatch;
}
@Override
@ -102,7 +90,7 @@ public class DeleteTableProcedure
case DELETE_TABLE_PRE_OPERATION:
// Verify if we can delete the table
boolean deletable = prepareDelete(env);
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
if (!deletable) {
assert isFailed() : "the delete should have an exception here";
return Flow.NO_MORE_STATE;
@ -163,7 +151,7 @@ public class DeleteTableProcedure
// nothing to rollback, pre-delete is just table-state checks.
// We can fail if the table does not exist or is not disabled.
// TODO: coprocessor rollback semantic is still undefined.
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return;
}
@ -206,32 +194,13 @@ public class DeleteTableProcedure
return TableOperationType.DELETE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(getTableName());
sb.append(")");
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.DeleteTableStateData.Builder state =
MasterProcedureProtos.DeleteTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName));
if (regions != null) {
for (HRegionInfo hri: regions) {
@ -247,7 +216,7 @@ public class DeleteTableProcedure
MasterProcedureProtos.DeleteTableStateData state =
MasterProcedureProtos.DeleteTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(state.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(state.getUserInfo()));
tableName = ProtobufUtil.toTableName(state.getTableName());
if (state.getRegionInfoCount() == 0) {
regions = null;
@ -274,7 +243,7 @@ public class DeleteTableProcedure
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
final TableName tableName = this.tableName;
cpHost.preDeleteTableAction(tableName, user);
cpHost.preDeleteTableAction(tableName, getUser());
}
return true;
}
@ -286,7 +255,7 @@ public class DeleteTableProcedure
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
final TableName tableName = this.tableName;
cpHost.postCompletedDeleteTableAction(tableName, user);
cpHost.postCompletedDeleteTableAction(tableName, getUser());
}
}

View File

@ -41,27 +41,20 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DisableTableState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.htrace.Trace;
@InterfaceAudience.Private
public class DisableTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, DisableTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<DisableTableState> {
private static final Log LOG = LogFactory.getLog(DisableTableProcedure.class);
// This is for back compatible with 1.0 asynchronized operations.
private final ProcedurePrepareLatch syncLatch;
private TableName tableName;
private boolean skipTableStateCheck;
private User user;
private Boolean traceEnabled = null;
@ -72,7 +65,7 @@ public class DisableTableProcedure
}
public DisableTableProcedure() {
syncLatch = null;
super();
}
/**
@ -94,20 +87,9 @@ public class DisableTableProcedure
*/
public DisableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
final boolean skipTableStateCheck, final ProcedurePrepareLatch syncLatch) {
super(env, syncLatch);
this.tableName = tableName;
this.skipTableStateCheck = skipTableStateCheck;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
// Compatible with 1.0: We use latch to make sure that this procedure implementation is
// compatible with 1.0 asynchronized operations. We need to lock the table and check
// whether the Disable operation could be performed (table exists and online; table state
// is ENABLED). Once it is done, we are good to release the latch and the client can
// start asynchronously wait for the operation.
//
// Note: the member syncLatch could be null if we are in failover or recovery scenario.
// This is ok for backward compatible, as 1.0 client would not able to peek at procedure.
this.syncLatch = syncLatch;
}
@Override
@ -173,7 +155,7 @@ public class DisableTableProcedure
case DISABLE_TABLE_PRE_OPERATION:
return;
case DISABLE_TABLE_PREPARE:
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return;
default:
break;
@ -209,24 +191,13 @@ public class DisableTableProcedure
return DisableTableState.DISABLE_TABLE_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.DisableTableStateData.Builder disableTableMsg =
MasterProcedureProtos.DisableTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setSkipTableStateCheck(skipTableStateCheck);
@ -239,19 +210,11 @@ public class DisableTableProcedure
MasterProcedureProtos.DisableTableStateData disableTableMsg =
MasterProcedureProtos.DisableTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(disableTableMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(disableTableMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(disableTableMsg.getTableName());
skipTableStateCheck = disableTableMsg.getSkipTableStateCheck();
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(tableName);
sb.append(")");
}
@Override
public TableName getTableName() {
return tableName;
@ -297,7 +260,7 @@ public class DisableTableProcedure
}
// We are done the check. Future actions in this procedure could be done asynchronously.
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return canTableBeDisabled;
}
@ -457,10 +420,10 @@ public class DisableTableProcedure
if (cpHost != null) {
switch (state) {
case DISABLE_TABLE_PRE_OPERATION:
cpHost.preDisableTableAction(tableName, user);
cpHost.preDisableTableAction(tableName, getUser());
break;
case DISABLE_TABLE_POST_OPERATION:
cpHost.postCompletedDisableTableAction(tableName, user);
cpHost.postCompletedDisableTableAction(tableName, getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -42,11 +42,9 @@ import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
@ -54,8 +52,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
*/
@InterfaceAudience.Private
public class DispatchMergingRegionsProcedure
extends StateMachineProcedure<MasterProcedureEnv, DispatchMergingRegionsState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> {
private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class);
private final AtomicBoolean aborted = new AtomicBoolean(false);
@ -66,7 +63,6 @@ implements TableProcedureInterface {
private String regionsToMergeListFullName;
private String regionsToMergeListEncodedName;
private User user;
private TableName tableName;
private HRegionInfo [] regionsToMerge;
private boolean forcible;
@ -85,6 +81,7 @@ implements TableProcedureInterface {
final TableName tableName,
final HRegionInfo [] regionsToMerge,
final boolean forcible) {
super(env);
this.traceEnabled = isTraceEnabled();
this.assignmentManager = getAssignmentManager(env);
this.tableName = tableName;
@ -94,9 +91,6 @@ implements TableProcedureInterface {
this.regionsToMerge = regionsToMerge;
this.forcible = forcible;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.timeout = -1;
this.regionsToMergeListFullName = getRegionsToMergeListFullNameString();
this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString();
@ -220,7 +214,7 @@ implements TableProcedureInterface {
MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg =
MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setForcible(forcible);
for (HRegionInfo hri: regionsToMerge) {
@ -235,7 +229,7 @@ implements TableProcedureInterface {
MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg =
MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName());
assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2);
@ -419,7 +413,7 @@ implements TableProcedureInterface {
regionsToMerge[0],
regionsToMerge[1],
forcible,
user);
getUser());
LOG.info("Sent merge to server " + getServerName(env) + " for region " +
getRegionsToMergeListEncodedNameString() + ", focible=" + forcible);
return;

View File

@ -45,31 +45,24 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.EnableTableState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
@InterfaceAudience.Private
public class EnableTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, EnableTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<EnableTableState> {
private static final Log LOG = LogFactory.getLog(EnableTableProcedure.class);
// This is for back compatible with 1.0 asynchronized operations.
private final ProcedurePrepareLatch syncLatch;
private TableName tableName;
private boolean skipTableStateCheck;
private User user;
private Boolean traceEnabled = null;
public EnableTableProcedure() {
syncLatch = null;
super();
}
/**
@ -91,20 +84,9 @@ public class EnableTableProcedure
*/
public EnableTableProcedure(final MasterProcedureEnv env, final TableName tableName,
final boolean skipTableStateCheck, final ProcedurePrepareLatch syncLatch) {
super(env, syncLatch);
this.tableName = tableName;
this.skipTableStateCheck = skipTableStateCheck;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
// Compatible with 1.0: We use latch to make sure that this procedure implementation is
// compatible with 1.0 asynchronized operations. We need to lock the table and check
// whether the Enable operation could be performed (table exists and offline; table state
// is DISABLED). Once it is done, we are good to release the latch and the client can
// start asynchronously wait for the operation.
//
// Note: the member syncLatch could be null if we are in failover or recovery scenario.
// This is ok for backward compatible, as 1.0 client would not able to peek at procedure.
this.syncLatch = syncLatch;
}
@Override
@ -166,7 +148,7 @@ public class EnableTableProcedure
case ENABLE_TABLE_PRE_OPERATION:
return;
case ENABLE_TABLE_PREPARE:
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return;
default:
break;
@ -202,24 +184,13 @@ public class EnableTableProcedure
return EnableTableState.ENABLE_TABLE_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.EnableTableStateData.Builder enableTableMsg =
MasterProcedureProtos.EnableTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setSkipTableStateCheck(skipTableStateCheck);
@ -232,19 +203,11 @@ public class EnableTableProcedure
MasterProcedureProtos.EnableTableStateData enableTableMsg =
MasterProcedureProtos.EnableTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(enableTableMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(enableTableMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(enableTableMsg.getTableName());
skipTableStateCheck = enableTableMsg.getSkipTableStateCheck();
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(tableName);
sb.append(")");
}
@Override
public TableName getTableName() {
return tableName;
@ -291,7 +254,7 @@ public class EnableTableProcedure
}
// We are done the check. Future actions in this procedure could be done asynchronously.
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
return canTableBeEnabled;
}
@ -533,10 +496,10 @@ public class EnableTableProcedure
if (cpHost != null) {
switch (state) {
case ENABLE_TABLE_PRE_OPERATION:
cpHost.preEnableTableAction(getTableName(), user);
cpHost.preEnableTableAction(getTableName(), getUser());
break;
case ENABLE_TABLE_POST_OPERATION:
cpHost.postCompletedEnableTableAction(getTableName(), user);
cpHost.postCompletedEnableTableAction(getTableName(), getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -29,11 +29,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -55,20 +53,6 @@ public final class MasterDDLOperationHelper {
private MasterDDLOperationHelper() {}
/**
* Check whether a table is modifiable - exists and either offline or online with config set
* @param env MasterProcedureEnv
* @param tableName name of the table
* @throws IOException
*/
public static void checkTableModifiable(final MasterProcedureEnv env, final TableName tableName)
throws IOException {
// Checks whether the table exists
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
throw new TableNotFoundException(tableName);
}
}
/**
* Remove the column family from the file system
**/

View File

@ -34,35 +34,28 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyColumnFamilyState;
import org.apache.hadoop.hbase.security.User;
/**
* The procedure to modify a column family from an existing table.
*/
@InterfaceAudience.Private
public class ModifyColumnFamilyProcedure
extends StateMachineProcedure<MasterProcedureEnv, ModifyColumnFamilyState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<ModifyColumnFamilyState> {
private static final Log LOG = LogFactory.getLog(ModifyColumnFamilyProcedure.class);
private TableName tableName;
private HTableDescriptor unmodifiedHTableDescriptor;
private HColumnDescriptor cfDescriptor;
private User user;
private Boolean traceEnabled;
// used for compatibility with old clients, until 2.0 the client had a sync behavior
private final ProcedurePrepareLatch syncLatch;
public ModifyColumnFamilyProcedure() {
super();
this.unmodifiedHTableDescriptor = null;
this.traceEnabled = null;
this.syncLatch = null;
}
public ModifyColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
@ -72,13 +65,11 @@ public class ModifyColumnFamilyProcedure
public ModifyColumnFamilyProcedure(final MasterProcedureEnv env, final TableName tableName,
final HColumnDescriptor cfDescriptor, final ProcedurePrepareLatch latch) {
super(env, latch);
this.tableName = tableName;
this.cfDescriptor = cfDescriptor;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.unmodifiedHTableDescriptor = null;
this.traceEnabled = null;
this.syncLatch = latch;
}
@Override
@ -150,7 +141,7 @@ public class ModifyColumnFamilyProcedure
@Override
protected void completionCleanup(final MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
}
@Override
@ -168,24 +159,13 @@ public class ModifyColumnFamilyProcedure
return ModifyColumnFamilyState.MODIFY_COLUMN_FAMILY_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, tableName);
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, tableName);
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.ModifyColumnFamilyStateData.Builder modifyCFMsg =
MasterProcedureProtos.ModifyColumnFamilyStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setColumnfamilySchema(ProtobufUtil.convertToColumnFamilySchema(cfDescriptor));
if (unmodifiedHTableDescriptor != null) {
@ -202,7 +182,7 @@ public class ModifyColumnFamilyProcedure
MasterProcedureProtos.ModifyColumnFamilyStateData modifyCFMsg =
MasterProcedureProtos.ModifyColumnFamilyStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(modifyCFMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(modifyCFMsg.getUserInfo()));
tableName = ProtobufUtil.toTableName(modifyCFMsg.getTableName());
cfDescriptor = ProtobufUtil.convertToHColumnDesc(modifyCFMsg.getColumnfamilySchema());
if (modifyCFMsg.hasUnmodifiedTableSchema()) {
@ -241,7 +221,7 @@ public class ModifyColumnFamilyProcedure
*/
private void prepareModify(final MasterProcedureEnv env) throws IOException {
// Checks whether the table is allowed to be modified.
MasterDDLOperationHelper.checkTableModifiable(env, tableName);
checkTableModifiable(env);
unmodifiedHTableDescriptor = env.getMasterServices().getTableDescriptors().get(tableName);
if (unmodifiedHTableDescriptor == null) {
@ -350,10 +330,10 @@ public class ModifyColumnFamilyProcedure
if (cpHost != null) {
switch (state) {
case MODIFY_COLUMN_FAMILY_PRE_OPERATION:
cpHost.preModifyColumnFamilyAction(tableName, cfDescriptor, user);
cpHost.preModifyColumnFamilyAction(tableName, cfDescriptor, getUser());
break;
case MODIFY_COLUMN_FAMILY_POST_OPERATION:
cpHost.postCompletedModifyColumnFamilyAction(tableName, cfDescriptor, user);
cpHost.postCompletedModifyColumnFamilyAction(tableName, cfDescriptor, getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -26,10 +26,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.TableNamespaceManager;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyNamespaceState;
@ -39,8 +37,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyNa
*/
@InterfaceAudience.Private
public class ModifyNamespaceProcedure
extends StateMachineProcedure<MasterProcedureEnv, ModifyNamespaceState>
implements TableProcedureInterface {
extends AbstractStateMachineNamespaceProcedure<ModifyNamespaceState> {
private static final Log LOG = LogFactory.getLog(ModifyNamespaceProcedure.class);
private NamespaceDescriptor oldNsDescriptor;
@ -54,10 +51,10 @@ public class ModifyNamespaceProcedure
public ModifyNamespaceProcedure(final MasterProcedureEnv env,
final NamespaceDescriptor newNsDescriptor) {
super(env);
this.oldNsDescriptor = null;
this.newNsDescriptor = newNsDescriptor;
this.traceEnabled = null;
this.setOwner(env.getRequestUser().getUGI().getShortUserName());
}
@Override
@ -160,36 +157,13 @@ public class ModifyNamespaceProcedure
}
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (Namespace=");
sb.append(newNsDescriptor.getName());
sb.append(")");
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
}
@Override
public TableName getTableName() {
return TableName.NAMESPACE_TABLE_NAME;
}
@Override
public TableOperationType getTableOperationType() {
return TableOperationType.EDIT;
}
private String getNamespaceName() {
@Override
protected String getNamespaceName() {
return newNsDescriptor.getName();
}

View File

@ -44,33 +44,26 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyTableState;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@InterfaceAudience.Private
public class ModifyTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, ModifyTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<ModifyTableState> {
private static final Log LOG = LogFactory.getLog(ModifyTableProcedure.class);
private HTableDescriptor unmodifiedHTableDescriptor = null;
private HTableDescriptor modifiedHTableDescriptor;
private User user;
private boolean deleteColumnFamilyInModify;
private List<HRegionInfo> regionInfoList;
private Boolean traceEnabled = null;
// used for compatibility with old clients, until 2.0 the client had a sync behavior
private final ProcedurePrepareLatch syncLatch;
public ModifyTableProcedure() {
super();
initilize();
this.syncLatch = null;
}
public ModifyTableProcedure(final MasterProcedureEnv env, final HTableDescriptor htd) {
@ -79,11 +72,9 @@ public class ModifyTableProcedure
public ModifyTableProcedure(final MasterProcedureEnv env, final HTableDescriptor htd,
final ProcedurePrepareLatch latch) {
super(env, latch);
initilize();
this.modifiedHTableDescriptor = htd;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.syncLatch = latch;
}
private void initilize() {
@ -174,7 +165,7 @@ public class ModifyTableProcedure
@Override
protected void completionCleanup(final MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
}
@Override
@ -192,24 +183,13 @@ public class ModifyTableProcedure
return ModifyTableState.MODIFY_TABLE_PREPARE;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
@Override
public void serializeStateData(final OutputStream stream) throws IOException {
super.serializeStateData(stream);
MasterProcedureProtos.ModifyTableStateData.Builder modifyTableMsg =
MasterProcedureProtos.ModifyTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setModifiedTableSchema(ProtobufUtil.convertToTableSchema(modifiedHTableDescriptor))
.setDeleteColumnFamilyInModify(deleteColumnFamilyInModify);
@ -227,7 +207,7 @@ public class ModifyTableProcedure
MasterProcedureProtos.ModifyTableStateData modifyTableMsg =
MasterProcedureProtos.ModifyTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(modifyTableMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(modifyTableMsg.getUserInfo()));
modifiedHTableDescriptor = ProtobufUtil.convertToHTableDesc(modifyTableMsg.getModifiedTableSchema());
deleteColumnFamilyInModify = modifyTableMsg.getDeleteColumnFamilyInModify();
@ -237,14 +217,6 @@ public class ModifyTableProcedure
}
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
sb.append(" (table=");
sb.append(getTableName());
sb.append(")");
}
@Override
public TableName getTableName() {
return modifiedHTableDescriptor.getTableName();
@ -447,10 +419,10 @@ public class ModifyTableProcedure
if (cpHost != null) {
switch (state) {
case MODIFY_TABLE_PRE_OPERATION:
cpHost.preModifyTableAction(getTableName(), modifiedHTableDescriptor, user);
cpHost.preModifyTableAction(getTableName(), modifiedHTableDescriptor, getUser());
break;
case MODIFY_TABLE_POST_OPERATION:
cpHost.postCompletedModifyTableAction(getTableName(), modifiedHTableDescriptor, user);
cpHost.postCompletedModifyTableAction(getTableName(), modifiedHTableDescriptor,getUser());
break;
default:
throw new UnsupportedOperationException(this + " unhandled state=" + state);

View File

@ -46,13 +46,11 @@ import org.apache.hadoop.hbase.master.MetricsSnapshot;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.RestoreSnapshotState;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@ -61,8 +59,7 @@ import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public class RestoreSnapshotProcedure
extends StateMachineProcedure<MasterProcedureEnv, RestoreSnapshotState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<RestoreSnapshotState> {
private static final Log LOG = LogFactory.getLog(RestoreSnapshotProcedure.class);
private HTableDescriptor modifiedHTableDescriptor;
@ -72,7 +69,6 @@ public class RestoreSnapshotProcedure
private Map<String, Pair<String, String>> parentsToChildrenPairMap =
new HashMap<String, Pair<String, String>>();
private User user;
private SnapshotDescription snapshot;
// Monitor
@ -97,13 +93,11 @@ public class RestoreSnapshotProcedure
final MasterProcedureEnv env,
final HTableDescriptor hTableDescriptor,
final SnapshotDescription snapshot) {
super(env);
// This is the new schema we are going to write out as this modification.
this.modifiedHTableDescriptor = hTableDescriptor;
// Snapshot information
this.snapshot = snapshot;
// User and owner information
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
// Monitor
getMonitorStatus();
@ -231,7 +225,7 @@ public class RestoreSnapshotProcedure
MasterProcedureProtos.RestoreSnapshotStateData.Builder restoreSnapshotMsg =
MasterProcedureProtos.RestoreSnapshotStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setSnapshot(this.snapshot)
.setModifiedTableSchema(ProtobufUtil.convertToTableSchema(modifiedHTableDescriptor));
@ -273,7 +267,7 @@ public class RestoreSnapshotProcedure
MasterProcedureProtos.RestoreSnapshotStateData restoreSnapshotMsg =
MasterProcedureProtos.RestoreSnapshotStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(restoreSnapshotMsg.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(restoreSnapshotMsg.getUserInfo()));
snapshot = restoreSnapshotMsg.getSnapshot();
modifiedHTableDescriptor =
ProtobufUtil.convertToHTableDesc(restoreSnapshotMsg.getModifiedTableSchema());
@ -316,19 +310,6 @@ public class RestoreSnapshotProcedure
}
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) {
return false;
}
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
/**
* Action before any real action of restoring from snapshot.
* @param env MasterProcedureEnv

View File

@ -40,28 +40,21 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.TruncateTableState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
@InterfaceAudience.Private
public class TruncateTableProcedure
extends StateMachineProcedure<MasterProcedureEnv, TruncateTableState>
implements TableProcedureInterface {
extends AbstractStateMachineTableProcedure<TruncateTableState> {
private static final Log LOG = LogFactory.getLog(TruncateTableProcedure.class);
private boolean preserveSplits;
private List<HRegionInfo> regions;
private User user;
private HTableDescriptor hTableDescriptor;
private TableName tableName;
// used for compatibility with old clients, until 2.0 the client had a sync behavior
private final ProcedurePrepareLatch syncLatch;
public TruncateTableProcedure() {
// Required by the Procedure framework to create the procedure on replay
syncLatch = null;
super();
}
public TruncateTableProcedure(final MasterProcedureEnv env, final TableName tableName,
@ -71,11 +64,9 @@ public class TruncateTableProcedure
public TruncateTableProcedure(final MasterProcedureEnv env, final TableName tableName,
boolean preserveSplits, ProcedurePrepareLatch latch) {
super(env, latch);
this.tableName = tableName;
this.preserveSplits = preserveSplits;
this.user = env.getRequestUser();
this.setOwner(this.user.getShortName());
this.syncLatch = latch;
}
@Override
@ -166,7 +157,7 @@ public class TruncateTableProcedure
@Override
protected void completionCleanup(final MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(syncLatch, this);
releaseSyncLatch();
}
@Override
@ -210,17 +201,6 @@ public class TruncateTableProcedure
return false;
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
}
@Override
public void toStringClassDetails(StringBuilder sb) {
sb.append(getClass().getSimpleName());
@ -237,7 +217,7 @@ public class TruncateTableProcedure
MasterProcedureProtos.TruncateTableStateData.Builder state =
MasterProcedureProtos.TruncateTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(this.user))
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setPreserveSplits(preserveSplits);
if (hTableDescriptor != null) {
state.setTableSchema(ProtobufUtil.convertToTableSchema(hTableDescriptor));
@ -258,7 +238,7 @@ public class TruncateTableProcedure
MasterProcedureProtos.TruncateTableStateData state =
MasterProcedureProtos.TruncateTableStateData.parseDelimitedFrom(stream);
user = MasterProcedureUtil.toUserInfo(state.getUserInfo());
setUser(MasterProcedureUtil.toUserInfo(state.getUserInfo()));
if (state.hasTableSchema()) {
hTableDescriptor = ProtobufUtil.convertToHTableDesc(state.getTableSchema());
tableName = hTableDescriptor.getTableName();
@ -291,7 +271,7 @@ public class TruncateTableProcedure
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
final TableName tableName = getTableName();
cpHost.preTruncateTableAction(tableName, user);
cpHost.preTruncateTableAction(tableName, getUser());
}
return true;
}
@ -301,7 +281,7 @@ public class TruncateTableProcedure
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
if (cpHost != null) {
final TableName tableName = getTableName();
cpHost.postCompletedTruncateTableAction(tableName, user);
cpHost.postCompletedTruncateTableAction(tableName, getUser());
}
}
}

View File

@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import com.google.common.collect.Lists;
import java.io.IOException;
@ -61,10 +60,12 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -85,8 +86,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
import static org.junit.Assert.fail;
/**
* This class is for testing HBaseConnectionManager features
@ -150,6 +150,12 @@ public class TestHCM {
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(SLEEP_TIME);
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
@ -187,6 +193,8 @@ public class TestHCM {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
// simulate queue blocking in testDropTimeoutRequest
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
// Used in testServerBusyException
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
TEST_UTIL.startMiniCluster(2);
}
@ -1338,4 +1346,109 @@ public class TestHCM {
table.close();
connection.close();
}
private class TestPutThread extends Thread {
Table table;
int getServerBusyException = 0;
TestPutThread(Table table){
this.table = table;
}
@Override
public void run() {
try {
Put p = new Put(ROW);
p.addColumn(FAM_NAM, new byte[]{0}, new byte[]{0});
table.put(p);
} catch (RetriesExhaustedWithDetailsException e) {
if (e.exceptions.get(0).getCause() instanceof ServerTooBusyException) {
getServerBusyException = 1;
}
} catch (IOException ignore) {
}
}
}
private class TestGetThread extends Thread {
Table table;
int getServerBusyException = 0;
TestGetThread(Table table){
this.table = table;
}
@Override
public void run() {
try {
Get g = new Get(ROW);
g.addColumn(FAM_NAM, new byte[]{0});
table.get(g);
} catch (RetriesExhaustedException e) {
if (e.getCause().getCause() instanceof ServerTooBusyException) {
getServerBusyException = 1;
}
} catch (IOException ignore) {
}
}
}
@Test()
public void testServerBusyException() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
TestGetThread tg1 =
new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg2 =
new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg3 =
new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg4 =
new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestGetThread tg5 =
new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
tg1.start();
tg2.start();
tg3.start();
tg4.start();
tg5.start();
tg1.join();
tg2.join();
tg3.join();
tg4.join();
tg5.join();
assertEquals(2,
tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
+ tg4.getServerBusyException + tg5.getServerBusyException);
// Put has its own logic in HTable, test Put alone. We use AsyncProcess for Put (use multi at
// RPC level) and it wrap exceptions to RetriesExhaustedWithDetailsException.
TestPutThread tp1 =
new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp2 =
new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp3 =
new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp4 =
new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
TestPutThread tp5 =
new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
tp1.start();
tp2.start();
tp3.start();
tp4.start();
tp5.start();
tp1.join();
tp2.join();
tp3.join();
tp4.join();
tp5.join();
assertEquals(2,
tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
+ tp4.getServerBusyException + tp5.getServerBusyException);
}
}

View File

@ -21,6 +21,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -55,6 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.log4j.Level;
import org.junit.After;
@ -64,9 +68,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Here we test to make sure that scans return the expected Results when the server is sending the
* Client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
@ -113,12 +114,10 @@ public class TestScannerHeartbeatMessages {
// In this test, we sleep after reading each row. So we should make sure after we get some number
// of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
// So set this to 200, we will get 3 rows and reach time limit at the start of 4th row, then sleep
// for the 4th time. Total time is 800 ms so we will not timeout.
private static int DEFAULT_ROW_SLEEP_TIME = 200;
private static int DEFAULT_ROW_SLEEP_TIME = 300;
// Similar with row sleep time.
private static int DEFAULT_CF_SLEEP_TIME = 200;
private static int DEFAULT_CF_SLEEP_TIME = 300;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -178,7 +177,6 @@ public class TestScannerHeartbeatMessages {
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}
@ -192,19 +190,6 @@ public class TestScannerHeartbeatMessages {
disableSleeping();
}
/**
* Test a variety of scan configurations to ensure that they return the expected Results when
* heartbeat messages are necessary. These tests are accumulated under one test case to ensure
* that they don't run in parallel. If the tests ran in parallel, they may conflict with each
* other due to changing static variables
*/
@Test
public void testScannerHeartbeatMessages() throws Exception {
testImportanceOfHeartbeats(testHeartbeatBetweenRows());
testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
}
/**
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
@ -212,7 +197,7 @@ public class TestScannerHeartbeatMessages {
* @param testCallable
* @throws InterruptedException
*/
public void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
HeartbeatRPCServices.heartbeatsEnabled = true;
try {
@ -239,8 +224,9 @@ public class TestScannerHeartbeatMessages {
* fetched.
* @throws Exception
*/
public Callable<Void> testHeartbeatBetweenRows() throws Exception {
return new Callable<Void>() {
@Test
public void testHeartbeatBetweenRows() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
@ -253,15 +239,16 @@ public class TestScannerHeartbeatMessages {
testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
return null;
}
};
});
}
/**
* Test the case that the time limit for scans is reached in between column families
* @throws Exception
*/
public Callable<Void> testHeartbeatBetweenColumnFamilies() throws Exception {
return new Callable<Void>() {
@Test
public void testHeartbeatBetweenColumnFamilies() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
// Configure the scan so that it can read the entire table in a single RPC. We want to test
@ -278,7 +265,7 @@ public class TestScannerHeartbeatMessages {
testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
return null;
}
};
});
}
public static class SparseFilter extends FilterBase {
@ -286,13 +273,12 @@ public class TestScannerHeartbeatMessages {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
try {
Thread.sleep(CLIENT_TIMEOUT/2 + 10);
Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
ReturnCode.INCLUDE :
ReturnCode.SKIP;
return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
: ReturnCode.SKIP;
}
public static Filter parseFrom(final byte[] pbBytes) {
@ -304,8 +290,9 @@ public class TestScannerHeartbeatMessages {
* Test the case that there is a filter which filters most of cells
* @throws Exception
*/
public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
return new Callable<Void>() {
@Test
public void testHeartbeatWithSparseFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
@ -335,7 +322,7 @@ public class TestScannerHeartbeatMessages {
return null;
}
};
});
}
/**
@ -348,7 +335,7 @@ public class TestScannerHeartbeatMessages {
* that column family are fetched
* @throws Exception
*/
public void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
disableSleeping();
final ResultScanner scanner = TABLE.getScanner(scan);
@ -423,7 +410,7 @@ public class TestScannerHeartbeatMessages {
* Custom RSRpcServices instance that allows heartbeat support to be toggled
*/
private static class HeartbeatRPCServices extends RSRpcServices {
private static boolean heartbeatsEnabled = true;
private static volatile boolean heartbeatsEnabled = true;
public HeartbeatRPCServices(HRegionServer rs) throws IOException {
super(rs);
@ -445,17 +432,17 @@ public class TestScannerHeartbeatMessages {
*/
private static class HeartbeatHRegion extends HRegion {
// Row sleeps occur AFTER each row worth of cells is retrieved.
private static int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
private static boolean sleepBetweenRows = false;
private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
private static volatile boolean sleepBetweenRows = false;
// The sleep for column families can be initiated before or after we fetch the cells for the
// column family. If the sleep occurs BEFORE then the time limits will be reached inside
// StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
// limit will be reached inside RegionScanner after all the cells for a column family have been
// retrieved.
private static boolean sleepBeforeColumnFamily = false;
private static int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
private static boolean sleepBetweenColumnFamilies = false;
private static volatile boolean sleepBeforeColumnFamily = false;
private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
private static volatile boolean sleepBetweenColumnFamilies = false;
public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
HRegionInfo regionInfo, HTableDescriptor htd, RegionServerServices rsServices) {
@ -468,20 +455,14 @@ public class TestScannerHeartbeatMessages {
}
private static void columnFamilySleep() {
if (HeartbeatHRegion.sleepBetweenColumnFamilies) {
try {
Thread.sleep(HeartbeatHRegion.columnFamilySleepTime);
} catch (InterruptedException e) {
}
if (sleepBetweenColumnFamilies) {
Threads.sleepWithoutInterrupt(columnFamilySleepTime);
}
}
private static void rowSleep() {
try {
if (HeartbeatHRegion.sleepBetweenRows) {
Thread.sleep(HeartbeatHRegion.rowSleepTime);
}
} catch (InterruptedException e) {
if (sleepBetweenRows) {
Threads.sleepWithoutInterrupt(rowSleepTime);
}
}
@ -510,8 +491,7 @@ public class TestScannerHeartbeatMessages {
}
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
throws IOException {
public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
boolean moreRows = super.nextRaw(outResults, context);
HeartbeatHRegion.rowSleep();
return moreRows;
@ -538,8 +518,7 @@ public class TestScannerHeartbeatMessages {
}
@Override
public boolean nextRaw(List<Cell> outResults, ScannerContext context)
throws IOException {
public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
boolean moreRows = super.nextRaw(outResults, context);
HeartbeatHRegion.rowSleep();
return moreRows;
@ -571,8 +550,7 @@ public class TestScannerHeartbeatMessages {
}
@Override
public boolean next(List<Cell> result, ScannerContext context)
throws IOException {
public boolean next(List<Cell> result, ScannerContext context) throws IOException {
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
boolean moreRows = super.next(result, context);
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
@ -591,8 +569,7 @@ public class TestScannerHeartbeatMessages {
}
@Override
public boolean next(List<Cell> result, ScannerContext context)
throws IOException {
public boolean next(List<Cell> result, ScannerContext context) throws IOException {
if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
boolean moreRows = super.next(result, context);
if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();

View File

@ -173,12 +173,9 @@ module Hbase
# Does Namespace exist
def namespace_exists?(namespace_name)
namespaceDesc = @admin.getNamespaceDescriptor(namespace_name)
if(namespaceDesc == nil)
return @admin.getNamespaceDescriptor(namespace_name) != nil
rescue org.apache.hadoop.hbase.NamespaceNotFoundException => e
return false
else
return true
end
end
# Make sure that security features are available

View File

@ -160,21 +160,9 @@ EOF
end
#----------------------------------------------------------------------------------------------
# Delete a cell
def _delete_internal(row, column,
# Create a Delete mutation
def _createdelete_internal(row, column = nil,
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {})
_deleteall_internal(row, column, timestamp, args)
end
#----------------------------------------------------------------------------------------------
# Delete a row
def _deleteall_internal(row, column = nil,
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {})
# delete operation doesn't need read permission. Retaining the read check for
# meta table as a part of HBASE-5837.
if is_meta_table?
raise ArgumentError, "Row Not Found" if _get_internal(row).nil?
end
temptimestamp = timestamp
if temptimestamp.kind_of?(Hash)
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP
@ -195,8 +183,61 @@ EOF
family, qualifier = parse_column_name(column)
d.addColumns(family, qualifier, timestamp)
end
return d
end
#----------------------------------------------------------------------------------------------
# Delete rows using prefix
def _deleterows_internal(row, column = nil,
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args={})
cache = row["CACHE"] ? row["CACHE"] : 100
prefix = row["ROWPREFIXFILTER"]
# create scan to get table names using prefix
scan = org.apache.hadoop.hbase.client.Scan.new
scan.setRowPrefixFilter(prefix.to_java_bytes)
# Run the scanner to get all rowkeys
scanner = @table.getScanner(scan)
# Create a list to store all deletes
list = java.util.ArrayList.new
# Iterate results
iter = scanner.iterator
while iter.hasNext
row = iter.next
key = org.apache.hadoop.hbase.util.Bytes::toStringBinary(row.getRow)
d = _createdelete_internal(key, column, timestamp, args)
list.add(d)
if list.size >= cache
@table.delete(list)
list.clear
end
end
@table.delete(list)
end
#----------------------------------------------------------------------------------------------
# Delete a cell
def _delete_internal(row, column,
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {})
_deleteall_internal(row, column, timestamp, args)
end
#----------------------------------------------------------------------------------------------
# Delete a row
def _deleteall_internal(row, column = nil,
timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP, args = {})
# delete operation doesn't need read permission. Retaining the read check for
# meta table as a part of HBASE-5837.
if is_meta_table?
raise ArgumentError, "Row Not Found" if _get_internal(row).nil?
end
if row.kind_of?(Hash)
_deleterows_internal(row, column, timestamp, args)
else
d = _createdelete_internal(row, column, timestamp, args)
@table.delete(d)
end
end
#----------------------------------------------------------------------------------------------
# Increment a counter atomically

View File

@ -23,7 +23,8 @@ module Shell
def help
return <<-EOF
Delete all cells in a given row; pass a table name, row, and optionally
a column and timestamp. Examples:
a column and timestamp. Deleteall also support deleting a row range using a
row key prefix. Examples:
hbase> deleteall 'ns1:t1', 'r1'
hbase> deleteall 't1', 'r1'
@ -31,13 +32,21 @@ a column and timestamp. Examples:
hbase> deleteall 't1', 'r1', 'c1', ts1
hbase> deleteall 't1', 'r1', 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
ROWPREFIXFILTER can be used to delete row ranges
hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}
hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1' //delete certain column family in the row ranges
hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1', ts1
hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix'}, 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
CACHE can be used to specify how many deletes batched to be sent to server at one time, default is 100
hbase> deleteall 't1', {ROWPREFIXFILTER => 'prefix', CACHE => 100}
The same commands also can be run on a table reference. Suppose you had a reference
t to table 't1', the corresponding command would be:
hbase> t.deleteall 'r1'
hbase> t.deleteall 'r1', 'c1'
hbase> t.deleteall 'r1', 'c1', ts1
hbase> t.deleteall 'r1', 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
hbase> t.deleteall {ROWPREFIXFILTER => 'prefix', CACHE => 100}, 'c1', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
EOF
end

View File

@ -120,6 +120,10 @@ module Hbase
@test_table.put(105, "x:a", "3")
@test_table.put(105, "x:a", "4")
@test_table.put("111", "x:a", "5")
@test_table.put("111", "x:b", "6")
@test_table.put("112", "x:a", "5")
end
def teardown
@ -181,6 +185,14 @@ module Hbase
assert_nil(res)
end
define_test "deletall should work with row prefix" do
@test_table.deleteall({ROWPREFIXFILTER => '11'})
res1 = @test_table._get_internal('111')
assert_nil(res1)
res2 = @test_table._get_internal('112')
assert_nil(res2)
end
define_test "append should work with value" do
@test_table.append("123", 'x:cnt2', '123')
end

View File

@ -505,7 +505,7 @@ For the build to sign them for you, you a properly configured _settings.xml_ in
=== Making a Release Candidate
NOTE: These instructions are for building HBase 1.0.x.
For building earlier versions, the process is different.
For building earlier versions, e.g. 0.98.x, the process is different.
See this section under the respective release documentation folders.
.Point Releases
@ -521,20 +521,23 @@ You should also have tried recent branch tips out on a cluster under load, perha
[NOTE]
====
At this point you should tag the previous release branch (ex: 0.96.1) with the new point release tag (e.g.
0.96.1.1 tag). Any commits with changes for the point release should be applied to the new tag.
0.96.1.1 tag). Any commits with changes for the point release should go against the new tag.
====
The Hadoop link:http://wiki.apache.org/hadoop/HowToRelease[How To
Release] wiki page is used as a model for most of the instructions below, and may have more detail on particular sections, so it is worth review.
Release] wiki page is used as a model for most of the instructions below.
Although it now stale, it may have more detail on particular sections, so
it is worth review especially if you get stuck.
.Specifying the Heap Space for Maven on OSX
[NOTE]
====
On OSX, you may need to specify the heap space for Maven commands, by setting the `MAVEN_OPTS` variable to `-Xmx3g`.
On OSX, you may run into OutOfMemoryErrors building, particularly building the site and
documentation. Up the heap and permgen space for Maven by setting the `MAVEN_OPTS` variable.
You can prefix the variable to the Maven command, as in the following example:
----
MAVEN_OPTS="-Xmx2g" mvn package
MAVEN_OPTS="-Xmx4g -XX:MaxPermSize=256m" mvn package
----
You could also set this in an environment variable or alias in your shell.
@ -552,7 +555,8 @@ The script handles everything else, and comes in handy.
Update _CHANGES.txt_ with the changes since the last release.
Make sure the URL to the JIRA points to the proper location which lists fixes for this release.
Adjust the version in all the POM files appropriately.
If you are making a release candidate, you must remove the `-SNAPSHOT` label from all versions.
If you are making a release candidate, you must remove the `-SNAPSHOT` label from all versions
in all pom.xml files.
If you are running this receipe to publish a snapshot, you must keep the `-SNAPSHOT` suffix on the hbase version.
The link:http://mojo.codehaus.org/versions-maven-plugin/[Versions
Maven Plugin] can be of use here.
@ -564,7 +568,7 @@ To set a version in all the many poms of the hbase multi-module project, use a c
$ mvn clean org.codehaus.mojo:versions-maven-plugin:1.3.1:set -DnewVersion=0.96.0
----
+
Checkin the _CHANGES.txt_ and any version changes.
Make sure all versions in poms are changed! Checkin the _CHANGES.txt_ and any version changes.
. Update the documentation.
+
@ -590,7 +594,7 @@ Extract the tarball and make sure it looks good.
A good test for the src tarball being 'complete' is to see if you can build new tarballs from this source bundle.
If the source tarball is good, save it off to a _version directory_, a directory somewhere where you are collecting all of the tarballs you will publish as part of the release candidate.
For example if you were building an hbase-0.96.0 release candidate, you might call the directory _hbase-0.96.0RC0_.
Later you will publish this directory as our release candidate up on pass:[http://people.apache.org/~YOU].
Later you will publish this directory as our release candidate.
. Build the binary tarball.
+
@ -617,7 +621,7 @@ $ mvn install -DskipTests site assembly:single -Prelease
+
Otherwise, the build complains that hbase modules are not in the maven repository
when you try to do it at once, especially on fresh repository.
when you try to do it all in one step, especially on a fresh repository.
It seems that you need the install goal in both steps.
+
Extract the generated tarball and check it out.
@ -650,7 +654,8 @@ $ mvn deploy -DskipTests -Papache-release -Prelease
This command copies all artifacts up to a temporary staging Apache mvn repository in an 'open' state.
More work needs to be done on these maven artifacts to make them generally available.
+
We do not release HBase tarball to the Apache Maven repository. To avoid deploying the tarball, do not include the `assembly:single` goal in your `mvn deploy` command. Check the deployed artifacts as described in the next section.
We do not release HBase tarball to the Apache Maven repository. To avoid deploying the tarball, do not
include the `assembly:single` goal in your `mvn deploy` command. Check the deployed artifacts as described in the next section.
. Make the Release Candidate available.
+
@ -698,13 +703,7 @@ If you run the script, do your checks at this stage verifying the src and bin ta
Tag before you start the build.
You can always delete it if the build goes haywire.
. Sign, upload, and 'stage' your version directory to link:http://people.apache.org[people.apache.org] (TODO:
There is a new location to stage releases using svnpubsub. See
(link:https://issues.apache.org/jira/browse/HBASE-10554[HBASE-10554 Please delete old releases from mirroring system]).
+
If all checks out, next put the _version directory_ up on link:http://people.apache.org[people.apache.org].
You will need to sign and fingerprint them before you push them up.
In the _version directory_ run the following commands:
. Sign, fingerprint and then 'stage' your release candiate version directory via svnpubsub by committing your directory to link:https://dist.apache.org/repos/dist/dev/hbase/[The 'dev' distribution directory] (See comments on link:https://issues.apache.org/jira/browse/HBASE-10554[HBASE-10554 Please delete old releases from mirroring system] but in essence it is an svn checkout of https://dist.apache.org/repos/dist/dev/hbase -- releases are at https://dist.apache.org/repos/dist/release/hbase). In the _version directory_ run the following commands:
+
[source,bourne]
----
@ -714,11 +713,28 @@ $ for i in *.tar.gz; do echo $i; gpg --print-md MD5 $i > $i.md5 ; done
$ for i in *.tar.gz; do echo $i; gpg --print-md SHA512 $i > $i.sha ; done
$ for i in *.tar.gz; do echo $i; gpg --armor --output $i.asc --detach-sig $i ; done
$ cd ..
# Presuming our 'version directory' is named 0.96.0RC0, now copy it up to people.apache.org.
$ rsync -av 0.96.0RC0 people.apache.org:public_html
# Presuming our 'version directory' is named 0.96.0RC0, copy it to the svn checkout of the dist dev dir
# in this case named hbase.dist.dev.svn
$ cd /Users/stack/checkouts/hbase.dist.dev.svn
$ svn info
Path: .
Working Copy Root Path: /Users/stack/checkouts/hbase.dist.dev.svn
URL: https://dist.apache.org/repos/dist/dev/hbase
Repository Root: https://dist.apache.org/repos/dist
Repository UUID: 0d268c88-bc11-4956-87df-91683dc98e59
Revision: 15087
Node Kind: directory
Schedule: normal
Last Changed Author: ndimiduk
Last Changed Rev: 15045
Last Changed Date: 2016-08-28 11:13:36 -0700 (Sun, 28 Aug 2016)
$ mv 0.96.0RC0 /Users/stack/checkouts/hbase.dist.dev.svn
$ svn add 0.96.0RC0
$ svn commit ...
----
+
Make sure the link:http://people.apache.org[people.apache.org] directory is showing and that the mvn repo URLs are good.
Ensure it actually gets published by checking link:https://dist.apache.org/repos/dist/dev/hbase/[https://dist.apache.org/repos/dist/dev/hbase/].
Announce the release candidate on the mailing list and call a vote.