diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 75991883b0e..6a2e2e51b16 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -147,7 +147,7 @@ Issuing a GET request at the same URL will return the current worker config spec |Property|Description|Default| |--------|-----------|-------| -|`selectStrategy`|How to assign tasks to middlemanagers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, and `equalDistribution`.|fillCapacity| +|`selectStrategy`|How to assign tasks to middlemanagers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, `equalDistribution` and `javascript`.|fillCapacity| |`autoScaler`|Only used if autoscaling is enabled. See below.|null| To view the audit history of worker config issue a GET request to the URL - @@ -187,6 +187,33 @@ The workers with the least amount of tasks is assigned the task. |--------|-----------|-------| |`type`|`equalDistribution`.|fillCapacity| +##### Javascript + +Allows defining arbitrary logic for selecting workers to run task using a JavaScript function. +The function is passed remoteTaskRunnerConfig, map of workerId to available workers and task to be executed and returns the workerId on which the task should be run or null if the task cannot be run. +It can be used for rapid development of missing features where the worker selection logic is to be changed or tuned often. +If the selection logic is quite complex and cannot be easily tested in javascript environment, +its better to write a druid extension module with extending current worker selection strategies written in java. + + +|Property|Description|Default| +|--------|-----------|-------| +|`type`|`javascript`.|javascript| +|`function`|String representing javascript function|| + +Example: a function that sends batch_index_task to workers 10.0.0.1 and 10.0.0.2 and all other tasks to other available workers. + +``` +{ +"type":"javascript", +"function":"function (config, zkWorkers, task) {\nvar batch_workers = new java.util.ArrayList();\nbatch_workers.add(\"10.0.0.1\");\nbatch_workers.add(\"10.0.0.2\");\nworkers = zkWorkers.keySet().toArray();\nvar sortedWorkers = new Array()\n;for(var i = 0; i < workers.length; i++){\n sortedWorkers[i] = workers[i];\n}\nArray.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\nvar minWorkerVer = config.getMinWorkerVersion();\nfor (var i = 0; i < sortedWorkers.length; i++) {\n var worker = sortedWorkers[i];\n var zkWorker = zkWorkers.get(worker);\n if(zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)){\n if(task.getType() == 'index_hadoop' && batch_workers.contains(worker)){\n return worker;\n } else {\n if(task.getType() != 'index_hadoop' && !batch_workers.contains(worker)){\n return worker;\n }\n }\n }\n}\nreturn null;\n}" +} + + +``` + + + #### Autoscaler diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java new file mode 100644 index 00000000000..ff6841d2acd --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategy.java @@ -0,0 +1,110 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.client.repackaged.com.google.common.base.Preconditions; +import com.google.api.client.repackaged.com.google.common.base.Throwables; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import java.util.Map; +import javax.script.Compilable; +import javax.script.Invocable; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; + +public class JavaScriptWorkerSelectStrategy implements WorkerSelectStrategy +{ + public static interface SelectorFunction + { + public String apply(RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task); + } + + private final SelectorFunction fnSelector; + private final String function; + + @JsonCreator + public JavaScriptWorkerSelectStrategy(@JsonProperty("function") String fn) + { + Preconditions.checkNotNull(fn, "function must not be null"); + final ScriptEngine engine = new ScriptEngineManager().getEngineByName("javascript"); + try { + ((Compilable) engine).compile("var apply = " + fn).eval(); + } + catch (ScriptException e) { + throw Throwables.propagate(e); + } + this.function = fn; + this.fnSelector = ((Invocable) engine).getInterface(SelectorFunction.class); + } + + @Override + public Optional findWorkerForTask( + RemoteTaskRunnerConfig config, ImmutableMap zkWorkers, Task task + ) + { + String worker = fnSelector.apply(config, zkWorkers, task); + return Optional.fromNullable(worker == null ? null : zkWorkers.get(worker)); + } + + @JsonProperty + public String getFunction() + { + return function; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + JavaScriptWorkerSelectStrategy that = (JavaScriptWorkerSelectStrategy) o; + + if (function != null ? !function.equals(that.function) : that.function != null) { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return function.hashCode(); + } + + @Override + public String toString() + { + return "JavaScriptWorkerSelectStrategy{" + + "function='" + function + '\'' + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java index 93bf0482620..5a2359b7ddf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/setup/WorkerSelectStrategy.java @@ -32,7 +32,8 @@ import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig; @JsonSubTypes(value = { @JsonSubTypes.Type(name = "fillCapacity", value = FillCapacityWorkerSelectStrategy.class), @JsonSubTypes.Type(name = "fillCapacityWithAffinity", value = FillCapacityWithAffinityWorkerSelectStrategy.class), - @JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class) + @JsonSubTypes.Type(name = "equalDistribution", value = EqualDistributionWorkerSelectStrategy.class), + @JsonSubTypes.Type(name = "javascript", value = JavaScriptWorkerSelectStrategy.class) }) public interface WorkerSelectStrategy { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java new file mode 100644 index 00000000000..1c145a5e910 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.overlord.setup; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.ImmutableZkWorker; +import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Test; + +public class JavaScriptWorkerSelectStrategyTest +{ + + final JavaScriptWorkerSelectStrategy strategy = new JavaScriptWorkerSelectStrategy( + "function (config, zkWorkers, task) {\n" + + "var batch_workers = new java.util.ArrayList();\n" + + "batch_workers.add(\"10.0.0.1\");\n" + + "batch_workers.add(\"10.0.0.2\");\n" + + "workers = zkWorkers.keySet().toArray();\n" + + "var sortedWorkers = new Array()\n;" + + "for(var i = 0; i < workers.length; i++){\n" + + " sortedWorkers[i] = workers[i];\n" + + "}\n" + + "Array.prototype.sort.call(sortedWorkers,function(a, b){return zkWorkers.get(b).getCurrCapacityUsed() - zkWorkers.get(a).getCurrCapacityUsed();});\n" + + "var minWorkerVer = config.getMinWorkerVersion();\n" + + "for (var i = 0; i < sortedWorkers.length; i++) {\n" + + " var worker = sortedWorkers[i];\n" + + " var zkWorker = zkWorkers.get(worker);\n" + + " if(zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)){\n" + + " if(task.getType() == 'index_hadoop' && batch_workers.contains(worker)){\n" + + " return worker;\n" + + " } else {\n" + + " if(task.getType() != 'index_hadoop' && !batch_workers.contains(worker)){\n" + + " return worker;\n" + + " }\n" + + " }\n" + + " }\n" + + "}\n" + + "return null;\n" + + "}" + ); + + @Test + public void testSerde() throws Exception + { + ObjectMapper mapper = new DefaultObjectMapper(); + Assert.assertEquals( + strategy, + mapper.readValue( + mapper.writeValueAsString(strategy), + JavaScriptWorkerSelectStrategy.class + ) + ); + } + + @Test + public void testFindWorkerForTask() + { + ImmutableZkWorker worker1 = createMockWorker(1, true, true); + ImmutableZkWorker worker2 = createMockWorker(1, true, true); + ImmutableMap workerMap = ImmutableMap.of( + "10.0.0.1", worker1, + "10.0.0.3", worker2 + ); + + ImmutableZkWorker workerForBatchTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("index_hadoop") + ).get(); + // batch tasks should be sent to worker1 + Assert.assertEquals(worker1, workerForBatchTask); + + ImmutableZkWorker workerForOtherTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("other_type") + ).get(); + // all other tasks should be sent to worker2 + Assert.assertEquals(worker2, workerForOtherTask); + } + + @Test + public void testIsolationOfBatchWorker() + { + ImmutableMap workerMap = ImmutableMap.of( + "10.0.0.1", createMockWorker(1, true, true), + "10.0.0.2", createMockWorker(1, true, true) + ); + Optional workerForOtherTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("other_type") + ); + Assert.assertFalse(workerForOtherTask.isPresent()); + } + + @Test + public void testNoValidWorker() + { + ImmutableMap workerMap = ImmutableMap.of( + "10.0.0.1", createMockWorker(1, true, false), + "10.0.0.4", createMockWorker(1, true, false) + ); + Optional workerForBatchTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("index_hadoop") + ); + Assert.assertFalse(workerForBatchTask.isPresent()); + + Optional workerForOtherTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("otherTask") + ); + // all other tasks should be sent to worker2 + Assert.assertFalse(workerForOtherTask.isPresent()); + } + + @Test + public void testNoWorkerCanRunTask() + { + ImmutableMap workerMap = ImmutableMap.of( + "10.0.0.1", createMockWorker(1, false, true), + "10.0.0.4", createMockWorker(1, false, true) + ); + Optional workerForBatchTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("index_hadoop") + ); + Assert.assertFalse(workerForBatchTask.isPresent()); + + Optional workerForOtherTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("otherTask") + ); + // all other tasks should be sent to worker2 + Assert.assertFalse(workerForOtherTask.isPresent()); + } + + @Test + public void testFillWorkerCapacity() + { + // tasks shoudl be assigned to the worker with maximum currCapacity used until its full + ImmutableMap workerMap = ImmutableMap.of( + "10.0.0.1", createMockWorker(1, true, true), + "10.0.0.2", createMockWorker(5, true, true) + ); + Optional workerForBatchTask = strategy.findWorkerForTask( + new TestRemoteTaskRunnerConfig(new Period("PT1S")), + workerMap, + createMockTask("index_hadoop") + ); + Assert.assertEquals(workerMap.get("10.0.0.2"), workerForBatchTask.get()); + + } + + private Task createMockTask(String type) + { + Task mock = EasyMock.createMock(Task.class); + EasyMock.expect(mock.getType()).andReturn(type).anyTimes(); + EasyMock.replay(mock); + return mock; + } + + private ImmutableZkWorker createMockWorker(int currCapacityUsed, boolean canRunTask, boolean isValidVersion) + { + ImmutableZkWorker worker = EasyMock.createMock(ImmutableZkWorker.class); + EasyMock.expect(worker.canRunTask(EasyMock.anyObject(Task.class))).andReturn(canRunTask).anyTimes(); + EasyMock.expect(worker.getCurrCapacityUsed()).andReturn(currCapacityUsed).anyTimes(); + EasyMock.expect(worker.isValidVersion(EasyMock.anyString())).andReturn(isValidVersion).anyTimes(); + EasyMock.replay(worker); + return worker; + } + +} diff --git a/processing/src/test/resources/druid.sample.json.bottom b/processing/src/test/resources/druid.sample.json.bottom index 75ea3502c6e..ac0ee8d73c6 100644 --- a/processing/src/test/resources/druid.sample.json.bottom +++ b/processing/src/test/resources/druid.sample.json.bottom @@ -44,6 +44,7 @@ { "timestamp":"2011-01-12T00:00:00.000Z", "market":["", "spot"], - "index":100.000000 + "index":100.000000, + "quality":"" } ] \ No newline at end of file diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index bc856b220e4..4a7122d1d8f 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -28,6 +28,16 @@ import java.util.Set; */ public interface IndexerMetadataStorageCoordinator { + /** + * Get all segments which may include any data in the interval and are flagged as used. + * + * @param dataSource The datasource to query + * @param interval The interval for which all applicable and used datasources are requested. Start is inclusive, end is exclusive + * + * @return The DataSegments which include data in the requested interval. These segments may contain data outside the requested interval. + * + * @throws IOException + */ public List getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException; @@ -36,6 +46,7 @@ public interface IndexerMetadataStorageCoordinator * with identifiers already in the metadata storage will not be added). * * @param segments set of segments to add + * * @return set of segments actually added */ public Set announceHistoricalSegments(final Set segments) throws IOException; @@ -45,5 +56,13 @@ public interface IndexerMetadataStorageCoordinator public void deleteSegments(final Set segments) throws IOException; + /** + * Get all segments which include ONLY data within the given interval and are not flagged as used. + * + * @param dataSource The datasource the segments belong to + * @param interval Filter the data segments to ones that include data in this interval exclusively. Start is inclusive, end is exclusive + * + * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval + */ public List getUnusedSegmentsForInterval(final String dataSource, final Interval interval); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index df208a05093..9d3ba1f2856 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -88,11 +88,13 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor final ResultIterator dbSegments = handle.createQuery( String.format( - "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource", + "SELECT payload FROM %s WHERE used = true AND dataSource = :dataSource AND start <= :end and \"end\" >= :start AND used = true", dbTables.getSegmentsTable() ) ) .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) .map(ByteArrayMapper.FIRST) .iterator(); diff --git a/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java new file mode 100644 index 00000000000..1ad21c420c3 --- /dev/null +++ b/server/src/test/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; +import org.joda.time.Interval; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.io.IOException; +import java.util.Set; + +public class IndexerSQLMetadataStorageCoordinatorTest +{ + + private final MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test"); + private final TestDerbyConnector derbyConnector = new TestDerbyConnector( + Suppliers.ofInstance(new MetadataStorageConnectorConfig()), + Suppliers.ofInstance(tablesConfig) + ); + private final ObjectMapper mapper = new DefaultObjectMapper(); + private final DataSegment defaultSegment = new DataSegment( + "dataSource", + Interval.parse("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(0), + 9, + 100 + ); + private final Set segments = ImmutableSet.of(defaultSegment); + IndexerSQLMetadataStorageCoordinator coordinator; + + @Before + public void setUp() + { + mapper.registerSubtypes(LinearShardSpec.class); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + coordinator = new IndexerSQLMetadataStorageCoordinator( + mapper, + tablesConfig, + derbyConnector + ); + } + + @After + public void tearDown() + { + derbyConnector.tearDown(); + } + + private void unUseSegment() + { + Assert.assertEquals( + 1, (int) derbyConnector.getDBI().withHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws Exception + { + return handle.createStatement( + String.format("UPDATE %s SET used = false WHERE id = :id", tablesConfig.getSegmentsTable()) + ) + .bind("id", defaultSegment.getIdentifier()) + .execute(); + } + } + ) + ); + } + + @Test + public void testSimpleAnnounce() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertArrayEquals( + mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"), + derbyConnector.lookup( + tablesConfig.getSegmentsTable(), + "id", + "payload", + defaultSegment.getIdentifier() + ) + ); + } + + @Test + public void testSimpleUsedList() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) + ) + ); + } + + @Test + public void testSimpleUnUsedList() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval() + ) + ) + ); + } + + + @Test + public void testUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Set actualSegments = ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2014-12-31T23:59:59.999Z/2015-01-01T00:00:00.001Z") // end is exclusive + ) + ); + Assert.assertEquals( + segments, + actualSegments + ); + } + + + @Test + public void testUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2015-1-1T23:59:59.999Z/2015-02-01T00Z") + ) + ) + ); + } + + @Test + public void testUsedOutOfBoundsLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart()) + ).isEmpty() + ); + } + + + @Test + public void testUsedOutOfBoundsHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertTrue( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getEnd(), defaultSegment.getInterval().getEnd().plusDays(10)) + ).isEmpty() + ); + } + + @Test + public void testUsedWithinBoundsEnd() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().minusMillis(1)) + ) + ) + ); + } + + @Test + public void testUsedOverlapEnd() throws IOException + { + coordinator.announceHistoricalSegments(segments); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUsedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusMillis(1)) + ) + ) + ); + } + + + @Test + public void testUnUsedOverlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().minus(1), defaultSegment.getInterval().getStart().plus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedUnderlapLow() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart().plus(1), defaultSegment.getInterval().getEnd()) + ).isEmpty() + ); + } + + + @Test + public void testUnUsedUnderlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + new Interval(defaultSegment.getInterval().getStart(), defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedOverlapHigh() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertTrue( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getEnd().minus(1)) + ).isEmpty() + ); + } + + @Test + public void testUnUsedBigOverlap() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + Interval.parse("2000/2999") + ) + ) + ); + } + + @Test + public void testUnUsedLowRange() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minus(1)) + ) + ) + ); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withStart(defaultSegment.getInterval().getStart().minusYears(1)) + ) + ) + ); + } + + @Test + public void testUnUsedHighRange() throws IOException + { + coordinator.announceHistoricalSegments(segments); + unUseSegment(); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plus(1)) + ) + ) + ); + Assert.assertEquals( + segments, + ImmutableSet.copyOf( + coordinator.getUnusedSegmentsForInterval( + defaultSegment.getDataSource(), + defaultSegment.getInterval().withEnd(defaultSegment.getInterval().getEnd().plusYears(1)) + ) + ) + ); + } +}