mirror of https://github.com/apache/druid.git
Fix sporadic fail of URIExtractionNamespaceFunctionFactoryTest#testReverseFunction
This commit is contained in:
parent
628643d80e
commit
31b205afcd
|
@ -336,150 +336,72 @@ public abstract class NamespaceExtractionCacheManager
|
|||
{
|
||||
final String namespaceName = namespace.getNamespace();
|
||||
log.debug("Trying to update namespace [%s]", namespaceName);
|
||||
final AtomicReference<NamespaceImplData> implDatum = new AtomicReference<>(implData.get(namespaceName));
|
||||
if (implDatum.get() != null) {
|
||||
synchronized (implDatum.get().enabled) {
|
||||
if (implDatum.get().enabled.get()) {
|
||||
final NamespaceImplData implDatum = implData.get(namespaceName);
|
||||
if (implDatum != null) {
|
||||
synchronized (implDatum.enabled) {
|
||||
if (implDatum.enabled.get()) {
|
||||
// We also check at the end of the function, but fail fast here
|
||||
throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
final long updateMs = namespace.getPollMs();
|
||||
final CountDownLatch startLatch = new CountDownLatch(1);
|
||||
|
||||
final Runnable command = new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await(); // wait for "election" to leadership or cancellation
|
||||
if (!Thread.currentThread().isInterrupted()) {
|
||||
final Map<String, String> cache = getCacheMap(cacheId);
|
||||
final String preVersion = lastVersion.get(namespaceName);
|
||||
final Callable<String> runnable = factory.getCachePopulator(namespace, preVersion, cache);
|
||||
|
||||
tasksStarted.incrementAndGet();
|
||||
final String newVersion = runnable.call();
|
||||
if (preVersion != null && preVersion.equals(newVersion)) {
|
||||
throw new IllegalStateException("Already exists");
|
||||
}
|
||||
if (newVersion != null) {
|
||||
lastVersion.put(namespaceName, newVersion);
|
||||
}
|
||||
postRunnable.run();
|
||||
log.debug("Namespace [%s] successfully updated", namespaceName);
|
||||
}
|
||||
}
|
||||
catch (Throwable t) {
|
||||
delete(cacheId);
|
||||
if (t instanceof CancellationException) {
|
||||
log.debug(t, "Namespace [%s] cancelled", namespaceName);
|
||||
} else {
|
||||
log.error(t, "Failed update namespace [%s]", namespace);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ListenableFuture<?> future;
|
||||
try {
|
||||
ListenableFuture<?> future = null;
|
||||
try {
|
||||
if (namespace.getPollMs() > 0) {
|
||||
final long updateMs = namespace.getPollMs();
|
||||
future = listeningScheduledExecutorService.scheduleAtFixedRate(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try {
|
||||
startLatch.await(); // wait for "election" to leadership or cancellation
|
||||
if (!Thread.interrupted()) {
|
||||
final Map<String, String> cache = getCacheMap(cacheId);
|
||||
final String preVersion = lastVersion.get(namespaceName);
|
||||
final Callable<String> runnable = factory.getCachePopulator(namespace, preVersion, cache);
|
||||
tasksStarted.incrementAndGet();
|
||||
final String newVersion = runnable.call();
|
||||
if (newVersion != null) {
|
||||
lastVersion.put(namespaceName, newVersion);
|
||||
}
|
||||
if (preVersion == null || !preVersion.equals(lastVersion.get(namespaceName))) {
|
||||
postRunnable.run();
|
||||
} else {
|
||||
delete(cacheId);
|
||||
}
|
||||
} else {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (e instanceof CancellationException) {
|
||||
log.debug("Thread for namespace[%s] cancelled", namespaceName);
|
||||
} else {
|
||||
log.error(e, "Error in listener for namespace [%s]", namespaceName);
|
||||
}
|
||||
// Don't leave stale cache on error
|
||||
delete(cacheId);
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
},
|
||||
0,
|
||||
updateMs,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
} else {
|
||||
final Map<String, String> cache = getCacheMap(cacheId);
|
||||
final Callable<String> runnable = factory.getCachePopulator(namespace, null, cache);
|
||||
final ListenableFuture<String> futureWithString = listeningScheduledExecutorService.schedule(
|
||||
new Callable<String>()
|
||||
{
|
||||
@Override
|
||||
public String call() throws Exception
|
||||
{
|
||||
startLatch.await(); // wait for "election" to leadership or cancellation
|
||||
tasksStarted.incrementAndGet();
|
||||
return runnable.call();
|
||||
}
|
||||
},
|
||||
0,
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
Futures.addCallback(
|
||||
futureWithString, new FutureCallback<String>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(String result)
|
||||
{
|
||||
try {
|
||||
postRunnable.run();
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
delete(cacheId);
|
||||
throw e;
|
||||
}
|
||||
// Must have been set in order to make it here
|
||||
if (implDatum.get().enabled.get()) {
|
||||
lastVersion.put(namespaceName, result);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
// NOOP
|
||||
}
|
||||
}
|
||||
);
|
||||
future = futureWithString;
|
||||
}
|
||||
if (updateMs > 0) {
|
||||
future = listeningScheduledExecutorService.scheduleAtFixedRate(command, 0, updateMs, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
future = listeningScheduledExecutorService.schedule(command, 0, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
if (future != null) {
|
||||
if (!future.isDone() && !future.cancel(true)) {
|
||||
log.warn("Could not cancel future for [%s]", namespaceName);
|
||||
}
|
||||
}
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
Futures.addCallback(
|
||||
future, new FutureCallback<Object>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(@Nullable Object result)
|
||||
{
|
||||
log.debug("Namespace [%s] successfully updated", namespaceName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
delete(cacheId);
|
||||
if (t instanceof CancellationException) {
|
||||
log.debug(t, "Namespace [%s] cancelled", namespaceName);
|
||||
} else {
|
||||
log.error(t, "Failed update namespace [%s]", namespace);
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
final NamespaceImplData me = new NamespaceImplData(future, namespace, namespaceName);
|
||||
final NamespaceImplData other = implData.putIfAbsent(namespaceName, me);
|
||||
if (other != null) {
|
||||
if (!future.isDone() && !future.cancel(true)) {
|
||||
log.warn("Unable to cancle future for namespace[%s] on race loss", namespaceName);
|
||||
log.warn("Unable to cancel future for namespace[%s] on race loss", namespaceName);
|
||||
}
|
||||
throw new IAE("Namespace [%s] already exists! Leaving prior running", namespace);
|
||||
} else {
|
||||
if (!me.enabled.compareAndSet(false, true)) {
|
||||
log.wtf("How did someone enable this before ME?");
|
||||
}
|
||||
implDatum.set(me);
|
||||
log.debug("I own namespace [%s]", namespaceName);
|
||||
return future;
|
||||
}
|
||||
|
|
|
@ -308,10 +308,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
|
|||
Assert.assertNull(fnCache.get(namespace.getNamespace()));
|
||||
NamespaceExtractionCacheManagersTest.waitFor(manager.schedule(namespace));
|
||||
Function<String, String> fn = fnCache.get(namespace.getNamespace());
|
||||
while (fn == null) {
|
||||
Thread.sleep(1);
|
||||
fn = fnCache.get(namespace.getNamespace());
|
||||
}
|
||||
Assert.assertNotNull(fn);
|
||||
Assert.assertEquals("bar", fn.apply("foo"));
|
||||
Assert.assertEquals(null, fn.apply("baz"));
|
||||
}
|
||||
|
@ -355,10 +352,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
|
|||
for (int i = 0; i < size; ++i) {
|
||||
URIExtractionNamespace namespace = namespaces.get(i);
|
||||
Function<String, String> fn = fnCache.get(namespace.getNamespace());
|
||||
while (fn == null) {
|
||||
Thread.sleep(1);
|
||||
fn = fnCache.get(namespace.getNamespace());
|
||||
}
|
||||
Assert.assertNotNull(fn);
|
||||
Assert.assertEquals("bar", fn.apply("foo"));
|
||||
Assert.assertEquals(null, fn.apply("baz"));
|
||||
manager.delete(namespace.getNamespace());
|
||||
|
|
|
@ -67,6 +67,7 @@ import java.util.concurrent.ExecutionException;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -46,6 +46,8 @@ import java.util.List;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -145,31 +147,16 @@ public class NamespaceExtractionCacheManagersTest
|
|||
Assert.assertArrayEquals(nsList.toArray(), retvalList.toArray());
|
||||
}
|
||||
|
||||
public static void waitFor(ListenableFuture<?> future) throws InterruptedException
|
||||
public static void waitFor(Future<?> future) throws InterruptedException
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Futures.addCallback(
|
||||
future, new FutureCallback<Object>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(Object result)
|
||||
{
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
try {
|
||||
log.error(t, "Error waiting");
|
||||
throw Throwables.propagate(t);
|
||||
}
|
||||
finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
latch.await();
|
||||
while (!future.isDone()) {
|
||||
try {
|
||||
future.get();
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
log.error(e.getCause(), "Error waiting");
|
||||
throw Throwables.propagate(e.getCause());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue