Make URIExtraction not require FileSystem impls for URIs it understands (#2929)

* Make URIExtraction not require FileSystem impls for URIs it understands
* Fixes #2928

* Preserve URI information

* Simplify case for exact matching

* Move unused variable
This commit is contained in:
Charles Allen 2016-05-08 10:53:53 -07:00 committed by Nishant
parent 8b570ab130
commit 90b0b0a4ad
2 changed files with 115 additions and 57 deletions

View File

@ -37,20 +37,17 @@ import io.druid.data.input.MapPopulator;
import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory; import io.druid.query.extraction.namespace.ExtractionNamespaceFunctionFactory;
import io.druid.query.extraction.namespace.URIExtractionNamespace; import io.druid.query.extraction.namespace.URIExtractionNamespace;
import io.druid.segment.loading.URIDataPuller; import io.druid.segment.loading.URIDataPuller;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import javax.annotation.Nullable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
/** /**
* *
@ -140,37 +137,36 @@ public class URIExtractionNamespaceFunctionFactory implements ExtractionNamespac
); );
} }
final URIDataPuller puller = (URIDataPuller) pullerRaw; final URIDataPuller puller = (URIDataPuller) pullerRaw;
final Pattern versionRegex; final URI uri;
final URI uriBase;
if (doSearch) { if (doSearch) {
uriBase = extractionNamespace.getUriPrefix(); final Pattern versionRegex;
if (extractionNamespace.getFileRegex() != null) { if (extractionNamespace.getFileRegex() != null) {
versionRegex = Pattern.compile(extractionNamespace.getFileRegex()); versionRegex = Pattern.compile(extractionNamespace.getFileRegex());
} else { } else {
versionRegex = null; versionRegex = null;
} }
} else { uri = pullerRaw.getLatestVersion(
final Path filePath = Paths.get(extractionNamespace.getUri()); extractionNamespace.getUriPrefix(),
versionRegex = Pattern.compile(Pattern.quote(filePath.getFileName().toString())); versionRegex
uriBase = filePath.getParent().toUri();
}
final URI uri = pullerRaw.getLatestVersion(
uriBase,
versionRegex
);
if (uri == null) {
throw new RuntimeException(
new FileNotFoundException(
String.format(
"Could not find match for pattern `%s` in [%s] for %s",
versionRegex,
originalUri,
extractionNamespace
)
)
); );
if (uri == null) {
throw new RuntimeException(
new FileNotFoundException(
String.format(
"Could not find match for pattern `%s` in [%s] for %s",
versionRegex,
originalUri,
extractionNamespace
)
)
);
}
} else {
uri = extractionNamespace.getUri();
} }
final String uriPath = uri.getPath(); final String uriPath = uri.getPath();
try { try {

View File

@ -41,28 +41,17 @@ import io.druid.server.namespace.cache.NamespaceExtractionCacheManager;
import io.druid.server.namespace.cache.NamespaceExtractionCacheManagersTest; import io.druid.server.namespace.cache.NamespaceExtractionCacheManagersTest;
import io.druid.server.namespace.cache.OffHeapNamespaceExtractionCacheManager; import io.druid.server.namespace.cache.OffHeapNamespaceExtractionCacheManager;
import io.druid.server.namespace.cache.OnHeapNamespaceExtractionCacheManager; import io.druid.server.namespace.cache.OnHeapNamespaceExtractionCacheManager;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
@ -77,6 +66,19 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** /**
* *
@ -84,6 +86,47 @@ import java.util.zip.GZIPOutputStream;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class URIExtractionNamespaceFunctionFactoryTest public class URIExtractionNamespaceFunctionFactoryTest
{ {
/**
 * Made-up URI scheme; the finder registered for it below treats such URIs as local files.
 */
private static final String FAKE_SCHEME = "wabblywoo";

/**
 * Version finders keyed by URI scheme, shared by the factories built in this test.
 * <p>
 * "file" maps to a plain {@link LocalFileTimestampVersionFinder}. {@link #FAKE_SCHEME} maps to a
 * subclass that rewrites the incoming URI to the "file" scheme before delegating
 * {@code getVersion} and {@code getInputStream} to the parent implementation.
 */
private static final Map<String, SearchableVersionedDataFinder> FINDERS =
    ImmutableMap.<String, SearchableVersionedDataFinder>of(
        "file", new LocalFileTimestampVersionFinder(),
        FAKE_SCHEME, new LocalFileTimestampVersionFinder()
        {
          /**
           * Returns a copy of {@code uri} whose scheme is "file"; every other component
           * (user info, host, port, path, query, fragment) is carried over unchanged.
           */
          URI fixURI(URI uri)
          {
            try {
              return new URI(
                  "file",
                  uri.getUserInfo(),
                  uri.getHost(),
                  uri.getPort(),
                  uri.getPath(),
                  uri.getQuery(),
                  uri.getFragment()
              );
            }
            catch (URISyntaxException e) {
              throw Throwables.propagate(e);
            }
          }

          @Override
          public String getVersion(URI uri)
          {
            // Delegate using the "file"-scheme equivalent of the requested URI.
            final URI fileUri = fixURI(uri);
            return super.getVersion(fileUri);
          }

          @Override
          public InputStream getInputStream(URI uri) throws IOException
          {
            // Delegate using the "file"-scheme equivalent of the requested URI.
            final URI fileUri = fixURI(uri);
            return super.getInputStream(fileUri);
          }
        }
    );
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> getParameters() throws NoSuchMethodException public static Iterable<Object[]> getParameters() throws NoSuchMethodException
{ {
@ -241,12 +284,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
); );
namespaceFunctionFactoryMap.put( namespaceFunctionFactoryMap.put(
URIExtractionNamespace.class, URIExtractionNamespace.class,
new URIExtractionNamespaceFunctionFactory( new URIExtractionNamespaceFunctionFactory(FINDERS)
ImmutableMap.<String, SearchableVersionedDataFinder>of(
"file",
new LocalFileTimestampVersionFinder()
)
)
); );
} }
@ -271,7 +309,9 @@ public class URIExtractionNamespaceFunctionFactoryTest
{ {
lifecycle.start(); lifecycle.start();
fnCache.clear(); fnCache.clear();
tmpFileParent = temporaryFolder.newFolder(); tmpFileParent = new File(temporaryFolder.newFolder(), "");
Assert.assertTrue(tmpFileParent.mkdir());
Assert.assertTrue(tmpFileParent.isDirectory());
tmpFile = Files.createTempFile(tmpFileParent.toPath(), "druidTestURIExtractionNS", suffix).toFile(); tmpFile = Files.createTempFile(tmpFileParent.toPath(), "druidTestURIExtractionNS", suffix).toFile();
final ObjectMapper mapper = new DefaultObjectMapper(); final ObjectMapper mapper = new DefaultObjectMapper();
try (OutputStream ostream = outStreamSupplier.apply(tmpFile)) { try (OutputStream ostream = outStreamSupplier.apply(tmpFile)) {
@ -288,9 +328,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
))); )));
} }
} }
factory = new URIExtractionNamespaceFunctionFactory( factory = new URIExtractionNamespaceFunctionFactory(FINDERS);
ImmutableMap.<String, SearchableVersionedDataFinder>of("file", new LocalFileTimestampVersionFinder())
);
namespace = new URIExtractionNamespace( namespace = new URIExtractionNamespace(
"ns", "ns",
tmpFile.toURI(), tmpFile.toURI(),
@ -486,7 +524,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
namespace.getUri(), namespace.getUri(),
null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int)namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null null
); );
} }
@ -500,7 +538,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
null, null,
"", "",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int)namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null null
); );
} }
@ -514,7 +552,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
null, null,
null, null,
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int)namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
"" ""
); );
} }
@ -528,7 +566,7 @@ public class URIExtractionNamespaceFunctionFactoryTest
namespace.getUri(), namespace.getUri(),
"", "",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int)namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
"" ""
); );
} }
@ -543,8 +581,32 @@ public class URIExtractionNamespaceFunctionFactoryTest
namespace.getUri(), namespace.getUri(),
"[", "[",
namespace.getNamespaceParseSpec(), namespace.getNamespaceParseSpec(),
Period.millis((int)namespace.getPollMs()), Period.millis((int) namespace.getPollMs()),
null null
); );
} }
/**
 * Builds a namespace whose exact URI is the test fixture's URI with its scheme swapped for
 * {@link #FAKE_SCHEME} (no prefix, no file regex), then checks that running the cache
 * populator for it yields a non-null result.
 */
@Test
public void testWeirdSchemaOnExactURI() throws Exception
{
  final URI sourceUri = namespace.getUri();
  // Same location as the fixture file, but addressed through the fake scheme.
  final URI fakeSchemeUri = new URI(
      FAKE_SCHEME,
      sourceUri.getUserInfo(),
      sourceUri.getHost(),
      sourceUri.getPort(),
      sourceUri.getPath(),
      sourceUri.getQuery(),
      sourceUri.getFragment()
  );
  final URIExtractionNamespace fakeSchemeNamespace = new URIExtractionNamespace(
      namespace.getNamespace(),
      fakeSchemeUri,
      null,
      null,
      namespace.getNamespaceParseSpec(),
      Period.millis((int) namespace.getPollMs()),
      null
  );
  final Map<String, String> cache = new HashMap<>();
  Assert.assertNotNull(factory.getCachePopulator(fakeSchemeNamespace, null, cache).call());
}
} }