mirror of https://github.com/apache/druid.git
Add a readOnly() method for PartitionedOutputChannel (#13755)
With SuperSorter using the PartitionedOutputChannels for sorting, it might OOM on inputs of reasonable size because the channel consists of both the writable frame channel and the frame allocator, both of which are not required once the output channel has been written to. This change adds a readOnly to the output channel which contains only the readable channel, due to which unnecessary memory references to the writable channel and the memory allocator are lost once the output channel has been written to, preventing the OOM.
This commit is contained in:
parent
bf39b4d313
commit
5b0b3a9b2c
|
@ -22,6 +22,9 @@ package org.apache.druid.frame.channel;
|
|||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.druid.frame.processor.OutputChannel;
|
||||
import org.apache.druid.frame.processor.PartitionedOutputChannel;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.query.ResourceLimitExceededException;
|
||||
|
||||
|
@ -38,16 +41,29 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public class ComposingWritableFrameChannel implements WritableFrameChannel
|
||||
{
|
||||
private final List<Supplier<WritableFrameChannel>> channels;
|
||||
@Nullable
|
||||
private final List<Supplier<OutputChannel>> outputChannelSuppliers;
|
||||
|
||||
@Nullable
|
||||
private final List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers;
|
||||
|
||||
private final List<Supplier<WritableFrameChannel>> writableChannelSuppliers;
|
||||
private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
|
||||
private int currentIndex;
|
||||
|
||||
public ComposingWritableFrameChannel(
|
||||
List<Supplier<WritableFrameChannel>> channels,
|
||||
@Nullable List<Supplier<OutputChannel>> outputChannelSuppliers,
|
||||
@Nullable List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers,
|
||||
List<Supplier<WritableFrameChannel>> writableChannelSuppliers,
|
||||
Map<Integer, HashSet<Integer>> partitionToChannelMap
|
||||
)
|
||||
{
|
||||
this.channels = Preconditions.checkNotNull(channels, "channels is null");
|
||||
if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) {
|
||||
throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided");
|
||||
}
|
||||
this.outputChannelSuppliers = outputChannelSuppliers;
|
||||
this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers;
|
||||
this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null");
|
||||
this.partitionToChannelMap =
|
||||
Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
|
||||
this.currentIndex = 0;
|
||||
|
@ -56,12 +72,12 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
|
|||
@Override
|
||||
public void write(FrameWithPartition frameWithPartition) throws IOException
|
||||
{
|
||||
if (currentIndex >= channels.size()) {
|
||||
throw new ISE("No more channels available to write. Total available channels : " + channels.size());
|
||||
if (currentIndex >= writableChannelSuppliers.size()) {
|
||||
throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size());
|
||||
}
|
||||
|
||||
try {
|
||||
channels.get(currentIndex).get().write(frameWithPartition);
|
||||
writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition);
|
||||
partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1))
|
||||
.add(currentIndex);
|
||||
}
|
||||
|
@ -70,9 +86,19 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
|
|||
// exception is automatically passed up to the user incase all the channels are exhausted. If in future, more
|
||||
// cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write
|
||||
// operation.
|
||||
channels.get(currentIndex).get().close();
|
||||
writableChannelSuppliers.get(currentIndex).get().close();
|
||||
|
||||
// We are converting the corresponding channel to read only after exhausting it because that channel won't be used
|
||||
// for writes anymore
|
||||
if (outputChannelSuppliers != null) {
|
||||
outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
|
||||
}
|
||||
if (partitionedOutputChannelSuppliers != null) {
|
||||
partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
|
||||
}
|
||||
|
||||
currentIndex++;
|
||||
if (currentIndex >= channels.size()) {
|
||||
if (currentIndex >= writableChannelSuppliers.size()) {
|
||||
throw rlee;
|
||||
}
|
||||
write(frameWithPartition);
|
||||
|
@ -82,7 +108,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
|
|||
@Override
|
||||
public void fail(@Nullable Throwable cause) throws IOException
|
||||
{
|
||||
for (Supplier<WritableFrameChannel> channel : channels) {
|
||||
for (Supplier<WritableFrameChannel> channel : writableChannelSuppliers) {
|
||||
channel.get().fail(cause);
|
||||
}
|
||||
}
|
||||
|
@ -90,21 +116,21 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
|
|||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if (currentIndex < channels.size()) {
|
||||
channels.get(currentIndex).get().close();
|
||||
currentIndex = channels.size();
|
||||
if (currentIndex < writableChannelSuppliers.size()) {
|
||||
writableChannelSuppliers.get(currentIndex).get().close();
|
||||
currentIndex = writableChannelSuppliers.size();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return currentIndex == channels.size();
|
||||
return currentIndex == writableChannelSuppliers.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<?> writabilityFuture()
|
||||
{
|
||||
return channels.get(currentIndex).get().writabilityFuture();
|
||||
return writableChannelSuppliers.get(currentIndex).get().writabilityFuture();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
|
|||
{
|
||||
ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelSuppliersBuilder = ImmutableList.builder();
|
||||
ImmutableList.Builder<Supplier<ReadableFrameChannel>> readableFrameChannelSuppliersBuilder = ImmutableList.builder();
|
||||
ImmutableList.Builder<Supplier<OutputChannel>> outputChannelSupplierBuilder = ImmutableList.builder();
|
||||
for (OutputChannelFactory channelFactory : channelFactories) {
|
||||
// open channel lazily
|
||||
Supplier<OutputChannel> channel =
|
||||
|
@ -71,14 +72,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
|
|||
throw new UncheckedIOException(e);
|
||||
}
|
||||
})::get;
|
||||
outputChannelSupplierBuilder.add(channel);
|
||||
writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel());
|
||||
readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
|
||||
// We read the output channel once they have been written to, and therefore it is space efficient and safe to
|
||||
// save their read only copies
|
||||
readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
|
||||
}
|
||||
|
||||
// the map maintains a mapping of channels which have the data for a given partition.
|
||||
// it is useful to identify the readable channels to open in the composition while reading the partition data.
|
||||
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
|
||||
ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
|
||||
outputChannelSupplierBuilder.build(),
|
||||
null,
|
||||
writableFrameChannelSuppliersBuilder.build(),
|
||||
partitionToChannelMap
|
||||
);
|
||||
|
@ -103,6 +109,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
|
|||
ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelsBuilder = ImmutableList.builder();
|
||||
ImmutableList.Builder<Supplier<PartitionedReadableFrameChannel>> readableFrameChannelSuppliersBuilder =
|
||||
ImmutableList.builder();
|
||||
ImmutableList.Builder<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSupplierBuilder = ImmutableList.builder();
|
||||
for (OutputChannelFactory channelFactory : channelFactories) {
|
||||
Supplier<PartitionedOutputChannel> channel =
|
||||
Suppliers.memoize(() -> {
|
||||
|
@ -113,14 +120,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
|
|||
throw new UncheckedIOException(e);
|
||||
}
|
||||
})::get;
|
||||
partitionedOutputChannelSupplierBuilder.add(channel);
|
||||
writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel());
|
||||
readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
|
||||
// We read the output channel once they have been written to, and therefore it is space efficient and safe to
|
||||
// save their read only copies
|
||||
readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
|
||||
}
|
||||
// the map maintains a mapping of channels which have the data for a given partition.
|
||||
// it is useful to identify the readable channels to open in the composition while reading the partition data.
|
||||
|
||||
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
|
||||
ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
|
||||
null,
|
||||
partitionedOutputChannelSupplierBuilder.build(),
|
||||
writableFrameChannelsBuilder.build(),
|
||||
partitionToChannelMap
|
||||
);
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.druid.frame.allocation.MemoryAllocator;
|
||||
import org.apache.druid.frame.channel.FrameWithPartition;
|
||||
import org.apache.druid.frame.channel.ReadableFrameChannel;
|
||||
|
@ -42,11 +43,16 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public class OutputChannel
|
||||
{
|
||||
@GuardedBy("this")
|
||||
@Nullable
|
||||
private final WritableFrameChannel writableChannel;
|
||||
private WritableFrameChannel writableChannel;
|
||||
|
||||
@GuardedBy("this")
|
||||
@Nullable
|
||||
private final MemoryAllocator frameMemoryAllocator;
|
||||
private MemoryAllocator frameMemoryAllocator;
|
||||
|
||||
private final Supplier<ReadableFrameChannel> readableChannelSupplier;
|
||||
|
||||
private final boolean readableChannelUsableWhileWriting;
|
||||
private final int partitionNumber;
|
||||
|
||||
|
@ -157,12 +163,14 @@ public class OutputChannel
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the writable channel of this pair. The producer writes to this channel.
|
||||
* Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
|
||||
* read only.
|
||||
*/
|
||||
public WritableFrameChannel getWritableChannel()
|
||||
public synchronized WritableFrameChannel getWritableChannel()
|
||||
{
|
||||
if (writableChannel == null) {
|
||||
throw new ISE("Writable channel is not available");
|
||||
throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
|
||||
+ " hence no writes are allowed.");
|
||||
} else {
|
||||
return writableChannel;
|
||||
}
|
||||
|
@ -170,11 +178,13 @@ public class OutputChannel
|
|||
|
||||
/**
|
||||
* Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
|
||||
* Throws ISE if the output channel is read only.
|
||||
*/
|
||||
public MemoryAllocator getFrameMemoryAllocator()
|
||||
public synchronized MemoryAllocator getFrameMemoryAllocator()
|
||||
{
|
||||
if (frameMemoryAllocator == null) {
|
||||
throw new ISE("Writable channel is not available");
|
||||
throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
|
||||
+ " hence memory allocator is not required.");
|
||||
} else {
|
||||
return frameMemoryAllocator;
|
||||
}
|
||||
|
@ -197,7 +207,7 @@ public class OutputChannel
|
|||
/**
|
||||
* Whether {@link #getReadableChannel()} is ready to use.
|
||||
*/
|
||||
public boolean isReadableChannelReady()
|
||||
public synchronized boolean isReadableChannelReady()
|
||||
{
|
||||
return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed();
|
||||
}
|
||||
|
@ -212,7 +222,7 @@ public class OutputChannel
|
|||
return partitionNumber;
|
||||
}
|
||||
|
||||
public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
|
||||
public synchronized OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
|
||||
{
|
||||
if (writableChannel == null) {
|
||||
return this;
|
||||
|
@ -235,4 +245,14 @@ public class OutputChannel
|
|||
{
|
||||
return OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making
|
||||
* it more efficient
|
||||
*/
|
||||
public synchronized void convertToReadOnly()
|
||||
{
|
||||
this.writableChannel = null;
|
||||
this.frameMemoryAllocator = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
|
|||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.errorprone.annotations.concurrent.GuardedBy;
|
||||
import org.apache.druid.frame.allocation.MemoryAllocator;
|
||||
import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
|
||||
import org.apache.druid.frame.channel.WritableFrameChannel;
|
||||
|
@ -37,10 +38,15 @@ import java.util.function.Supplier;
|
|||
*/
|
||||
public class PartitionedOutputChannel
|
||||
{
|
||||
|
||||
@GuardedBy("this")
|
||||
@Nullable
|
||||
private final WritableFrameChannel writableChannel;
|
||||
private WritableFrameChannel writableChannel;
|
||||
|
||||
@GuardedBy("this")
|
||||
@Nullable
|
||||
private final MemoryAllocator frameMemoryAllocator;
|
||||
private MemoryAllocator frameMemoryAllocator;
|
||||
|
||||
private final Supplier<PartitionedReadableFrameChannel> readableChannelSupplier;
|
||||
|
||||
private PartitionedOutputChannel(
|
||||
|
@ -76,12 +82,14 @@ public class PartitionedOutputChannel
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns the writable channel of this pair. The producer writes to this channel.
|
||||
* Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
|
||||
* read only.
|
||||
*/
|
||||
public WritableFrameChannel getWritableChannel()
|
||||
public synchronized WritableFrameChannel getWritableChannel()
|
||||
{
|
||||
if (writableChannel == null) {
|
||||
throw new ISE("Writable channel is not available");
|
||||
throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
|
||||
+ " hence no writes are allowed.");
|
||||
} else {
|
||||
return writableChannel;
|
||||
}
|
||||
|
@ -89,11 +97,13 @@ public class PartitionedOutputChannel
|
|||
|
||||
/**
|
||||
* Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
|
||||
* Throws ISE if the output channel is read only.
|
||||
*/
|
||||
public MemoryAllocator getFrameMemoryAllocator()
|
||||
public synchronized MemoryAllocator getFrameMemoryAllocator()
|
||||
{
|
||||
if (frameMemoryAllocator == null) {
|
||||
throw new ISE("Writable channel is not available");
|
||||
throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
|
||||
+ " hence memory allocator is not required.");
|
||||
} else {
|
||||
return frameMemoryAllocator;
|
||||
}
|
||||
|
@ -102,12 +112,12 @@ public class PartitionedOutputChannel
|
|||
/**
|
||||
* Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel.
|
||||
*/
|
||||
public Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
|
||||
public synchronized Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
|
||||
{
|
||||
return readableChannelSupplier;
|
||||
}
|
||||
|
||||
public PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
|
||||
public synchronized PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
|
||||
{
|
||||
if (writableChannel == null) {
|
||||
return this;
|
||||
|
@ -119,4 +129,24 @@ public class PartitionedOutputChannel
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor
|
||||
* {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory.
|
||||
*/
|
||||
public PartitionedOutputChannel readOnly()
|
||||
{
|
||||
return new PartitionedOutputChannel(null, null, readableChannelSupplier);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the reference to the {@link #writableChannel} and {@link #frameMemoryAllocator} from the object, making
|
||||
* it more efficient
|
||||
*/
|
||||
public synchronized void convertToReadOnly()
|
||||
{
|
||||
this.writableChannel = null;
|
||||
this.frameMemoryAllocator = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -611,7 +611,11 @@ public class SuperSorter
|
|||
);
|
||||
writableChannel = partitionedOutputChannel.getWritableChannel();
|
||||
frameAllocatorFactory = new SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator());
|
||||
levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel);
|
||||
|
||||
// We add the readOnly() channel even though we require the writableChannel and the frame allocator because
|
||||
// the original partitionedOutputChannel would contain the reference to those, which would get cleaned up
|
||||
// appropriately and not be held up in the class level map
|
||||
levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel.readOnly());
|
||||
}
|
||||
|
||||
final FrameChannelMerger worker =
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.frame.channel;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.apache.druid.frame.Frame;
|
||||
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
|
||||
import org.apache.druid.frame.processor.OutputChannel;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.query.ResourceLimitExceededException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
|
||||
public class ComposingWritableFrameChannelTest
|
||||
{
|
||||
@Test
|
||||
public void testComposingWritableChannelSwitchesProperly() throws IOException
|
||||
{
|
||||
|
||||
// This frame channel writes a single frame
|
||||
WritableFrameChannel writableFrameChannel1 = new LimitedWritableFrameChannel(2);
|
||||
WritableFrameChannel writableFrameChannel2 = new LimitedWritableFrameChannel(100);
|
||||
|
||||
Supplier<ReadableFrameChannel> readableFrameChannelSupplier1 = () -> null;
|
||||
Supplier<ReadableFrameChannel> readableFrameChannelSupplier2 = () -> null;
|
||||
|
||||
OutputChannel outputChannel1 = OutputChannel.pair(
|
||||
writableFrameChannel1,
|
||||
ArenaMemoryAllocator.createOnHeap(1),
|
||||
readableFrameChannelSupplier1,
|
||||
1
|
||||
);
|
||||
OutputChannel outputChannel2 = OutputChannel.pair(
|
||||
writableFrameChannel2,
|
||||
ArenaMemoryAllocator.createOnHeap(1),
|
||||
readableFrameChannelSupplier2,
|
||||
2
|
||||
);
|
||||
|
||||
Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
|
||||
|
||||
ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel(
|
||||
ImmutableList.of(
|
||||
() -> outputChannel1,
|
||||
() -> outputChannel2
|
||||
),
|
||||
null,
|
||||
ImmutableList.of(
|
||||
() -> writableFrameChannel1,
|
||||
() -> writableFrameChannel2
|
||||
),
|
||||
partitionToChannelMap
|
||||
);
|
||||
|
||||
composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 1));
|
||||
composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2));
|
||||
composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3));
|
||||
|
||||
// Assert the location of the channels where the frames have been written to
|
||||
Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1));
|
||||
Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
|
||||
Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
|
||||
|
||||
// Test if the older channel has been converted to read only
|
||||
Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
|
||||
}
|
||||
|
||||
static class LimitedWritableFrameChannel implements WritableFrameChannel
|
||||
{
|
||||
private final int maxFrames;
|
||||
private int curFrame = 0;
|
||||
|
||||
public LimitedWritableFrameChannel(int maxFrames)
|
||||
{
|
||||
this.maxFrames = maxFrames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(FrameWithPartition frameWithPartition)
|
||||
{
|
||||
if (curFrame >= maxFrames) {
|
||||
throw new ResourceLimitExceededException("Cannot write more frames to the channel");
|
||||
}
|
||||
++curFrame;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Frame frame)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(@Nullable Throwable cause)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<?> writabilityFuture()
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -44,14 +44,16 @@ public class OutputChannelTest
|
|||
final IllegalStateException e1 = Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel);
|
||||
MatcherAssert.assertThat(
|
||||
e1,
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
|
||||
"Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed."))
|
||||
);
|
||||
|
||||
// No writable channel: cannot call getFrameMemoryAllocator.
|
||||
final IllegalStateException e2 = Assert.assertThrows(IllegalStateException.class, channel::getFrameMemoryAllocator);
|
||||
MatcherAssert.assertThat(
|
||||
e2,
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
|
||||
"Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
|
||||
);
|
||||
|
||||
// Mapping the writable channel of a nil channel has no effect, because there is no writable channel.
|
||||
|
|
|
@ -86,7 +86,8 @@ public class OutputChannelsTest
|
|||
|
||||
MatcherAssert.assertThat(
|
||||
e,
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
|
||||
"Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed."))
|
||||
);
|
||||
|
||||
final IllegalStateException e2 = Assert.assertThrows(
|
||||
|
@ -96,7 +97,8 @@ public class OutputChannelsTest
|
|||
|
||||
MatcherAssert.assertThat(
|
||||
e2,
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
|
||||
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
|
||||
"Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue