Reference counting, better error handling for resources in groupBy v2. (#3268)

Refcounting prevents releasing the merge buffer, or closing the concurrent
grouper, before the processing threads have all finished. The better
error handling prevents an avalanche of per-runner exceptions when grouping
resources are exhausted, by grouping those all up into a single merged
exception.
This commit is contained in:
Gian Merlino 2016-07-26 13:29:02 -07:00 committed by Nishant
parent 188a4bc89a
commit 9b5523add3
9 changed files with 366 additions and 115 deletions

View File

@ -21,14 +21,13 @@ package io.druid.collections;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Pool that pre-generates objects up to a limit, then permits possibly-blocking "take" operations.
@ -60,60 +59,22 @@ public class BlockingPool<T>
*
* @throws InterruptedException if interrupted while waiting for a resource to become available
*/
public ResourceHolder<T> take(final long timeout) throws InterruptedException
public ReferenceCountingResourceHolder<T> take(final long timeout) throws InterruptedException
{
Preconditions.checkState(objects != null, "Pool was initialized with limit = 0, there are no objects to take.");
final T theObject = timeout >= 0 ? objects.poll(timeout, TimeUnit.MILLISECONDS) : objects.take();
return theObject == null ? null : new ObjectResourceHolder(theObject);
}
/**
* Similar to StupidPool.ObjectResourceHolder, except this one has no objectsCacheMaxCount, and it returns objects
* to the pool on finalize.
*/
private class ObjectResourceHolder implements ResourceHolder<T>
{
private AtomicBoolean closed = new AtomicBoolean(false);
private final T object;
public ObjectResourceHolder(final T object)
{
this.object = object;
}
// WARNING: it is entirely possible for a caller to hold onto the object and call "close", then still use that
// object even though it will be offered to someone else in BlockingPool.take
@Override
public T get()
{
if (closed.get()) {
throw new ISE("Already Closed!");
}
return object;
}
@Override
public void close()
{
if (!closed.compareAndSet(false, true)) {
log.warn(new ISE("Already Closed!"), "Already closed");
return;
}
if (!objects.offer(object)) {
throw new ISE("WTF?! Queue offer failed");
}
}
@Override
protected void finalize() throws Throwable
{
if (closed.compareAndSet(false, true)) {
log.warn("Not closed! Object was[%s]. Returning to pool.", object);
if (!objects.offer(object)) {
log.error("WTF?! Queue offer failed during finalize, uh oh...");
return theObject == null ? null : new ReferenceCountingResourceHolder<>(
theObject,
new Closeable()
{
@Override
public void close() throws IOException
{
if (!objects.offer(theObject)) {
log.error("WTF?! Queue offer failed, uh oh...");
}
}
}
}
}
);
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
public class ReferenceCountingResourceHolder<T> implements ResourceHolder<T>
{
private static final Logger log = new Logger(ReferenceCountingResourceHolder.class);
private final Object lock = new Object();
private final T object;
private final Closeable closer;
private int refcount = 1;
private boolean didClose = false;
public ReferenceCountingResourceHolder(final T object, final Closeable closer)
{
this.object = object;
this.closer = closer;
}
@Override
public T get()
{
synchronized (lock) {
if (refcount <= 0) {
throw new ISE("Already closed!");
}
return object;
}
}
public Releaser increment()
{
synchronized (lock) {
if (refcount <= 0) {
throw new ISE("Already closed!");
}
refcount++;
return new Releaser()
{
final AtomicBoolean didRelease = new AtomicBoolean();
@Override
public void close()
{
if (didRelease.compareAndSet(false, true)) {
decrement();
} else {
log.warn("WTF?! release called but we are already released!");
}
}
@Override
protected void finalize() throws Throwable
{
if (didRelease.compareAndSet(false, true)) {
log.warn("Not released! Object was[%s], releasing on finalize of releaser.", object);
decrement();
}
}
};
}
}
public int getReferenceCount()
{
synchronized (lock) {
return refcount;
}
}
@Override
public void close()
{
synchronized (lock) {
if (!didClose) {
didClose = true;
decrement();
} else {
log.warn(new ISE("Already closed!"), "Already closed");
}
}
}
@Override
protected void finalize() throws Throwable
{
synchronized (lock) {
if (!didClose) {
log.warn("Not closed! Object was[%s], closing on finalize of holder.", object);
didClose = true;
decrement();
}
}
}
private void decrement()
{
synchronized (lock) {
refcount--;
if (refcount <= 0) {
try {
closer.close();
}
catch (Exception e) {
log.error(e, "WTF?! Close failed, uh oh...");
}
}
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.collections;
import java.io.Closeable;
/**
* Releaser is like Closeable, but doesn't throw IOExceptions.
*/
public interface Releaser extends Closeable
{
@Override
void close();
}

View File

@ -20,24 +20,29 @@
package io.druid.query.groupby.epinephelinae;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
public class CloseableGrouperIterator<KeyType extends Comparable<KeyType>, T> implements Iterator<T>, Closeable
{
private final Grouper<KeyType> grouper;
private final Function<Grouper.Entry<KeyType>, T> transformer;
private final Closeable closer;
private final Iterator<Grouper.Entry<KeyType>> iterator;
public CloseableGrouperIterator(
final Grouper<KeyType> grouper,
final boolean sorted,
final Function<Grouper.Entry<KeyType>, T> transformer
final Function<Grouper.Entry<KeyType>, T> transformer,
final Closeable closer
)
{
this.grouper = grouper;
this.transformer = transformer;
this.closer = closer;
this.iterator = grouper.iterator(sorted);
}
@ -62,6 +67,13 @@ public class CloseableGrouperIterator<KeyType extends Comparable<KeyType>, T> im
@Override
public void close()
{
grouper.close();
if (closer != null) {
try {
closer.close();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
}

View File

@ -45,7 +45,8 @@ import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import io.druid.collections.BlockingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.ReferenceCountingResourceHolder;
import io.druid.collections.Releaser;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.query.AbstractPrioritizedCallable;
@ -163,9 +164,10 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
final Number timeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = timeout == null ? -1L : startTime + timeout.longValue();
final ResourceHolder<ByteBuffer> mergeBufferHolder;
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// This will potentially block if there are no merge buffers left in the pool.
mergeBufferHolder = mergeBufferPool.take(timeout != null && timeout.longValue() > 0 ? timeout.longValue() : -1);
}
catch (InterruptedException e) {
@ -204,6 +206,11 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
final Row row
)
{
if (theGrouper == null) {
// Pass-through null returns without doing more work.
return null;
}
final long timestamp = row.getTimestampFromEpoch();
final String[] dimensions = new String[query.getDimensions().size()];
@ -215,7 +222,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
columnSelectorFactory.setRow(row);
final boolean didAggregate = theGrouper.aggregate(new GroupByMergingKey(timestamp, dimensions));
if (!didAggregate) {
throw new ISE("Grouping resources exhausted");
// null return means grouping resources were exhausted.
return null;
}
columnSelectorFactory.setRow(null);
@ -225,53 +233,88 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
final int priority = BaseQuery.getContextPriority(query, 0);
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Void>>()
{
@Override
public ListenableFuture<Void> apply(final QueryRunner<Row> input)
{
if (input == null) {
throw new ISE(
"Null queryRunner! Looks to be some segment unmapping action happening"
);
}
return exec.submit(
new AbstractPrioritizedCallable<Void>(priority)
{
@Override
public Void call() throws Exception
{
try {
input.run(queryForRunners, responseContext)
.accumulate(grouper, accumulator);
return null;
}
catch (QueryInterruptedException e) {
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
}
}
)
)
final ReferenceCountingResourceHolder<Grouper<GroupByMergingKey>> grouperHolder = new ReferenceCountingResourceHolder<>(
grouper,
new Closeable()
{
@Override
public void close() throws IOException
{
grouper.close();
}
}
);
try {
ListenableFuture<List<Boolean>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<Boolean>>()
{
@Override
public ListenableFuture<Boolean> apply(final QueryRunner<Row> input)
{
if (input == null) {
throw new ISE(
"Null queryRunner! Looks to be some segment unmapping action happening"
);
}
final Releaser bufferReleaser = mergeBufferHolder.increment();
try {
final Releaser grouperReleaser = grouperHolder.increment();
try {
return exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
public Boolean call() throws Exception
{
try {
final Object retVal = input.run(queryForRunners, responseContext)
.accumulate(grouper, accumulator);
// Return true if OK, false if resources were exhausted.
return retVal == grouper;
}
catch (QueryInterruptedException e) {
throw e;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
finally {
grouperReleaser.close();
bufferReleaser.close();
}
}
}
);
}
catch (Exception e) {
// Exception caught while submitting the task; release resources.
grouperReleaser.close();
throw e;
}
}
catch (Exception e) {
// Exception caught while submitting the task; release resources.
bufferReleaser.close();
throw e;
}
}
}
)
)
);
waitForFutureCompletion(query, futures, timeoutAt - processingStartTime);
}
catch (Exception e) {
grouper.close();
// Exception caught while creating or waiting for futures; release resources.
grouperHolder.close();
throw e;
}
@ -303,6 +346,14 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
theMap
);
}
},
new Closeable()
{
@Override
public void close() throws IOException
{
grouperHolder.close();
}
}
);
}
@ -335,18 +386,26 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner
private void waitForFutureCompletion(
GroupByQuery query,
ListenableFuture<?> future,
ListenableFuture<List<Boolean>> future,
long timeout
)
{
try {
final List<Boolean> results;
if (queryWatcher != null) {
queryWatcher.registerQuery(query, future);
}
if (timeout <= 0) {
future.get();
results = future.get();
} else {
future.get(timeout, TimeUnit.MILLISECONDS);
results = future.get(timeout, TimeUnit.MILLISECONDS);
}
for (Boolean result : results) {
if (!result) {
future.cancel(true);
throw new ISE("Grouping resources exhausted");
}
}
}
catch (InterruptedException e) {

View File

@ -314,6 +314,14 @@ outer:
return new MapBasedRow(timestamp, theMap);
}
},
new Closeable()
{
@Override
public void close() throws IOException
{
grouper.close();
}
}
);

View File

@ -59,10 +59,19 @@ public class LimitedTemporaryStorage implements Closeable
this.maxBytesUsed = maxBytesUsed;
}
/**
* Create a new temporary file. All methods of the returned output stream may throw
* {@link TemporaryStorageFullException} if the temporary storage area fills up.
*
* @return output stream to the file
*
* @throws TemporaryStorageFullException if the temporary storage area is full
* @throws IOException if something goes wrong while creating the file
*/
public LimitedOutputStream createFile() throws IOException
{
if (bytesUsed.get() >= maxBytesUsed) {
throwFullError();
throw new TemporaryStorageFullException(maxBytesUsed);
}
synchronized (files) {
@ -158,14 +167,9 @@ public class LimitedTemporaryStorage implements Closeable
private void grab(int n) throws IOException
{
if (bytesUsed.addAndGet(n) > maxBytesUsed) {
throwFullError();
throw new TemporaryStorageFullException(maxBytesUsed);
}
}
}
private void throwFullError() throws IOException
{
throw new IOException(String.format("Cannot write to disk, hit limit of %,d bytes.", maxBytesUsed));
}
}

