mirror of https://github.com/apache/druid.git
parent
92d7316ed8
commit
21864a9407
|
@ -69,6 +69,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -87,6 +88,7 @@ public class BatchServerInventoryViewTest
|
|||
private Set<DataSegment> testSegments;
|
||||
private BatchServerInventoryView batchServerInventoryView;
|
||||
private BatchServerInventoryView filteredBatchServerInventoryView;
|
||||
private final AtomicInteger inventoryUpdateCounter = new AtomicInteger();
|
||||
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
@ -163,7 +165,7 @@ public class BatchServerInventoryViewTest
|
|||
);
|
||||
|
||||
batchServerInventoryView.start();
|
||||
|
||||
inventoryUpdateCounter.set(0);
|
||||
filteredBatchServerInventoryView = new BatchServerInventoryView(
|
||||
new ZkPathsConfig()
|
||||
{
|
||||
|
@ -183,8 +185,17 @@ public class BatchServerInventoryViewTest
|
|||
return dataSegment.getInterval().getStart().isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
){
|
||||
@Override
|
||||
protected DruidServer addInnerInventory(
|
||||
DruidServer container, String inventoryKey, Set<DataSegment> inventory
|
||||
)
|
||||
{
|
||||
DruidServer server = super.addInnerInventory(container, inventoryKey, inventory);
|
||||
inventoryUpdateCounter.incrementAndGet();
|
||||
return server;
|
||||
}
|
||||
};
|
||||
filteredBatchServerInventoryView.start();
|
||||
}
|
||||
|
||||
|
@ -244,14 +255,17 @@ public class BatchServerInventoryViewTest
|
|||
Set<DataSegment> segments = Sets.newHashSet(server.getSegments().values());
|
||||
|
||||
Assert.assertEquals(testSegments, segments);
|
||||
|
||||
int prevUpdateCount = inventoryUpdateCounter.get();
|
||||
// segment outside the range of default filter
|
||||
DataSegment segment1 = makeSegment(101);
|
||||
segmentAnnouncer.announceSegment(segment1);
|
||||
testSegments.add(segment1);
|
||||
|
||||
exception.expect(ISE.class);
|
||||
waitForSync(filteredBatchServerInventoryView, testSegments);
|
||||
waitForUpdateEvents(prevUpdateCount + 1);
|
||||
Assert.assertNull(
|
||||
Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory())
|
||||
.getSegment(segment1.getIdentifier())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -367,13 +381,26 @@ public class BatchServerInventoryViewTest
|
|||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
while (Iterables.isEmpty(batchServerInventoryView.getInventory())
|
||||
|| Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() != testSegments.size()) {
|
||||
Thread.sleep(500);
|
||||
Thread.sleep(100);
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
|
||||
throw new ISE("BatchServerInventoryView is not updating");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForUpdateEvents(int count)
|
||||
throws Exception
|
||||
{
|
||||
final Timing forWaitingTiming = timing.forWaiting();
|
||||
Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
while (inventoryUpdateCounter.get() != count) {
|
||||
Thread.sleep(100);
|
||||
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
|
||||
throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", count, inventoryUpdateCounter.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSameTimeZnode() throws Exception
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue