diff --git a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java index 2fd6877a965..811fe59ce01 100644 --- a/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java +++ b/common/src/main/java/io/druid/common/guava/ThreadRenamingCallable.java @@ -35,7 +35,7 @@ public abstract class ThreadRenamingCallable implements Callable } @Override - public final T call() + public final T call() throws Exception { final Thread currThread = Thread.currentThread(); String currName = currThread.getName(); @@ -48,5 +48,5 @@ public abstract class ThreadRenamingCallable implements Callable } } - public abstract T doCall(); + public abstract T doCall() throws Exception; } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index a9eb40611d0..e769d0efa04 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -33,6 +33,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -46,6 +47,7 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; @@ -137,6 +139,8 @@ public class AppenderatorImpl implements Appenderator // and abandon threads do not step over each other private final Lock commitLock = new ReentrantLock(); + private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile ListeningExecutorService persistExecutor = null; private volatile ListeningExecutorService pushExecutor = null; // use intermediate executor so that deadlock conditions can be prevented @@ -146,7 +150,8 @@ public class AppenderatorImpl implements Appenderator private volatile long nextFlush; private volatile FileLock basePersistDirLock = null; private volatile FileChannel basePersistDirLockChannel = null; - private AtomicBoolean closed = new AtomicBoolean(false); + + private volatile Throwable persistError; AppenderatorImpl( DataSchema schema, @@ -204,6 +209,13 @@ public class AppenderatorImpl implements Appenderator return retVal; } + private void throwPersistErrorIfExists() + { + if (persistError != null) { + throw new RE(persistError, "Error while persisting"); + } + } + @Override public AppenderatorAddResult add( final SegmentIdentifier identifier, @@ -212,6 +224,8 @@ public class AppenderatorImpl implements Appenderator final boolean allowIncrementalPersists ) throws IndexSizeExceededException, SegmentNotWritableException { + throwPersistErrorIfExists(); + if (!identifier.getDataSource().equals(schema.getDataSource())) { throw new IAE( "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", @@ -286,7 +300,23 @@ public class AppenderatorImpl implements Appenderator if (allowIncrementalPersists) { // persistAll clears rowsCurrentlyInMemory, no need to update it. log.info("Persisting rows in memory due to: [%s]", String.join(",", persistReasons)); - persistAll(committerSupplier == null ? null : committerSupplier.get()); + Futures.addCallback( + persistAll(committerSupplier == null ? null : committerSupplier.get()), + new FutureCallback() + { + @Override + public void onSuccess(@Nullable Object result) + { + // do nothing + } + + @Override + public void onFailure(Throwable t) + { + persistError = t; + } + } + ); } else { isPersistRequired = true; } @@ -401,6 +431,8 @@ public class AppenderatorImpl implements Appenderator // Drop commit metadata, then abandon all segments. try { + throwPersistErrorIfExists(); + if (persistExecutor != null) { final ListenableFuture uncommitFuture = persistExecutor.submit( new Callable() @@ -434,7 +466,7 @@ public class AppenderatorImpl implements Appenderator } } catch (ExecutionException e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } @@ -452,6 +484,8 @@ public class AppenderatorImpl implements Appenderator @Override public ListenableFuture persistAll(@Nullable final Committer committer) { + throwPersistErrorIfExists(); + final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); int numPersistedRows = 0; @@ -490,7 +524,7 @@ public class AppenderatorImpl implements Appenderator new ThreadRenamingCallable(threadName) { @Override - public Object doCall() + public Object doCall() throws IOException { try { for (Pair pair : indexesToPersist) { @@ -532,9 +566,9 @@ public class AppenderatorImpl implements Appenderator // return null if committer is null return commitMetadata; } - catch (Exception e) { + catch (IOException e) { metrics.incrementFailedPersists(); - throw Throwables.propagate(e); + throw e; } finally { metrics.incrementNumPersists();