HBASE-17519 Rollback the removed cells (ChiaPing Tsai)
This commit is contained in:
parent
19f9a1a643
commit
ed023058d2
@ -552,7 +552,7 @@ public class DefaultMemStore implements MemStore {
|
||||
// 'now' and a 0 memstoreTS == immediately visible
|
||||
List<Cell> cells = new ArrayList<Cell>(1);
|
||||
cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
|
||||
return upsert(cells, 1L);
|
||||
return upsert(cells, 1L, null);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -571,13 +571,14 @@ public class DefaultMemStore implements MemStore {
|
||||
*
|
||||
* @param cells
|
||||
* @param readpoint readpoint below which we can safely remove duplicate KVs
|
||||
* @param removedCells collect the removed cells. It can be null.
|
||||
* @return change in memstore size
|
||||
*/
|
||||
@Override
|
||||
public long upsert(Iterable<Cell> cells, long readpoint) {
|
||||
public long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) {
|
||||
long size = 0;
|
||||
for (Cell cell : cells) {
|
||||
size += upsert(cell, readpoint);
|
||||
size += upsert(cell, readpoint, removedCells);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
@ -596,7 +597,7 @@ public class DefaultMemStore implements MemStore {
|
||||
* @param cell
|
||||
* @return change in size of MemStore
|
||||
*/
|
||||
private long upsert(Cell cell, long readpoint) {
|
||||
private long upsert(Cell cell, long readpoint, List<Cell> removedCells) {
|
||||
// Add the Cell to the MemStore
|
||||
// Use the internalAdd method here since we (a) already have a lock
|
||||
// and (b) cannot safely use the MSLAB here without potentially
|
||||
@ -635,6 +636,9 @@ public class DefaultMemStore implements MemStore {
|
||||
long delta = heapSizeChange(cur, true);
|
||||
addedSize -= delta;
|
||||
this.size.addAndGet(-delta);
|
||||
if (removedCells != null) {
|
||||
removedCells.add(cur);
|
||||
}
|
||||
it.remove();
|
||||
setOldestEditTimeToNow();
|
||||
} else {
|
||||
|
@ -4025,11 +4025,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
* the wal. This method is then invoked to rollback the memstore.
|
||||
*/
|
||||
private void rollbackMemstore(List<Cell> memstoreCells) {
|
||||
int kvsRolledback = 0;
|
||||
rollbackMemstore(null, memstoreCells);
|
||||
}
|
||||
|
||||
private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) {
|
||||
int kvsRolledback = 0;
|
||||
for (Cell cell : memstoreCells) {
|
||||
byte[] family = CellUtil.cloneFamily(cell);
|
||||
Store store = getStore(family);
|
||||
Store store = defaultStore;
|
||||
if (store == null) {
|
||||
byte[] family = CellUtil.cloneFamily(cell);
|
||||
store = getStore(family);
|
||||
}
|
||||
store.rollback(cell);
|
||||
kvsRolledback++;
|
||||
}
|
||||
@ -7586,12 +7592,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
byte[] row = mutate.getRow();
|
||||
checkRow(row, op.toString());
|
||||
checkFamilies(mutate.getFamilyCellMap().keySet());
|
||||
boolean flush = false;
|
||||
Durability durability = getEffectiveDurability(mutate.getDurability());
|
||||
boolean writeToWAL = durability != Durability.SKIP_WAL;
|
||||
WALEdit walEdits = null;
|
||||
List<Cell> allKVs = new ArrayList<Cell>(mutate.size());
|
||||
Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
|
||||
Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
|
||||
long size = 0;
|
||||
long txid = 0;
|
||||
checkReadOnly();
|
||||
@ -7718,26 +7724,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
}
|
||||
|
||||
// Actually write to Memstore now
|
||||
if (!tempMemstore.isEmpty()) {
|
||||
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
|
||||
Store store = entry.getKey();
|
||||
if (store.getFamily().getMaxVersions() == 1) {
|
||||
// upsert if VERSIONS for this CF == 1
|
||||
size += store.upsert(entry.getValue(), getSmallestReadPoint());
|
||||
} else {
|
||||
// otherwise keep older versions around
|
||||
size += store.add(entry.getValue());
|
||||
if (!entry.getValue().isEmpty()) {
|
||||
doRollBackMemstore = true;
|
||||
}
|
||||
doRollBackMemstore = !tempMemstore.isEmpty();
|
||||
for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) {
|
||||
Store store = entry.getKey();
|
||||
if (store.getFamily().getMaxVersions() == 1) {
|
||||
List<Cell> removedCells = removedCellsForMemStore.get(store);
|
||||
if (removedCells == null) {
|
||||
removedCells = new ArrayList<>();
|
||||
removedCellsForMemStore.put(store, removedCells);
|
||||
}
|
||||
// We add to all KVs here whereas when doing increment, we do it
|
||||
// earlier... why?
|
||||
allKVs.addAll(entry.getValue());
|
||||
// upsert if VERSIONS for this CF == 1
|
||||
size += store.upsert(entry.getValue(), getSmallestReadPoint(), removedCells);
|
||||
} else {
|
||||
// otherwise keep older versions around
|
||||
size += store.add(entry.getValue());
|
||||
}
|
||||
|
||||
size = this.addAndGetGlobalMemstoreSize(size);
|
||||
flush = isFlushSize(size);
|
||||
// We add to all KVs here whereas when doing increment, we do it
|
||||
// earlier... why?
|
||||
allKVs.addAll(entry.getValue());
|
||||
}
|
||||
} finally {
|
||||
this.updatesLock.readLock().unlock();
|
||||
@ -7763,7 +7767,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
// if the wal sync was unsuccessful, remove keys from memstore
|
||||
WriteEntry we = walKey != null? walKey.getWriteEntry(): null;
|
||||
if (doRollBackMemstore) {
|
||||
rollbackMemstore(allKVs);
|
||||
for (Map.Entry<Store, List<Cell>> entry: tempMemstore.entrySet()) {
|
||||
rollbackMemstore(entry.getKey(), entry.getValue());
|
||||
}
|
||||
for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
|
||||
entry.getKey().add(entry.getValue());
|
||||
}
|
||||
if (we != null) mvcc.complete(we);
|
||||
} else if (we != null) {
|
||||
mvcc.completeAndWait(we);
|
||||
@ -7775,12 +7784,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
if (this.metricsRegion != null) {
|
||||
this.metricsRegion.updateAppend();
|
||||
}
|
||||
|
||||
if (flush) {
|
||||
// Request a cache flush. Do it outside update lock.
|
||||
requestFlush();
|
||||
}
|
||||
|
||||
if (isFlushSize(this.addAndGetGlobalMemstoreSize(size))) requestFlush();
|
||||
return mutate.isReturnResults() ? Result.create(allKVs) : null;
|
||||
}
|
||||
|
||||
@ -7887,7 +7891,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
boolean doRollBackMemstore = false;
|
||||
long accumulatedResultSize = 0;
|
||||
List<Cell> allKVs = new ArrayList<Cell>(increment.size());
|
||||
List<Cell> memstoreCells = new ArrayList<Cell>();
|
||||
Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>();
|
||||
Map<Store, List<Cell>> forMemStore = new HashMap<>();
|
||||
Durability effectiveDurability = getEffectiveDurability(increment.getDurability());
|
||||
try {
|
||||
rowLock = getRowLockInternal(increment.getRow(), false);
|
||||
@ -7907,7 +7912,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
WALEdit walEdits = null;
|
||||
// Process increments a Store/family at a time.
|
||||
// Accumulate edits for memstore to add later after we've added to WAL.
|
||||
Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>();
|
||||
for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) {
|
||||
byte [] columnFamilyName = entry.getKey();
|
||||
List<Cell> increments = entry.getValue();
|
||||
@ -7956,19 +7960,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
}
|
||||
|
||||
// Now write to memstore, a family at a time.
|
||||
doRollBackMemstore = !forMemStore.isEmpty();
|
||||
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
|
||||
Store store = entry.getKey();
|
||||
List<Cell> results = entry.getValue();
|
||||
if (store.getFamily().getMaxVersions() == 1) {
|
||||
List<Cell> removedCells = removedCellsForMemStore.get(store);
|
||||
if (removedCells == null) {
|
||||
removedCells = new ArrayList<>();
|
||||
removedCellsForMemStore.put(store, removedCells);
|
||||
}
|
||||
// Upsert if VERSIONS for this CF == 1
|
||||
accumulatedResultSize += store.upsert(results, getSmallestReadPoint());
|
||||
// TODO: St.Ack 20151222 Why no rollback in this case?
|
||||
accumulatedResultSize += store.upsert(results, getSmallestReadPoint(), removedCells);
|
||||
} else {
|
||||
// Otherwise keep older versions around
|
||||
accumulatedResultSize += store.add(entry.getValue());
|
||||
if (!entry.getValue().isEmpty()) {
|
||||
doRollBackMemstore = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
@ -7993,7 +7999,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||
}
|
||||
// if the wal sync was unsuccessful, remove keys from memstore
|
||||
if (doRollBackMemstore) {
|
||||
rollbackMemstore(memstoreCells);
|
||||
for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
|
||||
rollbackMemstore(entry.getKey(), entry.getValue());
|
||||
}
|
||||
for (Map.Entry<Store, List<Cell>> entry: removedCellsForMemStore.entrySet()) {
|
||||
entry.getKey().add(entry.getValue());
|
||||
}
|
||||
if (walKey != null) mvcc.complete(walKey.getWriteEntry());
|
||||
} else {
|
||||
if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry());
|
||||
|
@ -2388,10 +2388,11 @@ public class HStore implements Store {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
|
||||
public long upsert(Iterable<Cell> cells, long readpoint,
|
||||
List<Cell> removedCells) throws IOException {
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
return this.memstore.upsert(cells, readpoint);
|
||||
return this.memstore.upsert(cells, readpoint, removedCells);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -135,9 +135,10 @@ public interface MemStore extends HeapSize {
|
||||
* only see each KeyValue update as atomic.
|
||||
* @param cells
|
||||
* @param readpoint readpoint below which we can safely remove duplicate Cells.
|
||||
* @param removedCells collect the removed cells. It can be null.
|
||||
* @return change in memstore size
|
||||
*/
|
||||
long upsert(Iterable<Cell> cells, long readpoint);
|
||||
long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells);
|
||||
|
||||
/**
|
||||
* @return scanner over the memstore. This might include scanner over the snapshot when one is
|
||||
|
@ -138,10 +138,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
|
||||
* across all of them.
|
||||
* @param cells
|
||||
* @param readpoint readpoint below which we can safely remove duplicate KVs
|
||||
* @param removedCells collect the removed cells. It can be null.
|
||||
* @return memstore size delta
|
||||
* @throws IOException
|
||||
*/
|
||||
long upsert(Iterable<Cell> cells, long readpoint) throws IOException;
|
||||
long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) throws IOException;
|
||||
|
||||
/**
|
||||
* Adds a value to the memstore
|
||||
|
@ -0,0 +1,352 @@
|
||||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestRollbackFromClient {
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
private final static HBaseTestingUtility TEST_UTIL
|
||||
= new HBaseTestingUtility();
|
||||
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
|
||||
private static final int SLAVES = 3;
|
||||
private static final byte[] ROW = Bytes.toBytes("testRow");
|
||||
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
|
||||
private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2");
|
||||
private static final byte[] VALUE = Bytes.toBytes("testValue");
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, FailedDefaultWALProvider.class.getName());
|
||||
TEST_UTIL.startMiniCluster(SLAVES);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendRollback() throws IOException {
|
||||
Updater updateForEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) {
|
||||
try {
|
||||
Append append = new Append(ROW);
|
||||
append.add(FAMILY, QUALIFIER, VALUE);
|
||||
append.add(FAMILY, QUALIFIER_V2, VALUE);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.append(append);
|
||||
} catch (IOException e) {
|
||||
// It should fail because the WAL fail also
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
testRollback(updateForEmptyTable, 1, null);
|
||||
testRollback(updateForEmptyTable, 2, null);
|
||||
|
||||
final Append preAppend = new Append(ROW);
|
||||
preAppend.add(FAMILY, QUALIFIER, VALUE);
|
||||
Cell initCell = preAppend.getCellList(FAMILY).get(0);
|
||||
Updater updateForNonEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) throws IOException {
|
||||
table.append(preAppend);
|
||||
try {
|
||||
Append append = new Append(ROW);
|
||||
append.add(FAMILY, QUALIFIER, VALUE);
|
||||
append.add(FAMILY, QUALIFIER_V2, VALUE);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.append(append);
|
||||
Assert.fail("It should fail because the WAL sync is failed");
|
||||
} catch (IOException e) {
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
testRollback(updateForNonEmptyTable, 1, initCell);
|
||||
testRollback(updateForNonEmptyTable, 2, initCell);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementRollback() throws IOException {
|
||||
Updater updateForEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) {
|
||||
try {
|
||||
Increment inc = new Increment(ROW);
|
||||
inc.addColumn(FAMILY, QUALIFIER, 1);
|
||||
inc.addColumn(FAMILY, QUALIFIER_V2, 2);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.increment(inc);
|
||||
} catch (IOException e) {
|
||||
// It should fail because the WAL fail also
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
testRollback(updateForEmptyTable, 1, null);
|
||||
testRollback(updateForEmptyTable, 2, null);
|
||||
|
||||
final Increment preIncrement = new Increment(ROW);
|
||||
preIncrement.addColumn(FAMILY, QUALIFIER, 1);
|
||||
Cell initCell = preIncrement.getCellList(FAMILY).get(0);
|
||||
Updater updateForNonEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) throws IOException {
|
||||
table.increment(preIncrement);
|
||||
try {
|
||||
Increment inc = new Increment(ROW);
|
||||
inc.addColumn(FAMILY, QUALIFIER, 1);
|
||||
inc.addColumn(FAMILY, QUALIFIER_V2, 2);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.increment(inc);
|
||||
Assert.fail("It should fail because the WAL sync is failed");
|
||||
} catch (IOException e) {
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
testRollback(updateForNonEmptyTable, 1, initCell);
|
||||
testRollback(updateForNonEmptyTable, 2, initCell);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutRollback() throws IOException {
|
||||
Updater updateForEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) {
|
||||
try {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.put(put);
|
||||
Assert.fail("It should fail because the WAL sync is failed");
|
||||
} catch (IOException e) {
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
testRollback(updateForEmptyTable, 1, null);
|
||||
testRollback(updateForEmptyTable, 2, null);
|
||||
|
||||
final Put prePut = new Put(ROW);
|
||||
prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa"));
|
||||
Cell preCell = prePut.getCellList(FAMILY).get(0);
|
||||
Updater updateForNonEmptyTable = new Updater() {
|
||||
@Override
|
||||
public int updateData(Table table, byte[] family) throws IOException {
|
||||
table.put(prePut);
|
||||
try {
|
||||
Put put = new Put(ROW);
|
||||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||
FailedHLog.SHOULD_FAIL.set(true);
|
||||
table.put(put);
|
||||
Assert.fail("It should fail because the WAL sync is failed");
|
||||
} catch (IOException e) {
|
||||
} finally {
|
||||
FailedHLog.SHOULD_FAIL.set(false);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
};
|
||||
testRollback(updateForNonEmptyTable, 1, preCell);
|
||||
testRollback(updateForNonEmptyTable, 2, preCell);
|
||||
}
|
||||
|
||||
private void testRollback(Updater updater, int versions, Cell initCell) throws IOException {
|
||||
TableName tableName = TableName.valueOf(this.name.getMethodName());
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor col = new HColumnDescriptor(FAMILY);
|
||||
col.setMaxVersions(versions);
|
||||
desc.addFamily(col);
|
||||
TEST_UTIL.getHBaseAdmin().createTable(desc);
|
||||
int expected;
|
||||
List<Cell> cells;
|
||||
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
Table table = conn.getTable(tableName)) {
|
||||
expected = updater.updateData(table, FAMILY);
|
||||
cells = getAllCells(table);
|
||||
}
|
||||
TEST_UTIL.getHBaseAdmin().disableTable(tableName);
|
||||
TEST_UTIL.getHBaseAdmin().deleteTable(tableName);
|
||||
assertEquals(expected, cells.size());
|
||||
if (initCell != null && cells.isEmpty()) {
|
||||
Cell cell = cells.get(0);
|
||||
assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell));
|
||||
assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell));
|
||||
assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell));
|
||||
assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell));
|
||||
}
|
||||
}
|
||||
|
||||
interface Updater {
|
||||
int updateData(Table table, byte[] family) throws IOException;
|
||||
}
|
||||
|
||||
private static List<Cell> getAllCells(Table table) throws IOException {
|
||||
List<Cell> cells = new ArrayList<>();
|
||||
try (ResultScanner scanner = table.getScanner(new Scan())) {
|
||||
for (Result r : scanner) {
|
||||
cells.addAll(r.listCells());
|
||||
}
|
||||
return cells;
|
||||
}
|
||||
}
|
||||
|
||||
public static class FailedDefaultWALProvider extends DefaultWALProvider {
|
||||
@Override
|
||||
public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException {
|
||||
WAL wal = super.getWAL(identifier, namespace);
|
||||
return new FailedHLog(wal);
|
||||
}
|
||||
}
|
||||
|
||||
public static class FailedHLog implements WAL {
|
||||
private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false);
|
||||
private final WAL delegation;
|
||||
FailedHLog(final WAL delegation) {
|
||||
this.delegation = delegation;
|
||||
}
|
||||
@Override
|
||||
public void registerWALActionsListener(WALActionsListener listener) {
|
||||
delegation.registerWALActionsListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean unregisterWALActionsListener(WALActionsListener listener) {
|
||||
return delegation.unregisterWALActionsListener(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[][] rollWriter() throws FailedLogCloseException, IOException {
|
||||
return delegation.rollWriter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException {
|
||||
return delegation.rollWriter(force);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
delegation.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegation.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) throws IOException {
|
||||
return delegation.append(htd, info, key, edits, inMemstore);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
delegation.sync();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sync(long txid) throws IOException {
|
||||
if (SHOULD_FAIL.get()) {
|
||||
throw new IOException("[TESTING] we need the failure!!!");
|
||||
}
|
||||
delegation.sync(txid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) {
|
||||
return delegation.startCacheFlush(encodedRegionName, families);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeCacheFlush(byte[] encodedRegionName) {
|
||||
delegation.completeCacheFlush(encodedRegionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abortCacheFlush(byte[] encodedRegionName) {
|
||||
delegation.abortCacheFlush(encodedRegionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WALCoprocessorHost getCoprocessorHost() {
|
||||
return delegation.getCoprocessorHost();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
|
||||
return delegation.getEarliestMemstoreSeqNum(encodedRegionName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
|
||||
return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -866,7 +866,7 @@ public class TestDefaultMemStore extends TestCase {
|
||||
kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1);
|
||||
l.add(kv1); l.add(kv2); l.add(kv3);
|
||||
|
||||
this.memstore.upsert(l, 2);// readpoint is 2
|
||||
this.memstore.upsert(l, 2, null);// readpoint is 2
|
||||
long newSize = this.memstore.size.get();
|
||||
assert(newSize > oldSize);
|
||||
//The kv1 should be removed.
|
||||
@ -875,7 +875,7 @@ public class TestDefaultMemStore extends TestCase {
|
||||
KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
|
||||
kv4.setSequenceId(1);
|
||||
l.clear(); l.add(kv4);
|
||||
this.memstore.upsert(l, 3);
|
||||
this.memstore.upsert(l, 3, null);
|
||||
assertEquals(newSize, this.memstore.size.get());
|
||||
//The kv2 should be removed.
|
||||
assert(memstore.cellSet.size() == 2);
|
||||
@ -919,7 +919,7 @@ public class TestDefaultMemStore extends TestCase {
|
||||
KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
|
||||
kv1.setSequenceId(100);
|
||||
l.add(kv1);
|
||||
memstore.upsert(l, 1000);
|
||||
memstore.upsert(l, 1000, null);
|
||||
t = memstore.timeOfOldestEdit();
|
||||
assertTrue(t == 1234);
|
||||
} finally {
|
||||
|
Loading…
x
Reference in New Issue
Block a user