UTs update for indexing service

This commit is contained in:
Himanshu Gupta 2015-02-16 17:07:10 -06:00 committed by Xavier Léauté
parent 30f64ff19e
commit bd5cecdd44
5 changed files with 257 additions and 46 deletions

View File

@ -46,12 +46,13 @@ public class FileTaskLogs implements TaskLogs
@Override @Override
public void pushTaskLog(final String taskid, File file) throws IOException public void pushTaskLog(final String taskid, File file) throws IOException
{ {
if (!config.getDirectory().exists()) { if (config.getDirectory().exists() || config.getDirectory().mkdirs()) {
config.getDirectory().mkdir(); final File outputFile = fileForTask(taskid);
Files.copy(file, outputFile);
log.info("Wrote task log to: %s", outputFile);
} else {
throw new IOException(String.format("Unable to create task log dir[%s]", config.getDirectory()));
} }
final File outputFile = fileForTask(taskid);
Files.copy(file, outputFile);
log.info("Wrote task log to: %s", outputFile);
} }
@Override @Override

View File

@ -0,0 +1,42 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
import org.joda.time.Duration;
public class RetryPolicyFactoryTest
{
@Test
public void testMakeRetryPolicy()
{
RetryPolicyConfig config = new RetryPolicyConfig()
.setMinWait(new Period("PT1S"))
.setMaxWait(new Period("PT10S"))
.setMaxRetryCount(1);
RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(config);
RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
Assert.assertEquals(new Duration("PT1S"),retryPolicy.getAndIncrementRetryDelay());
Assert.assertTrue(retryPolicy.hasExceededRetryThreshold());
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.FilteredServerView;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.OmniSegmentLoader;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
public class TaskToolboxTest
{
private TaskToolboxFactory taskToolbox = null;
private TaskActionClientFactory mockTaskActionClientFactory = EasyMock.createMock(TaskActionClientFactory.class);
private ServiceEmitter mockEmitter = EasyMock.createMock(ServiceEmitter.class);
private DataSegmentPusher mockSegmentPusher = EasyMock.createMock(DataSegmentPusher.class);
private DataSegmentKiller mockDataSegmentKiller = EasyMock.createMock(DataSegmentKiller.class);
private DataSegmentMover mockDataSegmentMover = EasyMock.createMock(DataSegmentMover.class);
private DataSegmentArchiver mockDataSegmentArchiver = EasyMock.createMock(DataSegmentArchiver.class);
private DataSegmentAnnouncer mockSegmentAnnouncer = EasyMock.createMock(DataSegmentAnnouncer.class);
private FilteredServerView mockNewSegmentServerView = EasyMock.createMock(FilteredServerView.class);
private QueryRunnerFactoryConglomerate mockQueryRunnerFactoryConglomerate
= EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
private ObjectMapper ObjectMapper = new ObjectMapper();
private OmniSegmentLoader mockOmniSegmentLoader = EasyMock.createMock(OmniSegmentLoader.class);
private Task task = EasyMock.createMock(Task.class);
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws IOException
{
EasyMock.expect(task.getId()).andReturn("task_id").anyTimes();
EasyMock.replay(task);
taskToolbox = new TaskToolboxFactory(
new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null),
mockTaskActionClientFactory,
mockEmitter,
mockSegmentPusher,
mockDataSegmentKiller,
mockDataSegmentMover,
mockDataSegmentArchiver,
mockSegmentAnnouncer,
mockNewSegmentServerView,
mockQueryRunnerFactoryConglomerate,
mockQueryExecutorService,
mockMonitorScheduler,
new SegmentLoaderFactory(mockOmniSegmentLoader),
ObjectMapper
);
}
@Test
public void testGetDataSegmentArchiver()
{
Assert.assertEquals(mockDataSegmentArchiver,taskToolbox.build(task).getDataSegmentArchiver());
}
@Test
public void testGetSegmentAnnouncer()
{
Assert.assertEquals(mockSegmentAnnouncer,taskToolbox.build(task).getSegmentAnnouncer());
}
@Test
public void testGetNewSegmentServerView()
{
Assert.assertEquals(mockNewSegmentServerView,taskToolbox.build(task).getNewSegmentServerView());
}
@Test
public void testGetQueryRunnerFactoryConglomerate()
{
Assert.assertEquals(mockQueryRunnerFactoryConglomerate,taskToolbox.build(task).getQueryRunnerFactoryConglomerate());
}
@Test
public void testGetQueryExecutorService()
{
Assert.assertEquals(mockQueryExecutorService,taskToolbox.build(task).getQueryExecutorService());
}
@Test
public void testGetMonitorScheduler()
{
Assert.assertEquals(mockMonitorScheduler,taskToolbox.build(task).getMonitorScheduler());
}
@Test
public void testGetObjectMapper()
{
Assert.assertEquals(ObjectMapper,taskToolbox.build(task).getObjectMapper());
}
@Test
public void testFetchSegments() throws SegmentLoadingException, IOException
{
File expectedFile = temporaryFolder.newFile();
EasyMock.expect(mockOmniSegmentLoader.getSegmentFiles((DataSegment)EasyMock.anyObject()))
.andReturn(expectedFile).anyTimes();
EasyMock.expect(mockOmniSegmentLoader.withConfig((SegmentLoaderConfig)EasyMock.anyObject()))
.andReturn(mockOmniSegmentLoader).anyTimes();
EasyMock.replay(mockOmniSegmentLoader);
DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(new Interval("2012-01-01/P1D")).version("1").size(1).build();
List<DataSegment> segments = ImmutableList.of
(
dataSegment
);
Map actualFetchedSegment = taskToolbox.build(task).fetchSegments(segments);
Assert.assertEquals(expectedFile, actualFetchedSegment.get(dataSegment));
}
@Test
public void testGetEmitter()
{
Assert.assertEquals(mockEmitter,taskToolbox.build(task).getEmitter());
}
@Test
public void testGetDataSegmentKiller()
{
Assert.assertEquals(mockDataSegmentKiller, taskToolbox.build(task).getDataSegmentKiller());
}
@Test
public void testGetDataSegmentMover()
{
Assert.assertEquals(mockDataSegmentMover, taskToolbox.build(task).getDataSegmentMover());
}
}

View File

@ -28,6 +28,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.Map; import java.util.Map;
public class FileTaskLogsTest public class FileTaskLogsTest
@ -37,7 +38,7 @@ public class FileTaskLogsTest
{ {
final File tmpDir = Files.createTempDir(); final File tmpDir = Files.createTempDir();
try { try {
final File logDir = new File(tmpDir, "logs"); final File logDir = new File(tmpDir, "druid/logs");
final File logFile = new File(tmpDir, "log"); final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8); Files.write("blah", logFile, Charsets.UTF_8);
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir)); final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
@ -54,4 +55,34 @@ public class FileTaskLogsTest
FileUtils.deleteDirectory(tmpDir); FileUtils.deleteDirectory(tmpDir);
} }
} }
@Test
public void testPushTaskLogDirCreationFails() throws Exception
{
final File tmpDir = Files.createTempDir();
try {
IOException thrown = null;
final File logDir = new File(tmpDir, "druid/logs");
final File logFile = new File(tmpDir, "log");
Files.write("blah", logFile, Charsets.UTF_8);
if(!tmpDir.setWritable(false)) {
new RuntimeException("failed to make tmp dir read-only");
}
final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir));
try {
taskLogs.pushTaskLog("foo", logFile);
} catch(IOException ex) {
thrown = ex;
}
Assert.assertNotNull("pushTaskLog should fail with exception of dir creation error", thrown);
}
finally {
tmpDir.setWritable(true);
FileUtils.deleteDirectory(tmpDir);
}
}
} }

