HBASE-17937 Memstore size becomes negative in case of expensive postPut/Delete Coprocessor call

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Abhishek Singh Chouhan 2017-04-19 11:22:23 +05:30 committed by zhangduo
parent 40cc666ac9
commit 49cba2c237
2 changed files with 107 additions and 1 deletions

View File

@ -3392,6 +3392,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
}
// update memstore size
this.addAndGetMemstoreSize(memstoreSize);
// calling the post CP hook for batch mutation
if (!replay && coprocessorHost != null) {
MiniBatchOperationInProgress<Mutation> miniBatchOp =
@ -3444,7 +3447,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
if (writeEntry != null) mvcc.complete(writeEntry);
this.addAndGetMemstoreSize(memstoreSize);
if (locked) {
this.updatesLock.readLock().unlock();
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MemstoreSize;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test that verifies we do not have memstore size negative when a postPut/Delete hook is
* slow/expensive and a flush is triggered at the same time the coprocessow is doing its work. To
* simulate this we call flush from the coprocessor itself
*/
@Category(LargeTests.class)
public class TestNegativeMemstoreSizeWithSlowCoprocessor {
static final Log LOG = LogFactory.getLog(TestNegativeMemstoreSizeWithSlowCoprocessor.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] tableName = Bytes.toBytes("test_table");
private static final byte[] family = Bytes.toBytes("f");
private static final byte[] qualifier = Bytes.toBytes("q");
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
FlushingRegionObserver.class.getName());
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TableName.valueOf(tableName), family);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testNegativeMemstoreSize() throws IOException, InterruptedException {
boolean IOEthrown = false;
Table table = null;
try {
table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
// Adding data
Put put1 = new Put(Bytes.toBytes("row1"));
put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
table.put(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
table.put(put2);
table.put(put2);
} catch (IOException e) {
IOEthrown = true;
} finally {
Assert.assertFalse("Shouldn't have thrown an exception", IOEthrown);
if (table != null) {
table.close();
}
}
}
public static class FlushingRegionObserver extends SimpleRegionObserver {
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
HRegion region = (HRegion) c.getEnvironment().getRegion();
super.postPut(c, put, edit, durability);
if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
region.flush(false);
Assert.assertTrue(region.addAndGetMemstoreSize(new MemstoreSize()) >= 0);
}
}
}
}