View File

@ -93,7 +93,15 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
return true;
} else {
// Warning: this can potentially block up a processing thread for a while.
spill();
try {
spill();
}
catch (TemporaryStorageFullException e) {
return false;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
return grouper.aggregate(key, keyHash);
}
}
@ -154,14 +162,16 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
return Groupers.mergeIterators(iterators, sorted);
}
private void spill()
private void spill() throws IOException
{
final File outFile;
try (
final LimitedTemporaryStorage.LimitedOutputStream out = temporaryStorage.createFile();
final LZ4BlockOutputStream compressedOut = new LZ4BlockOutputStream(out);
final JsonGenerator jsonGenerator = spillMapper.getFactory().createGenerator(compressedOut)
) {
files.add(out.getFile());
outFile = out.getFile();
final Iterator<Entry<KeyType>> it = grouper.iterator(true);
while (it.hasNext()) {
if (Thread.interrupted()) {
@ -171,10 +181,8 @@ public class SpillingGrouper<KeyType extends Comparable<KeyType>> implements Gro
jsonGenerator.writeObject(it.next());
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}
files.add(outFile);
grouper.reset();
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.groupby.epinephelinae;
import java.io.IOException;
public class TemporaryStorageFullException extends IOException
{
public TemporaryStorageFullException(final long maxBytesUsed)
{
super(String.format("Cannot write to disk, hit limit of %,d bytes.", maxBytesUsed));
}
}