View File

@ -18,17 +18,11 @@
package io.druid.indexing.worker; package io.druid.indexing.worker;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.introspect.GuiceAnnotationIntrospector;
import com.fasterxml.jackson.databind.introspect.GuiceInjectableValues;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.indexing.common.IndexingServiceCondition; import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.SegmentLoaderFactory; import io.druid.indexing.common.SegmentLoaderFactory;
@ -42,7 +36,6 @@ import io.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
import io.druid.indexing.overlord.ThreadPoolTaskRunner; import io.druid.indexing.overlord.ThreadPoolTaskRunner;
import io.druid.indexing.worker.config.WorkerConfig; import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.jackson.DefaultObjectMapper; import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.column.ColumnConfig;
import io.druid.segment.loading.DataSegmentPuller; import io.druid.segment.loading.DataSegmentPuller;
import io.druid.segment.loading.LocalDataSegmentPuller; import io.druid.segment.loading.LocalDataSegmentPuller;
import io.druid.segment.loading.OmniSegmentLoader; import io.druid.segment.loading.OmniSegmentLoader;
@ -67,39 +60,6 @@ import java.util.List;
public class WorkerTaskMonitorTest public class WorkerTaskMonitorTest
{ {
private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final Injector injector = Guice.createInjector(
new com.google.inject.Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(ColumnConfig.class).toInstance(
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 1024 * 1024;
}
}
);
}
}
);
static {
final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
jsonMapper.setInjectableValues(new GuiceInjectableValues(injector));
jsonMapper.setAnnotationIntrospectors(
new AnnotationIntrospectorPair(
guiceIntrospector, jsonMapper.getSerializationConfig().getAnnotationIntrospector()
),
new AnnotationIntrospectorPair(
guiceIntrospector, jsonMapper.getDeserializationConfig().getAnnotationIntrospector()
)
);
}
private static final Joiner joiner = Joiner.on("/"); private static final Joiner joiner = Joiner.on("/");
private static final String basePath = "/test/druid"; private static final String basePath = "/test/druid";