HBASE-19972 Should rethrow the RetriesExhaustedWithDetailsException when failed to apply the batch in ReplicationSink

This commit is contained in:
huzheng 2018-02-11 14:48:21 +08:00
parent 0593dda663
commit 629eaf8b74
2 changed files with 40 additions and 1 deletions

View File

@ -404,9 +404,10 @@ public class ReplicationSink {
} catch (RetriesExhaustedWithDetailsException rewde) {
for (Throwable ex : rewde.getCauses()) {
if (ex instanceof TableNotFoundException) {
throw new TableNotFoundException("'"+tableName+"'");
throw new TableNotFoundException("'" + tableName + "'");
}
}
throw rewde;
} catch (InterruptedException ix) {
throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
} finally {

View File

@ -44,12 +44,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@ -271,6 +275,40 @@ public class TestReplicationSink {
assertEquals(0, res.size());
}
@Test
public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
TableName notExistTable = TableName.valueOf("notExistTable");
List<WALEntry> entries = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
for (int i = 0; i < 10; i++) {
entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
}
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw TableNotFoundException.");
} catch (TableNotFoundException e) {
}
entries.clear();
cells.clear();
for (int i = 0; i < 10; i++) {
entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
}
try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
try (Admin admin = conn.getAdmin()) {
admin.disableTable(TABLE_NAME1);
try {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
} catch (RetriesExhaustedWithDetailsException e) {
} finally {
admin.enableTable(TABLE_NAME1);
}
}
}
}
/**
* Test replicateEntries with a bulk load entry for 25 HFiles
*/