HBASE-21978 Should close AsyncRegistry if we fail to get cluster id when creating AsyncConnection
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
0d882bbc2b
commit
00e6208de0
|
@ -277,7 +277,6 @@ public class ConnectionFactory {
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
* @param user the user the asynchronous connection is for
|
* @param user the user the asynchronous connection is for
|
||||||
* @return AsyncConnection object wrapped by CompletableFuture
|
* @return AsyncConnection object wrapped by CompletableFuture
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
|
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
|
||||||
final User user) {
|
final User user) {
|
||||||
|
@ -285,10 +284,12 @@ public class ConnectionFactory {
|
||||||
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
|
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
|
||||||
addListener(registry.getClusterId(), (clusterId, error) -> {
|
addListener(registry.getClusterId(), (clusterId, error) -> {
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
|
registry.close();
|
||||||
future.completeExceptionally(error);
|
future.completeExceptionally(error);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (clusterId == null) {
|
if (clusterId == null) {
|
||||||
|
registry.close();
|
||||||
future.completeExceptionally(new IOException("clusterid came back null"));
|
future.completeExceptionally(new IOException("clusterid came back null"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -299,6 +300,7 @@ public class ConnectionFactory {
|
||||||
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
|
user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
|
||||||
.newInstance(clazz, conf, registry, clusterId, user)));
|
.newInstance(clazz, conf, registry, clusterId, user)));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
registry.close();
|
||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.FutureUtils;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ ClientTests.class, SmallTests.class })
|
||||||
|
public class TestAsyncRegistryLeak {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestAsyncRegistryLeak.class);
|
||||||
|
|
||||||
|
public static final class AsyncRegistryForTest extends DoNothingAsyncRegistry {
|
||||||
|
|
||||||
|
private boolean closed = false;
|
||||||
|
|
||||||
|
public AsyncRegistryForTest(Configuration conf) {
|
||||||
|
super(conf);
|
||||||
|
CREATED.add(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<String> getClusterId() {
|
||||||
|
return FutureUtils.failedFuture(new IOException("inject error"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final List<AsyncRegistryForTest> CREATED = new ArrayList<>();
|
||||||
|
|
||||||
|
private static Configuration CONF = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() {
|
||||||
|
CONF.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class,
|
||||||
|
AsyncRegistry.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws InterruptedException {
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
try {
|
||||||
|
ConnectionFactory.createAsyncConnection(CONF).get();
|
||||||
|
fail();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(10, CREATED.size());
|
||||||
|
CREATED.forEach(r -> assertTrue(r.closed));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue