Proper handling of the exceptions from auto persisting in AppenderatorImpl.add() (#5932)

This commit is contained in:
Jihoon Son 2018-07-04 23:42:41 -07:00 committed by Gian Merlino
parent 39371b0ff8
commit 4cd14e8158
2 changed files with 42 additions and 8 deletions

View File

@ -35,7 +35,7 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
} }
@Override @Override
public final T call() public final T call() throws Exception
{ {
final Thread currThread = Thread.currentThread(); final Thread currThread = Thread.currentThread();
String currName = currThread.getName(); String currName = currThread.getName();
@ -48,5 +48,5 @@ public abstract class ThreadRenamingCallable<T> implements Callable<T>
} }
} }
public abstract T doCall(); public abstract T doCall() throws Exception;
} }

View File

@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
@ -46,6 +47,7 @@ import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE; import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair; import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.concurrent.Execs;
@ -137,6 +139,8 @@ public class AppenderatorImpl implements Appenderator
// and abandon threads do not step over each other // and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock(); private final Lock commitLock = new ReentrantLock();
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile ListeningExecutorService persistExecutor = null; private volatile ListeningExecutorService persistExecutor = null;
private volatile ListeningExecutorService pushExecutor = null; private volatile ListeningExecutorService pushExecutor = null;
// use intermediate executor so that deadlock conditions can be prevented // use intermediate executor so that deadlock conditions can be prevented
@ -146,7 +150,8 @@ public class AppenderatorImpl implements Appenderator
private volatile long nextFlush; private volatile long nextFlush;
private volatile FileLock basePersistDirLock = null; private volatile FileLock basePersistDirLock = null;
private volatile FileChannel basePersistDirLockChannel = null; private volatile FileChannel basePersistDirLockChannel = null;
private AtomicBoolean closed = new AtomicBoolean(false);
private volatile Throwable persistError;
AppenderatorImpl( AppenderatorImpl(
DataSchema schema, DataSchema schema,
@ -204,6 +209,13 @@ public class AppenderatorImpl implements Appenderator
return retVal; return retVal;
} }
private void throwPersistErrorIfExists()
{
if (persistError != null) {
throw new RE(persistError, "Error while persisting");
}
}
@Override @Override
public AppenderatorAddResult add( public AppenderatorAddResult add(
final SegmentIdentifier identifier, final SegmentIdentifier identifier,
@ -212,6 +224,8 @@ public class AppenderatorImpl implements Appenderator
final boolean allowIncrementalPersists final boolean allowIncrementalPersists
) throws IndexSizeExceededException, SegmentNotWritableException ) throws IndexSizeExceededException, SegmentNotWritableException
{ {
throwPersistErrorIfExists();
if (!identifier.getDataSource().equals(schema.getDataSource())) { if (!identifier.getDataSource().equals(schema.getDataSource())) {
throw new IAE( throw new IAE(
"Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!", "Expected dataSource[%s] but was asked to insert row for dataSource[%s]?!",
@ -286,7 +300,23 @@ public class AppenderatorImpl implements Appenderator
if (allowIncrementalPersists) { if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it. // persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Persisting rows in memory due to: [%s]", String.join(",", persistReasons)); log.info("Persisting rows in memory due to: [%s]", String.join(",", persistReasons));
persistAll(committerSupplier == null ? null : committerSupplier.get()); Futures.addCallback(
persistAll(committerSupplier == null ? null : committerSupplier.get()),
new FutureCallback<Object>()
{
@Override
public void onSuccess(@Nullable Object result)
{
// do nothing
}
@Override
public void onFailure(Throwable t)
{
persistError = t;
}
}
);
} else { } else {
isPersistRequired = true; isPersistRequired = true;
} }
@ -401,6 +431,8 @@ public class AppenderatorImpl implements Appenderator
// Drop commit metadata, then abandon all segments. // Drop commit metadata, then abandon all segments.
try { try {
throwPersistErrorIfExists();
if (persistExecutor != null) { if (persistExecutor != null) {
final ListenableFuture<?> uncommitFuture = persistExecutor.submit( final ListenableFuture<?> uncommitFuture = persistExecutor.submit(
new Callable<Object>() new Callable<Object>()
@ -434,7 +466,7 @@ public class AppenderatorImpl implements Appenderator
} }
} }
catch (ExecutionException e) { catch (ExecutionException e) {
throw Throwables.propagate(e); throw new RuntimeException(e);
} }
} }
@ -452,6 +484,8 @@ public class AppenderatorImpl implements Appenderator
@Override @Override
public ListenableFuture<Object> persistAll(@Nullable final Committer committer) public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
{ {
throwPersistErrorIfExists();
final Map<String, Integer> currentHydrants = Maps.newHashMap(); final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList(); final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0; int numPersistedRows = 0;
@ -490,7 +524,7 @@ public class AppenderatorImpl implements Appenderator
new ThreadRenamingCallable<Object>(threadName) new ThreadRenamingCallable<Object>(threadName)
{ {
@Override @Override
public Object doCall() public Object doCall() throws IOException
{ {
try { try {
for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) { for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
@ -532,9 +566,9 @@ public class AppenderatorImpl implements Appenderator
// return null if committer is null // return null if committer is null
return commitMetadata; return commitMetadata;
} }
catch (Exception e) { catch (IOException e) {
metrics.incrementFailedPersists(); metrics.incrementFailedPersists();
throw Throwables.propagate(e); throw e;
} }
finally { finally {
metrics.incrementNumPersists(); metrics.incrementNumPersists();