diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java deleted file mode 100644 index 2b1700c3d23..00000000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java +++ /dev/null @@ -1,631 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.*; - -import java.io.EOFException; -import java.io.IOException; -import java.io.SyncFailedException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; -import java.nio.channels.ClosedChannelException; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; -import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.ipc.RemoteException; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ SmallTests.class, ClientTests.class }) -public class TestFastFailWithoutTestUtil { - private static final Log LOG = LogFactory.getLog(TestFastFailWithoutTestUtil.class); - - @Test - public void testInterceptorFactoryMethods() { - Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( - conf); - - RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory - .build(); - assertTrue("We should be getting a PreemptiveFastFailInterceptor", - interceptorBeforeCast instanceof PreemptiveFastFailInterceptor); - PreemptiveFastFailInterceptor interceptor = (PreemptiveFastFailInterceptor) interceptorBeforeCast; - - RetryingCallerInterceptorContext contextBeforeCast = interceptor - .createEmptyContext(); - assertTrue( - "We should be getting a FastFailInterceptorContext since we are interacting with the" - + " PreemptiveFastFailInterceptor", - contextBeforeCast instanceof FastFailInterceptorContext); - - FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast; - assertTrue(context != null); - - conf = HBaseConfiguration.create(); - interceptorFactory = new RetryingCallerInterceptorFactory(conf); - - interceptorBeforeCast = interceptorFactory.build(); - assertTrue( - "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE", - interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor); - - contextBeforeCast = interceptorBeforeCast.createEmptyContext(); - assertTrue( - "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor", - contextBeforeCast instanceof NoOpRetryingInterceptorContext); - - assertTrue(context != null); - } - - @Test - public void testInterceptorContextClear() { - PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor(); - FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor - .createEmptyContext(); - context.clear(); - assertFalse(context.getCouldNotCommunicateWithServer().booleanValue()); - assertEquals(context.didTry(), false); - assertEquals(context.getFailureInfo(), null); - assertEquals(context.getServer(), null); - assertEquals(context.getTries(), 0); - } - - @Test - public void testInterceptorContextPrepare() throws IOException { - PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil - .createPreemptiveInterceptor(); - FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor - .createEmptyContext(); - RetryingCallable callable = new RegionServerCallable(null, - null, null) { - @Override - public Boolean call(int callTimeout) throws Exception { - return true; - } - - @Override - protected HRegionLocation getLocation() { - return new HRegionLocation(null, ServerName.valueOf("localhost", 1234, - 987654321)); - } - }; - context.prepare(callable); - ServerName server = getSomeServerName(); - assertEquals(context.getServer(), server); - context.clear(); - context.prepare(callable, 2); - assertEquals(context.getServer(), server); - } - - @Ignore @Test - public void testInterceptorIntercept50Times() throws IOException, - InterruptedException { - for (int i = 0; i < 50; i++) { - testInterceptorIntercept(); - } - } - - public void testInterceptorIntercept() throws IOException, - InterruptedException { - Configuration conf = HBaseConfiguration.create(); - long CLEANUP_TIMEOUT = 50; - long FAST_FAIL_THRESHOLD = 10; - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, - CLEANUP_TIMEOUT); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, - FAST_FAIL_THRESHOLD); - - PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil - .createPreemptiveInterceptor(conf); - FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor - .createEmptyContext(); - - RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); - - // Lets simulate some work flow here. - int tries = 0; - context.prepare(callable, tries); - interceptor.intercept(context); - interceptor.handleFailure(context, new ConnectException( - "Failed to connect to server")); - interceptor.updateFailureInfo(context); - assertTrue("Interceptor should have updated didTry to true", - context.didTry()); - assertTrue( - "The call shouldn't have been successful if there was a ConnectException", - context.getCouldNotCommunicateWithServer().booleanValue()); - assertNull( - "Once a failure is identified, the first time the FailureInfo is generated for the server," - + " but it is not assigned to the context yet. It would be assigned on the next" - + " intercept.", context.getFailureInfo()); - assertEquals(context.getTries(), tries); - assertFalse( - "We are still in the first attempt and so we dont set this variable to true yet.", - context.isRetryDespiteFastFailMode()); - - Thread.sleep(FAST_FAIL_THRESHOLD + 1); // We sleep so as to make sure that - // we - // actually consider this server as a - // dead server in the next attempt. - tries++; - - context.prepare(callable, tries); - interceptor.intercept(context); - interceptor.handleFailure(context, new ConnectException( - "Failed to connect to server")); - interceptor.updateFailureInfo(context); - assertTrue("didTru should remain true", context.didTry()); - assertTrue( - "The call shouldn't have been successful if there was a ConnectException", - context.getCouldNotCommunicateWithServer().booleanValue()); - assertNotNull( - "The context this time is updated with a failureInfo, since we already gave it a try.", - context.getFailureInfo()); - assertEquals(context.getTries(), tries); - assertTrue( - "Since we are alone here we would be given the permission to retryDespiteFailures.", - context.isRetryDespiteFastFailMode()); - context.clear(); - - Thread.sleep(CLEANUP_TIMEOUT); // Lets try and cleanup the data in the fast - // fail failure maps. - - tries++; - - context.clear(); - context.prepare(callable, tries); - interceptor.occasionallyCleanupFailureInformation(); - assertNull("The cleanup should have cleared the server", - interceptor.repeatedFailuresMap.get(context.getServer())); - interceptor.intercept(context); - interceptor.handleFailure(context, new ConnectException( - "Failed to connect to server")); - interceptor.updateFailureInfo(context); - assertTrue("didTru should remain true", context.didTry()); - assertTrue( - "The call shouldn't have been successful if there was a ConnectException", - context.getCouldNotCommunicateWithServer().booleanValue()); - assertNull("The failureInfo is cleared off from the maps.", - context.getFailureInfo()); - assertEquals(context.getTries(), tries); - assertFalse( - "Since we are alone here we would be given the permission to retryDespiteFailures.", - context.isRetryDespiteFastFailMode()); - context.clear(); - - } - - private RetryingCallable getDummyRetryingCallable( - ServerName someServerName) { - return new RegionServerCallable(null, null, null) { - @Override - public T call(int callTimeout) throws Exception { - return null; - } - - @Override - protected HRegionLocation getLocation() { - return new HRegionLocation(null, serverName); - } - }; - } - - @Test - public void testExceptionsIdentifiedByInterceptor() throws IOException { - Throwable[] networkexceptions = new Throwable[] { - new ConnectException("Mary is unwell"), - new SocketTimeoutException("Mike is too late"), - new ClosedChannelException(), - new SyncFailedException("Dave is not on the same page"), - new TimeoutException("Mike is late again"), - new EOFException("This is the end... "), - new ConnectionClosingException("Its closing") }; - final String INDUCED = "Induced"; - Throwable[] nonNetworkExceptions = new Throwable[] { - new IOException("Bob died"), - new RemoteException("Bob's cousin died", null), - new NoSuchMethodError(INDUCED), new NullPointerException(INDUCED), - new DoNotRetryIOException(INDUCED), new Error(INDUCED) }; - - Configuration conf = HBaseConfiguration.create(); - long CLEANUP_TIMEOUT = 0; - long FAST_FAIL_THRESHOLD = 1000000; - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, - CLEANUP_TIMEOUT); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, - FAST_FAIL_THRESHOLD); - for (Throwable e : networkexceptions) { - PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil - .createPreemptiveInterceptor(conf); - FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor - .createEmptyContext(); - - RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); - context.prepare(callable, 0); - interceptor.intercept(context); - interceptor.handleFailure(context, e); - interceptor.updateFailureInfo(context); - assertTrue( - "The call shouldn't have been successful if there was a ConnectException", - context.getCouldNotCommunicateWithServer().booleanValue()); - } - for (Throwable e : nonNetworkExceptions) { - try { - PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil - .createPreemptiveInterceptor(conf); - FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor - .createEmptyContext(); - - RetryingCallable callable = getDummyRetryingCallable(getSomeServerName()); - context.prepare(callable, 0); - interceptor.intercept(context); - interceptor.handleFailure(context, e); - interceptor.updateFailureInfo(context); - assertFalse( - "The call shouldn't have been successful if there was a ConnectException", - context.getCouldNotCommunicateWithServer().booleanValue()); - } catch (NoSuchMethodError t) { - assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); - } catch (NullPointerException t) { - assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); - } catch (DoNotRetryIOException t) { - assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); - } catch (Error t) { - assertTrue("Exception not induced", t.getMessage().contains(INDUCED)); - } - } - } - - protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor( - Configuration conf) { - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory( - conf); - RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory - .build(); - return (PreemptiveFastFailInterceptor) interceptorBeforeCast; - } - - static PreemptiveFastFailInterceptor createPreemptiveInterceptor() { - Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - return createPreemptiveInterceptor(conf); - } - - @Test(timeout = 120000) - public void testPreemptiveFastFailException50Times() - throws InterruptedException, ExecutionException { - for (int i = 0; i < 50; i++) { - testPreemptiveFastFailException(); - } - } - - /*** - * This test tries to create a thread interleaving of the 2 threads trying to do a - * Retrying operation using a {@link PreemptiveFastFailInterceptor}. The goal here is to make sure - * that the second thread will be attempting the operation while the first thread is in the - * process of making an attempt after it has marked the server in fast fail. - * - * The thread execution is as follows : - * The PreemptiveFastFailInterceptor is extended in this test to achieve a good interleaving - * behavior without using any thread sleeps. - * - * Privileged Thread 1 NonPrivileged Thread 2 - * - * Retry 0 : intercept - * - * Retry 0 : handleFailure - * latches[0].countdown - * latches2[0].await - * latches[0].await - * intercept : Retry 0 - * - * handleFailure : Retry 0 - * - * updateFailureinfo : Retry 0 - * latches2[0].countdown - * - * Retry 0 : updateFailureInfo - * - * Retry 1 : intercept - * - * Retry 1 : handleFailure - * latches[1].countdown - * latches2[1].await - * - * latches[1].await - * intercept : Retry 1 - * (throws PFFE) - * handleFailure : Retry 1 - * - * updateFailureinfo : Retry 1 - * latches2[1].countdown - * Retry 1 : updateFailureInfo - * - * - * See getInterceptor() for more details on the interceptor implementation to make sure this - * thread interleaving is achieved. - * - * We need 2 sets of latches of size MAX_RETRIES. We use an AtomicInteger done to make sure that - * we short circuit the Thread 1 after we hit the PFFE on Thread 2 - * - * - * @throws InterruptedException - * @throws ExecutionException - */ - private void testPreemptiveFastFailException() throws InterruptedException, - ExecutionException { - LOG.debug("Setting up the counters to start the test"); - priviRetryCounter.set(0); - nonPriviRetryCounter.set(0); - done.set(0); - - for (int i = 0; i <= RETRIES; i++) { - latches[i] = new CountDownLatch(1); - latches2[i] = new CountDownLatch(1); - } - - PreemptiveFastFailInterceptor interceptor = getInterceptor(); - - final RpcRetryingCaller priviCaller = getRpcRetryingCaller( - PAUSE_TIME, RETRIES, interceptor); - final RpcRetryingCaller nonPriviCaller = getRpcRetryingCaller( - PAUSE_TIME, RETRIES, interceptor); - - LOG.debug("Submitting the thread 1"); - Future priviFuture = executor.submit(new Callable() { - @Override - public Boolean call() throws Exception { - try { - isPriviThreadLocal.get().set(true); - priviCaller - .callWithRetries( - getRetryingCallable(serverName, exception), - CLEANUP_TIMEOUT); - } catch (RetriesExhaustedException e) { - return true; - } catch (PreemptiveFastFailException e) { - return false; - } - return false; - } - }); - LOG.debug("Submitting the thread 2"); - Future nonPriviFuture = executor.submit(new Callable() { - @Override - public Boolean call() throws Exception { - try { - isPriviThreadLocal.get().set(false); - nonPriviCaller.callWithRetries( - getRetryingCallable(serverName, exception), - CLEANUP_TIMEOUT); - } catch (PreemptiveFastFailException e) { - return true; - } - return false; - } - }); - LOG.debug("Waiting for Thread 2 to finish"); - try { - nonPriviFuture.get(30, TimeUnit.SECONDS); - assertTrue(nonPriviFuture.get()); - } catch (TimeoutException e) { - Threads.printThreadInfo(System.out, - "This should not hang but seems to sometimes...FIX! Here is a thread dump!"); - } - - LOG.debug("Waiting for Thread 1 to finish"); - try { - priviFuture.get(30, TimeUnit.SECONDS); - assertTrue(priviFuture.get()); - } catch (TimeoutException e) { - // There is something wrong w/ the latching but don't have time to fix. If timesout, just - // let it go for now till someone has time to look. Meantime, here is thread dump. - Threads.printThreadInfo(System.out, - "This should not hang but seems to sometimes...FIX! Here is a thread dump!"); - } - - // Now that the server in fast fail mode. Lets try to make contact with the - // server with a third thread. And make sure that when there is no - // exception, - // the fast fail gets cleared up. - assertTrue(interceptor.isServerInFailureMap(serverName)); - final RpcRetryingCaller priviCallerNew = getRpcRetryingCaller( - PAUSE_TIME, RETRIES, interceptor); - executor.submit(new Callable() { - @Override - public Boolean call() throws Exception { - priviCallerNew.callWithRetries( - getRetryingCallable(serverName, null), CLEANUP_TIMEOUT); - return false; - } - }).get(); - assertFalse("The server was supposed to be removed from the map", - interceptor.isServerInFailureMap(serverName)); - } - - ExecutorService executor = Executors.newCachedThreadPool(); - - /** - * Some timeouts to make the test execution resonable. - */ - final int PAUSE_TIME = 10; - final int RETRIES = 3; - final int CLEANUP_TIMEOUT = 10000; - final long FAST_FAIL_THRESHOLD = PAUSE_TIME / 1; - - /** - * The latches necessary to make the thread interleaving possible. - */ - final CountDownLatch[] latches = new CountDownLatch[RETRIES + 1]; - final CountDownLatch[] latches2 = new CountDownLatch[RETRIES + 1]; - final AtomicInteger done = new AtomicInteger(0); - - /** - * Global retry counters that give us an idea about which iteration of the retry we are in - */ - final AtomicInteger priviRetryCounter = new AtomicInteger(); - final AtomicInteger nonPriviRetryCounter = new AtomicInteger(); - final ServerName serverName = getSomeServerName(); - - /** - * The variable which is used as an identifier within the 2 threads. - */ - public final ThreadLocal isPriviThreadLocal = new ThreadLocal() { - @Override - public AtomicBoolean initialValue() { - return new AtomicBoolean(true); - } - }; - final Exception exception = new ConnectionClosingException("The current connection is closed"); - - public PreemptiveFastFailInterceptor getInterceptor() { - final Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS, - CLEANUP_TIMEOUT); - conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, - FAST_FAIL_THRESHOLD); - - return new PreemptiveFastFailInterceptor( - conf) { - @Override - public void updateFailureInfo(RetryingCallerInterceptorContext context) { - boolean pffe = false; - if (!isPriviThreadLocal.get().get()) { - pffe = !((FastFailInterceptorContext)context).isRetryDespiteFastFailMode(); - } - if (isPriviThreadLocal.get().get()) { - try { - // Thread 2 should be done by 2 iterations. We should short circuit Thread 1 because - // Thread 2 would be dead and can't do a countdown. - if (done.get() <= 1) { - latches2[priviRetryCounter.get()].await(); - } - } catch (InterruptedException e) { - fail(); - } - } - super.updateFailureInfo(context); - if (!isPriviThreadLocal.get().get()) { - if (pffe) done.incrementAndGet(); - latches2[nonPriviRetryCounter.get()].countDown(); - } - } - - @Override - public void intercept(RetryingCallerInterceptorContext context) - throws PreemptiveFastFailException { - if (!isPriviThreadLocal.get().get()) { - try { - latches[nonPriviRetryCounter.getAndIncrement()].await(); - } catch (InterruptedException e) { - fail(); - } - } - super.intercept(context); - } - - @Override - public void handleFailure(RetryingCallerInterceptorContext context, - Throwable t) throws IOException { - super.handleFailure(context, t); - if (isPriviThreadLocal.get().get()) { - latches[priviRetryCounter.getAndIncrement()].countDown(); - } - } - }; - } - - public RpcRetryingCaller getRpcRetryingCaller(int pauseTime, - int retries, RetryingCallerInterceptor interceptor) { - return new RpcRetryingCallerImpl(pauseTime, retries, interceptor, 9) { - @Override - public Void callWithRetries(RetryingCallable callable, - int callTimeout) throws IOException, RuntimeException { - Void ret = super.callWithRetries(callable, callTimeout); - return ret; - } - }; - } - - protected static ServerName getSomeServerName() { - return ServerName.valueOf("localhost", 1234, 987654321); - } - - private RegionServerCallable getRetryingCallable( - final ServerName serverName, final Exception e) { - return new RegionServerCallable(null, null, null) { - @Override - public void prepare(boolean reload) throws IOException { - this.location = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, - serverName); - } - - @Override - public Void call(int callTimeout) throws Exception { - if (e != null) - throw e; - return null; - } - - @Override - protected HRegionLocation getLocation() { - return new HRegionLocation(null, serverName); - } - - @Override - public void throwable(Throwable t, boolean retrying) { - // Do nothing - } - - @Override - public long sleep(long pause, int tries) { - return ConnectionUtils.getPauseTime(pause, tries + 1); - } - }; - } -}