HBASE-19966 The WriteEntry for WALKey maybe null if we failed to call WAL.append
This commit is contained in:
parent
2b63af376e
commit
32f235abee
|
@ -156,7 +156,9 @@ public class WALUtil {
|
|||
// Call complete only here because these are markers only. They are not for clients to read.
|
||||
mvcc.complete(walKey.getWriteEntry());
|
||||
} catch (IOException ioe) {
|
||||
mvcc.complete(walKey.getWriteEntry());
|
||||
if (walKey.getWriteEntry() != null) {
|
||||
mvcc.complete(walKey.getWriteEntry());
|
||||
}
|
||||
throw ioe;
|
||||
}
|
||||
return walKey;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,35 +17,32 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||
// imports for things that haven't moved from regionserver.wal yet.
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Default implementation of Key for an Entry in the WAL.
|
||||
|
@ -78,8 +74,7 @@ public class WALKeyImpl implements WALKey {
|
|||
* @return A WriteEntry gotten from local WAL subsystem.
|
||||
* @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
|
||||
*/
|
||||
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
|
||||
assert this.writeEntry != null;
|
||||
public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() {
|
||||
return this.writeEntry;
|
||||
}
|
||||
|
||||
|
|
|
@ -17,11 +17,13 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -32,7 +34,6 @@ import java.util.Set;
|
|||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -41,16 +42,17 @@ import org.apache.hadoop.hbase.CellScanner;
|
|||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
|
@ -153,7 +155,7 @@ public abstract class AbstractTestFSWAL {
|
|||
}
|
||||
}
|
||||
|
||||
protected void addEdits(WAL log, RegionInfo hri, HTableDescriptor htd, int times,
|
||||
protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
|
||||
MultiVersionConcurrencyControl mvcc, NavigableMap<byte[], Integer> scopes)
|
||||
throws IOException {
|
||||
final byte[] row = Bytes.toBytes("row");
|
||||
|
@ -249,26 +251,20 @@ public abstract class AbstractTestFSWAL {
|
|||
conf1.setInt("hbase.regionserver.maxlogs", 1);
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
|
||||
HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
|
||||
HTableDescriptor t1 =
|
||||
new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
|
||||
HTableDescriptor t2 =
|
||||
new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName())
|
||||
.setStartKey(HConstants.EMPTY_START_ROW)
|
||||
.setEndKey(HConstants.EMPTY_END_ROW)
|
||||
.build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName())
|
||||
.setStartKey(HConstants.EMPTY_START_ROW)
|
||||
.setEndKey(HConstants.EMPTY_END_ROW)
|
||||
.build();
|
||||
TableDescriptor t1 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
TableDescriptor t2 = TableDescriptorBuilder.newBuilder(TableName.valueOf("t2"))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
RegionInfo hri1 = RegionInfoBuilder.newBuilder(t1.getTableName()).build();
|
||||
RegionInfo hri2 = RegionInfoBuilder.newBuilder(t2.getTableName()).build();
|
||||
// add edits and roll the wal
|
||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
NavigableMap<byte[], Integer> scopes1 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : t1.getFamiliesKeys()) {
|
||||
for (byte[] fam : t1.getColumnFamilyNames()) {
|
||||
scopes1.put(fam, 0);
|
||||
}
|
||||
NavigableMap<byte[], Integer> scopes2 = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : t2.getFamiliesKeys()) {
|
||||
for (byte[] fam : t2.getColumnFamilyNames()) {
|
||||
scopes2.put(fam, 0);
|
||||
}
|
||||
try {
|
||||
|
@ -293,12 +289,12 @@ public abstract class AbstractTestFSWAL {
|
|||
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
|
||||
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
|
||||
// remain.
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
// only one wal should remain now (that is for the second region).
|
||||
assertEquals(1, wal.getNumRolledLogFiles());
|
||||
// flush the second region
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
// no wal should remain now.
|
||||
assertEquals(0, wal.getNumRolledLogFiles());
|
||||
|
@ -315,14 +311,14 @@ public abstract class AbstractTestFSWAL {
|
|||
regionsToFlush = wal.findRegionsToForceFlush();
|
||||
assertEquals(2, regionsToFlush.length);
|
||||
// flush both regions
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys());
|
||||
flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getColumnFamilyNames());
|
||||
wal.rollWriter(true);
|
||||
assertEquals(0, wal.getNumRolledLogFiles());
|
||||
// Add an edit to region1, and roll the wal.
|
||||
addEdits(wal, hri1, t1, 2, mvcc, scopes1);
|
||||
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
|
||||
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys());
|
||||
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
|
||||
wal.rollWriter();
|
||||
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
|
||||
assertEquals(1, wal.getNumRolledLogFiles());
|
||||
|
@ -364,15 +360,15 @@ public abstract class AbstractTestFSWAL {
|
|||
final TableName tableName = TableName.valueOf(testName);
|
||||
final RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
final byte[] rowName = tableName.getName();
|
||||
final HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor("f"));
|
||||
final TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of("f")).build();
|
||||
HRegion r = HBaseTestingUtility.createRegionAndWAL(hri, TEST_UTIL.getDefaultRootDirPath(),
|
||||
TEST_UTIL.getConfiguration(), htd);
|
||||
HBaseTestingUtility.closeRegionAndWAL(r);
|
||||
final int countPerFamily = 10;
|
||||
final AtomicBoolean goslow = new AtomicBoolean(false);
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : htd.getFamiliesKeys()) {
|
||||
for (byte[] fam : htd.getColumnFamilyNames()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
// subclass and doctor a method.
|
||||
|
@ -392,15 +388,15 @@ public abstract class AbstractTestFSWAL {
|
|||
EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
|
||||
try {
|
||||
List<Put> puts = null;
|
||||
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
||||
for (byte[] fam : htd.getColumnFamilyNames()) {
|
||||
puts =
|
||||
TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
|
||||
TestWALReplay.addRegionEdits(rowName, fam, countPerFamily, ee, region, "x");
|
||||
}
|
||||
|
||||
// Now assert edits made it in.
|
||||
final Get g = new Get(rowName);
|
||||
Result result = region.get(g);
|
||||
assertEquals(countPerFamily * htd.getFamilies().size(), result.size());
|
||||
assertEquals(countPerFamily * htd.getColumnFamilyNames().size(), result.size());
|
||||
|
||||
// Construct a WALEdit and add it a few times to the WAL.
|
||||
WALEdit edits = new WALEdit();
|
||||
|
@ -445,4 +441,36 @@ public abstract class AbstractTestFSWAL {
|
|||
wal.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteEntryCanBeNull() throws IOException {
|
||||
String testName = currentTest.getMethodName();
|
||||
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
|
||||
CONF, null, true, null, null);
|
||||
wal.close();
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
RegionInfo ri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
|
||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : td.getColumnFamilyNames()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
long timestamp = System.currentTimeMillis();
|
||||
byte[] row = Bytes.toBytes("row");
|
||||
WALEdit cols = new WALEdit();
|
||||
cols.add(new KeyValue(row, row, row, timestamp, row));
|
||||
WALKeyImpl key =
|
||||
new WALKeyImpl(ri.getEncodedNameAsBytes(), td.getTableName(), SequenceId.NO_SEQUENCE_ID,
|
||||
timestamp, WALKey.EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc, scopes);
|
||||
try {
|
||||
wal.append(ri, key, cols, true);
|
||||
fail("Should fail since the wal has already been closed");
|
||||
} catch (IOException e) {
|
||||
// expected
|
||||
assertThat(e.getMessage(), containsString("log is closed"));
|
||||
// the WriteEntry should be null since we fail before setting it.
|
||||
assertNull(key.getWriteEntry());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,12 +32,14 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
|
||||
|
@ -97,7 +99,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
SecurityException, IllegalArgumentException, IllegalAccessException {
|
||||
final String name = this.name.getMethodName();
|
||||
FSHLog log = new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME,
|
||||
CONF, null, true, null, null);
|
||||
CONF, null, true, null, null);
|
||||
try {
|
||||
Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
|
||||
ringBufferEventHandlerField.setAccessible(true);
|
||||
|
@ -107,14 +109,14 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
FSHLog.RingBufferEventHandler.class.getDeclaredField("syncRunnerIndex");
|
||||
syncRunnerIndexField.setAccessible(true);
|
||||
syncRunnerIndexField.set(ringBufferEventHandler, Integer.MAX_VALUE - 1);
|
||||
HTableDescriptor htd =
|
||||
new HTableDescriptor(TableName.valueOf(this.name.getMethodName())).addFamily(new HColumnDescriptor("row"));
|
||||
TableDescriptor htd =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of("row")).build();
|
||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||
for (byte[] fam : htd.getFamiliesKeys()) {
|
||||
for (byte[] fam : htd.getColumnFamilyNames()) {
|
||||
scopes.put(fam, 0);
|
||||
}
|
||||
HRegionInfo hri =
|
||||
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||
for (int i = 0; i < 10; i++) {
|
||||
addEdits(log, hri, htd, 1, mvcc, scopes);
|
||||
|
@ -127,7 +129,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
/**
|
||||
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
@Test
|
||||
public void testUnflushedSeqIdTracking() throws IOException, InterruptedException {
|
||||
final String name = this.name.getMethodName();
|
||||
final byte[] b = Bytes.toBytes("b");
|
||||
|
@ -156,10 +158,10 @@ public class TestFSHLog extends AbstractTestFSWAL {
|
|||
});
|
||||
|
||||
// open a new region which uses this WAL
|
||||
HTableDescriptor htd =
|
||||
new HTableDescriptor(TableName.valueOf(this.name.getMethodName())).addFamily(new HColumnDescriptor(b));
|
||||
HRegionInfo hri =
|
||||
new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
|
||||
TableDescriptor htd =
|
||||
TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
|
||||
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
|
||||
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||
final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
|
||||
ExecutorService exec = Executors.newFixedThreadPool(2);
|
||||
|
|
Loading…
Reference in New Issue