mirror of https://github.com/apache/druid.git
Adjust BufferAggregator.get() impls to return copies (#7464)
* Adjust BufferAggregator.get() impls to return copies * Update BufferAggregator docs, more agg fixes * Update BufferAggregator get() doc
This commit is contained in:
parent
1d9450da81
commit
7d9cb6944b
|
@ -108,7 +108,7 @@ public class HllSketchBuildBufferAggregator implements BufferAggregator
|
|||
final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
|
||||
lock.lock();
|
||||
try {
|
||||
return sketchCache.get(buf).get(position);
|
||||
return sketchCache.get(buf).get(position).copy();
|
||||
}
|
||||
finally {
|
||||
lock.unlock();
|
||||
|
|
|
@ -69,7 +69,7 @@ public class DoublesSketchBuildBufferAggregator implements BufferAggregator
|
|||
@Override
|
||||
public synchronized Object get(final ByteBuffer buffer, final int position)
|
||||
{
|
||||
return sketches.get(buffer).get(position);
|
||||
return sketches.get(buffer).get(position).compact();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -66,7 +66,11 @@ public abstract class BaseBloomFilterBufferAggregator<TSelector extends BaseNull
|
|||
// | k (byte) | numLongs (int) | bitset (long[numLongs]) |
|
||||
int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES);
|
||||
mutationBuffer.limit(position + sizeBytes);
|
||||
return mutationBuffer.slice();
|
||||
|
||||
ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes);
|
||||
resultCopy.put(mutationBuffer.slice());
|
||||
resultCopy.rewind();
|
||||
return resultCopy;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -72,7 +72,16 @@ public interface BufferAggregator extends HotLoopCallee
|
|||
*
|
||||
* Converts the given byte buffer representation into an intermediate aggregate Object
|
||||
*
|
||||
* <b>Implementations must not change the position, limit or mark of the given buffer</b>
|
||||
* <b>Implementations must not change the position, limit or mark of the given buffer.</b>
|
||||
*
|
||||
* <b>
|
||||
* The object returned must not have any references to the given buffer (i.e., make a copy), since the
|
||||
* underlying buffer is a shared resource and may be given to another processing thread
|
||||
* while the objects returned by this aggregator are still in use.
|
||||
* </b>
|
||||
*
|
||||
* <b>If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator
|
||||
* expects its inputs to be mutable, then the object returned by this method must be mutable.</b>
|
||||
*
|
||||
* @param buf byte buffer storing the byte array representation of the aggregate
|
||||
* @param position offset within the byte buffer at which the aggregate value is stored
|
||||
|
|
Loading…
Reference in New Issue