HBASE-21828 Make sure we do not return CompletionException when locating region
This commit is contained in:
parent
946bc19242
commit
89c02dc0cc
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.RegionLocations;
|
|||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -66,12 +67,13 @@ class AsyncRegionLocator {
|
|||||||
}
|
}
|
||||||
future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
|
future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
|
||||||
}, timeoutNs, TimeUnit.NANOSECONDS);
|
}, timeoutNs, TimeUnit.NANOSECONDS);
|
||||||
return future.whenComplete((loc, error) -> {
|
FutureUtils.addListener(future, (loc, error) -> {
|
||||||
if (error != null && error.getClass() != TimeoutIOException.class) {
|
if (error != null && error.getClass() != TimeoutIOException.class) {
|
||||||
// cancel timeout task if we are not completed by it.
|
// cancel timeout task if we are not completed by it.
|
||||||
timeoutTask.cancel();
|
timeoutTask.cancel();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isMeta(TableName tableName) {
|
private boolean isMeta(TableName tableName) {
|
||||||
|
@ -28,14 +28,17 @@ import static org.junit.Assert.fail;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
@ -53,11 +56,11 @@ import org.junit.Test;
|
|||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ MediumTests.class, ClientTests.class })
|
@Category({ MediumTests.class, ClientTests.class })
|
||||||
public class TestAsyncRegionLocatorTimeout {
|
public class TestAsyncRegionLocator {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestAsyncRegionLocatorTimeout.class);
|
HBaseClassTestRule.forClass(TestAsyncRegionLocator.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
@ -96,7 +99,7 @@ public class TestAsyncRegionLocatorTimeout {
|
|||||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||||
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
|
||||||
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
|
CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
|
||||||
registry.getClusterId().get(), User.getCurrent());
|
registry.getClusterId().get(), User.getCurrent());
|
||||||
LOCATOR = CONN.getLocator();
|
LOCATOR = CONN.getLocator();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,7 +110,7 @@ public class TestAsyncRegionLocatorTimeout {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws InterruptedException, ExecutionException {
|
public void testTimeout() throws InterruptedException, ExecutionException {
|
||||||
SLEEP_MS = 1000;
|
SLEEP_MS = 1000;
|
||||||
long startNs = System.nanoTime();
|
long startNs = System.nanoTime();
|
||||||
try {
|
try {
|
||||||
@ -115,7 +118,6 @@ public class TestAsyncRegionLocatorTimeout {
|
|||||||
TimeUnit.MILLISECONDS.toNanos(500)).get();
|
TimeUnit.MILLISECONDS.toNanos(500)).get();
|
||||||
fail();
|
fail();
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
e.printStackTrace();
|
|
||||||
assertThat(e.getCause(), instanceOf(TimeoutIOException.class));
|
assertThat(e.getCause(), instanceOf(TimeoutIOException.class));
|
||||||
}
|
}
|
||||||
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
|
long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
|
||||||
@ -129,4 +131,22 @@ public class TestAsyncRegionLocatorTimeout {
|
|||||||
assertEquals(loc.getServerName(),
|
assertEquals(loc.getServerName(),
|
||||||
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
|
TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoCompletionException() {
|
||||||
|
// make sure that we do not get CompletionException
|
||||||
|
SLEEP_MS = 0;
|
||||||
|
AtomicReference<Throwable> errorHolder = new AtomicReference<>();
|
||||||
|
try {
|
||||||
|
LOCATOR.getRegionLocation(TableName.valueOf("NotExist"), EMPTY_START_ROW,
|
||||||
|
RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1))
|
||||||
|
.whenComplete((r, e) -> errorHolder.set(e)).join();
|
||||||
|
fail();
|
||||||
|
} catch (CompletionException e) {
|
||||||
|
// join will return a CompletionException, which is OK
|
||||||
|
assertThat(e.getCause(), instanceOf(TableNotFoundException.class));
|
||||||
|
}
|
||||||
|
// but we need to make sure that we do not get a CompletionException in the callback
|
||||||
|
assertThat(errorHolder.get(), instanceOf(TableNotFoundException.class));
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user