HBASE-17937 Memstore size becomes negative in case of expensive postPut/Delete Coprocessor call
Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
parent
6ea6915245
commit
d69a6366f6
|
@ -3503,6 +3503,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
|
|
||||||
doRollBackMemstore = false;
|
doRollBackMemstore = false;
|
||||||
|
// update memstore size
|
||||||
|
this.addAndGetGlobalMemstoreSize(addedSize);
|
||||||
|
|
||||||
// calling the post CP hook for batch mutation
|
// calling the post CP hook for batch mutation
|
||||||
if (!isInReplay && coprocessorHost != null) {
|
if (!isInReplay && coprocessorHost != null) {
|
||||||
MiniBatchOperationInProgress<Mutation> miniBatchOp =
|
MiniBatchOperationInProgress<Mutation> miniBatchOp =
|
||||||
|
@ -3560,7 +3563,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
}
|
}
|
||||||
if (writeEntry != null) mvcc.complete(writeEntry);
|
if (writeEntry != null) mvcc.complete(writeEntry);
|
||||||
} else {
|
} else {
|
||||||
this.addAndGetGlobalMemstoreSize(addedSize);
|
|
||||||
if (writeEntry != null) {
|
if (writeEntry != null) {
|
||||||
mvcc.completeAndWait(writeEntry);
|
mvcc.completeAndWait(writeEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that verifies we do not have memstore size negative when a postPut/Delete hook is
|
||||||
|
* slow/expensive and a flush is triggered at the same time the coprocessow is doing its work. To
|
||||||
|
* simulate this we call flush from the coprocessor itself
|
||||||
|
*/
|
||||||
|
@Category(LargeTests.class)
|
||||||
|
public class TestNegativeMemstoreSizeWithSlowCoprocessor {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestNegativeMemstoreSizeWithSlowCoprocessor.class);
|
||||||
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static final byte[] tableName = Bytes.toBytes("test_table");
|
||||||
|
private static final byte[] family = Bytes.toBytes("f");
|
||||||
|
private static final byte[] qualifier = Bytes.toBytes("q");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupBeforeClass() throws Exception {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
FlushingRegionObserver.class.getName());
|
||||||
|
conf.setBoolean(CoprocessorHost.ABORT_ON_ERROR_KEY, true);
|
||||||
|
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
|
||||||
|
TEST_UTIL.startMiniCluster(1);
|
||||||
|
TEST_UTIL.createTable(TableName.valueOf(tableName), family);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNegativeMemstoreSize() throws IOException, InterruptedException {
|
||||||
|
boolean IOEthrown = false;
|
||||||
|
Table table = null;
|
||||||
|
try {
|
||||||
|
table = TEST_UTIL.getConnection().getTable(TableName.valueOf(tableName));
|
||||||
|
|
||||||
|
// Adding data
|
||||||
|
Put put1 = new Put(Bytes.toBytes("row1"));
|
||||||
|
put1.addColumn(family, qualifier, Bytes.toBytes("Value1"));
|
||||||
|
table.put(put1);
|
||||||
|
Put put2 = new Put(Bytes.toBytes("row2"));
|
||||||
|
put2.addColumn(family, qualifier, Bytes.toBytes("Value2"));
|
||||||
|
table.put(put2);
|
||||||
|
table.put(put2);
|
||||||
|
} catch (IOException e) {
|
||||||
|
IOEthrown = true;
|
||||||
|
} finally {
|
||||||
|
Assert.assertFalse("Shouldn't have thrown an exception", IOEthrown);
|
||||||
|
if (table != null) {
|
||||||
|
table.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class FlushingRegionObserver extends SimpleRegionObserver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
|
||||||
|
final WALEdit edit, final Durability durability) throws IOException {
|
||||||
|
HRegion region = (HRegion) c.getEnvironment().getRegion();
|
||||||
|
super.postPut(c, put, edit, durability);
|
||||||
|
|
||||||
|
if (Bytes.equals(put.getRow(), Bytes.toBytes("row2"))) {
|
||||||
|
region.flush(false);
|
||||||
|
Assert.assertTrue(region.addAndGetGlobalMemstoreSize(0) >= 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue