HBASE-19996 Some nonce procs might not be cleaned up (follow up HBASE-19756)
Signed-off-by: tedyu <yuzhihong@gmail.com>
commit e65004aeea (parent 335b8a8e14)
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue;
 import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever;
+import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -178,17 +179,19 @@ public class ProcedureExecutor<TEnvironment> {
         // TODO: Select TTL based on Procedure type
         if ((procInfo.hasClientAckTime() && (now - procInfo.getClientAckTime()) >= evictAckTtl) ||
             (now - procInfo.getLastUpdate()) >= evictTtl) {
+          // Failed Procedures aren't persisted in WAL.
+          if (!(procInfo instanceof FailedProcedureInfo)) {
+            store.delete(entry.getKey());
+          }
+          it.remove();
+
+          NonceKey nonceKey = procInfo.getNonceKey();
+          if (nonceKey != null) {
+            nonceKeysToProcIdsMap.remove(nonceKey);
+          }
           if (isDebugEnabled) {
             LOG.debug("Evict completed procedure: " + procInfo);
           }
-          NonceKey nonceKey = procInfo.getNonceKey();
-          // Nonce procedures aren't persisted in WAL.
-          if (nonceKey == null) {
-            store.delete(entry.getKey());
-          } else {
-            nonceKeysToProcIdsMap.remove(nonceKey);
-          }
-          it.remove();
         }
       }
     }
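Taken together, the hunk above narrows the cleaner's rule: the WAL delete is now skipped only for FailedProcedureInfo entries (failure results registered for a nonce) rather than for every entry that carries a nonce, while the nonce mapping, when present, and the completed-map entry are still always dropped. Below is a minimal, runnable sketch of that eviction rule using stand-in types; ProcInfo, FailedProcInfo and the two maps are simplified placeholders for ProcedureInfo, FailedProcedureInfo, completed and nonceKeysToProcIdsMap, not actual HBase classes.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class EvictionSketch {

  // Stand-in for ProcedureInfo: only the nonce matters for this sketch.
  static class ProcInfo {
    final Long nonceKey;          // null means the procedure was submitted without a nonce
    ProcInfo(Long nonceKey) { this.nonceKey = nonceKey; }
  }

  // Stand-in for FailedProcedureInfo: a marker subclass for failure results
  // that were registered for a nonce and never written to the WAL store.
  static class FailedProcInfo extends ProcInfo {
    FailedProcInfo(Long nonceKey) { super(nonceKey); }
  }

  public static void main(String[] args) {
    Map<Long, ProcInfo> completed = new HashMap<>();   // procId -> info, mirrors 'completed'
    Map<Long, Long> nonceToProcId = new HashMap<>();   // mirrors 'nonceKeysToProcIdsMap'

    completed.put(1L, new ProcInfo(null));             // persisted procedure, no nonce
    completed.put(2L, new ProcInfo(100L));             // persisted procedure submitted with a nonce
    completed.put(3L, new FailedProcInfo(200L));       // failure result for a nonce, not in the store
    nonceToProcId.put(100L, 2L);
    nonceToProcId.put(200L, 3L);

    Iterator<Map.Entry<Long, ProcInfo>> it = completed.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, ProcInfo> entry = it.next();
      ProcInfo procInfo = entry.getValue();
      // Only procedures that were actually persisted get deleted from the store.
      if (!(procInfo instanceof FailedProcInfo)) {
        System.out.println("store.delete(" + entry.getKey() + ")");
      }
      it.remove();
      // The nonce mapping is always dropped when a nonce exists, so nothing is left behind.
      if (procInfo.nonceKey != null) {
        nonceToProcId.remove(procInfo.nonceKey);
      }
    }
    System.out.println("remaining nonce mappings: " + nonceToProcId);  // prints {}
  }
}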
@@ -696,7 +699,7 @@ public class ProcedureExecutor<TEnvironment> {
     if (procId == null || completed.containsKey(procId)) return;
 
     final long currentTime = EnvironmentEdgeManager.currentTime();
-    final ProcedureInfo result = new ProcedureInfo(
+    final ProcedureInfo result = new FailedProcedureInfo(
         procId.longValue(),
         procName,
         procOwner != null ? procOwner.getShortName() : null,
@@ -710,6 +713,17 @@ public class ProcedureExecutor<TEnvironment> {
     completed.putIfAbsent(procId, result);
   }
 
+  public static class FailedProcedureInfo extends ProcedureInfo {
+
+    public FailedProcedureInfo(long procId, String procName, String procOwner,
+        ProcedureState procState, long parentId, NonceKey nonceKey,
+        ErrorHandlingProtos.ForeignExceptionMessage exception, long lastUpdate, long startTime,
+        byte[] result) {
+      super(procId, procName, procOwner, procState, parentId, nonceKey, exception, lastUpdate,
+          startTime, result);
+    }
+  }
+
   // ==========================================================================
   // Submit/Abort Procedure
   // ==========================================================================
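A note on the new class: FailedProcedureInfo adds no state or behavior over ProcedureInfo. From the two hunks above it appears to serve purely as a marker type, so that the failure result built with new FailedProcedureInfo(...) for a nonce-only submission, which is never written to the WAL-backed ProcedureStore, can be told apart by CompletedProcedureCleaner from completed procedures that do have store entries to delete.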
@@ -14,8 +14,9 @@
  * License for the specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hadoop.hbase.client;
+package org.apache.hadoop.hbase.procedure;
 
+import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -30,14 +31,13 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ProcedureInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -46,24 +46,23 @@ import org.junit.experimental.categories.Category;
  * Check if CompletedProcedureCleaner cleans up failed nonce procedures.
  */
 @Category(MediumTests.class)
-public class TestNonceProcCleanerOnFailure {
-  private static final Log LOG = LogFactory.getLog(TestNonceProcCleanerOnFailure.class);
+public class TestFailedProcCleanup {
+  private static final Log LOG = LogFactory.getLog(TestFailedProcCleanup.class);
 
   protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Configuration conf;
   private static final TableName TABLE = TableName.valueOf("test");
   private static final byte[] FAMILY = Bytes.toBytesBinary("f");
   private static final int evictionDelay = 10 * 1000;
 
   @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    Configuration conf = TEST_UTIL.getConfiguration();
+  public static void setUpBeforeClass() {
+    conf = TEST_UTIL.getConfiguration();
     conf.setInt("hbase.procedure.cleaner.evict.ttl", evictionDelay);
-    conf.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, CreateFailObserver.class.getName());
-    TEST_UTIL.startMiniCluster(3);
   }
 
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     TEST_UTIL.cleanupTestDir();
     TEST_UTIL.cleanupDataTestDirOnTestFS();
     TEST_UTIL.shutdownMiniCluster();
@@ -71,6 +70,28 @@ public class TestNonceProcCleanerOnFailure {
 
   @Test
   public void testFailCreateTable() throws Exception {
+    conf.set(MASTER_COPROCESSOR_CONF_KEY, CreateFailObserver.class.getName());
+    TEST_UTIL.startMiniCluster(3);
+    try {
+      TEST_UTIL.createTable(TABLE, FAMILY);
+    } catch (AccessDeniedException e) {
+      LOG.debug("Ignoring exception: ", e);
+      Thread.sleep(evictionDelay * 3);
+    }
+    List<ProcedureInfo> procedureInfos =
+        TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().listProcedures();
+    for (ProcedureInfo procedureInfo : procedureInfos) {
+      if (procedureInfo.getProcName().equals("CreateTableProcedure")
+          && procedureInfo.getProcState() == ProcedureProtos.ProcedureState.ROLLEDBACK) {
+        fail("Found procedure " + procedureInfo + " that hasn't been cleaned up");
+      }
+    }
+  }
+
+  @Test
+  public void testFailCreateTableHandler() throws Exception {
+    conf.set(MASTER_COPROCESSOR_CONF_KEY, CreateFailObserverHandler.class.getName());
+    TEST_UTIL.startMiniCluster(3);
     try {
       TEST_UTIL.createTable(TABLE, FAMILY);
     } catch (AccessDeniedException e) {
@@ -98,4 +119,17 @@ public class TestNonceProcCleanerOnFailure {
       }
     }
   }
+
+  public static class CreateFailObserverHandler extends BaseMasterObserver {
+
+    @Override
+    public void preCreateTableHandler(
+        final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+
+      if (desc.getTableName().equals(TABLE)) {
+        throw new AccessDeniedException("Don't allow creation of table");
+      }
+    }
+  }
 }
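For the tests, the pattern appears to be: each test case installs its own failing MasterObserver on the shared Configuration (CreateFailObserver for the create path, CreateFailObserverHandler for the handler hook) and only then starts the mini cluster, which is why cluster startup moved out of @BeforeClass into the test bodies and teardown moved from @AfterClass to a per-test @After. Once the observer rejects the create with AccessDeniedException, the test sleeps for three times the 10-second hbase.procedure.cleaner.evict.ttl and fails if a rolled-back CreateTableProcedure is still returned by the master's procedure executor.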