HBASE-14689 Addendum and unit test for HBASE-13471

Conflicts:
    hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

parent d76dbb4f84
commit ec021a7b25
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@@ -2990,11 +2990,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       } catch (IOException ioe) {
         LOG.warn("Failed getting lock in batch put, row="
           + Bytes.toStringBinary(mutation.getRow()), ioe);
+        throw ioe;
       }
       if (rowLock == null) {
         // We failed to grab another lock
-        assert false: "Should never fail to get lock when blocking";
-        break; // stop acquiring more rows for this batch
+        throw new IOException("Failed getting lock in batch put, row=" +
+          Bytes.toStringBinary(mutation.getRow()));
       } else {
         acquiredRowLocks.add(rowLock);
       }
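For readers following the change above: with the assert and the early break gone, a failure to acquire a row lock now reaches the caller of batchMutate() as an IOException. The sketch below is illustrative only and not part of this commit (class and method names are invented); it shows that caller-visible contract, assuming the HBase 1.x HRegion and OperationStatus APIs.

import java.io.IOException;

import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.OperationStatus;

// Illustration only: the caller-visible contract after this addendum.
public class BatchMutateFailureSketch {
  static void applyBatch(HRegion region, Mutation[] mutations) throws IOException {
    try {
      // A row-lock acquisition failure (e.g. hbase.rowlock.wait.duration expiring)
      // now propagates out of batchMutate() as an IOException instead of the batch
      // silently stopping at the failed row.
      OperationStatus[] statuses = region.batchMutate(mutations);
      for (int i = 0; i < statuses.length; i++) {
        if (statuses[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
          // Per-mutation problems, such as a row outside the region, still surface
          // here (e.g. SANITY_CHECK_FAILURE) rather than as an exception.
          System.out.println("mutation " + i + ": " + statuses[i].getOperationStatusCode());
        }
      }
    } catch (IOException ioe) {
      // Before this change the caller never saw the lock failure; now it must handle it.
      throw ioe;
    }
  }
}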
@@ -5068,6 +5069,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param readLock is the lock reader or writer. True indicates that a non-exlcusive
    *                 lock is requested
    */
+  @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
     // Make sure the row is inside of this region before getting the lock for it.
     checkRow(row, "row lock");
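The readLock flag selects between a shared and an exclusive row lock. A minimal usage sketch follows (not from this commit; the helper is hypothetical), assuming the HBase 1.x Region.RowLock API in which a held lock is released with release().

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.Region;

// Illustration only: take a non-exclusive (read) row lock and always release it.
public class RowLockSketch {
  static void withReadLock(Region region, byte[] row, Runnable work) throws IOException {
    // readLock == true requests a shared lock; false requests the exclusive write lock.
    Region.RowLock lock = region.getRowLock(row, true);
    try {
      work.run();
    } finally {
      // A lock that is never released blocks later acquirers until
      // hbase.rowlock.wait.duration expires -- the situation the new test
      // in this commit sets up on purpose.
      lock.release();
    }
  }
}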
@@ -7031,7 +7033,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
     if (cell.getTagsLength() <= 0) return tags;
     List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
     Iterator<Tag> i =
         CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
     while (i.hasNext()) newTags.add(i.next());
@@ -7322,7 +7324,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

   // They are subtley different in quiet a few ways. This came out only
   // after study. I am not sure that many of the differences are intentional.
   // TODO: St.Ack 20150907

   @Override
   public Result increment(Increment mutation, long nonceGroup, long nonce)
@@ -7336,7 +7338,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     boolean writeToWAL = durability != Durability.SKIP_WAL;
     WALEdit walEdits = null;
     List<Cell> allKVs = new ArrayList<Cell>(mutation.size());

     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
     long size = 0;
     long txid = 0;
@@ -8138,7 +8140,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
       getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
       HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());

     // Call append but with an empty WALEdit. The returned sequence id will not be associated
     // with any edit and we can be sure it went in after all outstanding appends.
     try {
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

@@ -56,7 +56,11 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -100,6 +104,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
@@ -6131,7 +6136,7 @@ public class TestHRegion {
         key.setWriteEntry(we);
         return 1L;
       }

     });
     return wal;
   }
@@ -6154,7 +6159,7 @@ public class TestHRegion {
     // capture append() calls
     WAL wal = mockWAL();
     when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);


     // open a region first so that it can be closed later
     region = HRegion.openHRegion(hri, htd, rss.getWAL(hri),
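The context above wires TestHRegion's mockWAL() helper into a mocked RegionServerServices. A rough standalone sketch of the same wiring follows (illustrative only; mock(WAL.class) stands in for the richer mockWAL() stub, and the imports assume Mockito 1.x and the HBase 1.x package layout).

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.wal.WAL;

// Illustration only: give the region-opening path a mocked WAL so the test can
// stub or observe append() calls.
public class MockWalWiring {
  static RegionServerServices regionServerWithMockWal() {
    WAL wal = mock(WAL.class);
    RegionServerServices rss = mock(RegionServerServices.class);
    when(rss.getWAL((HRegionInfo) any())).thenReturn(wal);
    return rss;
  }
}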
@@ -6494,6 +6499,53 @@ public class TestHRegion {
         qual2, 0, qual2.length));
   }

+  @Test(timeout = 30000)
+  public void testBatchMutateWithWrongRegionException() throws IOException, InterruptedException {
+    final byte[] a = Bytes.toBytes("a");
+    final byte[] b = Bytes.toBytes("b");
+    final byte[] c = Bytes.toBytes("c"); // exclusive
+
+    int prevLockTimeout = CONF.getInt("hbase.rowlock.wait.duration", 30000);
+    CONF.setInt("hbase.rowlock.wait.duration", 3000);
+    final HRegion region = initHRegion(tableName, a, c, name.getMethodName(), CONF, false, fam1);
+
+    Mutation[] mutations = new Mutation[] {
+        new Put(a).addImmutable(fam1, null, null),
+        new Put(c).addImmutable(fam1, null, null), // this is outside the region boundary
+        new Put(b).addImmutable(fam1, null, null),
+    };
+
+    OperationStatus[] status = region.batchMutate(mutations);
+    assertEquals(status[0].getOperationStatusCode(), OperationStatusCode.SUCCESS);
+    assertEquals(status[1].getOperationStatusCode(), OperationStatusCode.SANITY_CHECK_FAILURE);
+    assertEquals(status[2].getOperationStatusCode(), OperationStatusCode.SUCCESS);
+
+    // test with a leaked row lock
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    exec.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        region.getRowLock(b);
+        return null;
+      }
+    });
+    exec.shutdown();
+    exec.awaitTermination(30, TimeUnit.SECONDS);
+
+    mutations = new Mutation[] {
+        new Put(a).addImmutable(fam1, null, null),
+        new Put(b).addImmutable(fam1, null, null),
+    };
+
+    try {
+      status = region.batchMutate(mutations);
+      fail("Failed to throw exception");
+    } catch (IOException expected) {
+    }
+
+    CONF.setInt("hbase.rowlock.wait.duration", prevLockTimeout);
+  }
+
   static HRegion initHRegion(byte[] tableName, String callingMethod,
       byte[]... families) throws IOException {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
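The new test lowers hbase.rowlock.wait.duration from its 30000 ms default to 3000 ms so that the deliberately leaked lock on row b makes the second batchMutate() fail quickly instead of hanging. A minimal sketch of that tuning in isolation (illustrative only; it uses a standalone Configuration rather than the test's shared CONF):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustration only: shorten the row-lock wait so lock failures surface quickly,
// then restore the previous value.
public class RowLockWaitTuning {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    int previous = conf.getInt("hbase.rowlock.wait.duration", 30000);
    conf.setInt("hbase.rowlock.wait.duration", 3000);
    // ... exercise the code that should hit the shorter timeout ...
    conf.setInt("hbase.rowlock.wait.duration", previous);
    System.out.println("restored hbase.rowlock.wait.duration=" + previous + " ms");
  }
}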