diff --git a/sandbox/contributions/webcrawler-LARM/src/de/lanlab/larm/fetcher/FetcherTask.java b/sandbox/contributions/webcrawler-LARM/src/de/lanlab/larm/fetcher/FetcherTask.java index 1ed8a85db7a..3e29003afe4 100644 --- a/sandbox/contributions/webcrawler-LARM/src/de/lanlab/larm/fetcher/FetcherTask.java +++ b/sandbox/contributions/webcrawler-LARM/src/de/lanlab/larm/fetcher/FetcherTask.java @@ -1,57 +1,57 @@ -/* ==================================================================== - * The Apache Software License, Version 1.1 +/* + * ==================================================================== + * The Apache Software License, Version 1.1 * - * Copyright (c) 2001 The Apache Software Foundation. All rights - * reserved. + * Copyright (c) 2001 The Apache Software Foundation. All rights + * reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the - * distribution. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. * - * 3. The end-user documentation included with the redistribution, - * if any, must include the following acknowledgment: - * "This product includes software developed by the - * Apache Software Foundation (http://www.apache.org/)." - * Alternately, this acknowledgment may appear in the software itself, - * if and wherever such third-party acknowledgments normally appear. + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Apache Software Foundation (http://www.apache.org/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. * - * 4. The names "Apache" and "Apache Software Foundation" and - * "Apache Lucene" must not be used to endorse or promote products - * derived from this software without prior written permission. For - * written permission, please contact apache@apache.org. + * 4. The names "Apache" and "Apache Software Foundation" and + * "Apache Lucene" must not be used to endorse or promote products + * derived from this software without prior written permission. For + * written permission, please contact apache@apache.org. * - * 5. Products derived from this software may not be called "Apache", - * "Apache Lucene", nor may "Apache" appear in their name, without - * prior written permission of the Apache Software Foundation. + * 5. Products derived from this software may not be called "Apache", + * "Apache Lucene", nor may "Apache" appear in their name, without + * prior written permission of the Apache Software Foundation. * - * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES - * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR - * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF - * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, - * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT - * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - * ==================================================================== + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * ==================================================================== * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . */ - package de.lanlab.larm.fetcher; import java.net.URL; @@ -78,17 +78,20 @@ import de.lanlab.larm.net.*; * this class gets the documents from the web. It connects to the server given * by the IP address in the URLMessage, gets the document, and forwards it to * the storage. If it's an HTML document, it will be parsed and all links will - * be put into the message handler again. - * - * stores contents of the files in field "contents" + * be put into the message handler again. stores contents of the files in field + * "contents" * * @author Clemens Marschner - * @version $Id$ + * @created 28. Juni 2002 + * @version $Id$ */ public class FetcherTask implements InterruptableTask, LinkHandler, Serializable { + /** + * Description of the Field + */ protected volatile boolean isInterrupted = false; /** @@ -109,8 +112,7 @@ public class FetcherTask private volatile URL base; /** - * the URL of the docuzment - * only valid within a doTask call + * the URL of the docuzment only valid within a doTask call */ private volatile URL contextUrl; @@ -120,8 +122,7 @@ public class FetcherTask protected static volatile MessageHandler messageHandler; /** - * actual number of bytes read - * only valid within a doTask call + * actual number of bytes read only valid within a doTask call */ private volatile long bytesRead = 0; @@ -135,30 +136,61 @@ public class FetcherTask */ private static volatile LinkStorage linkStorage; - - /** * task state IDs. comparisons will be done by their references, so always * use the IDs */ public final static String FT_IDLE = "idle"; + /** + * Description of the Field + */ public final static String FT_STARTED = "started"; + /** + * Description of the Field + */ public final static String FT_OPENCONNECTION = "opening connection"; + /** + * Description of the Field + */ public final static String FT_CONNECTING = "connecting"; + /** + * Description of the Field + */ public final static String FT_GETTING = "getting"; + /** + * Description of the Field + */ public final static String FT_READING = "reading"; + /** + * Description of the Field + */ public final static String FT_SCANNING = "scanning"; + /** + * Description of the Field + */ public final static String FT_STORING = "storing"; + /** + * Description of the Field + */ public final static String FT_READY = "ready"; + /** + * Description of the Field + */ public final static String FT_CLOSING = "closing"; + /** + * Description of the Field + */ public final static String FT_EXCEPTION = "exception"; + /** + * Description of the Field + */ public final static String FT_INTERRUPTED = "interrupted"; private volatile State taskState = new State(FT_IDLE); /** - * the URLs found will be stored and only added to the message handler in the very - * end, to avoid too many synchronizations + * the URLs found will be stored and only added to the message handler in + * the very end, to avoid too many synchronizations */ private volatile LinkedList foundUrls; @@ -172,17 +204,6 @@ public class FetcherTask */ private volatile String title; - /** - * headers for HTTPClient - */ - private static volatile NVPair headers[] = new NVPair[1]; - - static - { - headers[0] = new HTTPClient.NVPair("User-Agent", Constants.CRAWLER_AGENT); - - } - /** * Gets a copy of the current taskState @@ -198,7 +219,7 @@ public class FetcherTask /** * Constructor for the FetcherTask object * - * @param urlMessage Description of the Parameter + * @param urlMessage Description of the Parameter */ public FetcherTask(URLMessage urlMessage) { @@ -227,6 +248,7 @@ public class FetcherTask FetcherTask.docStorage = docStorage; } + /** * Sets the document linkStorage * @@ -268,27 +290,54 @@ public class FetcherTask return actURLMessage.getUrl(); } + volatile SimpleLogger log; volatile SimpleLogger errorLog; volatile HostManager hostManager; + volatile HostResolver hostResolver; + //private long startTime; /** * this will be called by the fetcher thread and will do all the work * - * @TODO probably split this up into different processing steps * @param thread Description of the Parameter + * @TODO probably split this up into different processing steps */ public void run(ServerThread thread) { - taskState.setState(FT_STARTED); // state information is always set to make the thread monitor happy + + taskState.setState(FT_STARTED); + // state information is always set to make the thread monitor happy log = thread.getLog(); - hostManager = ((FetcherThread)thread).getHostManager(); + hostManager = ((FetcherThread) thread).getHostManager(); + hostResolver = hostManager.getHostResolver(); + base = contextUrl = actURLMessage.getUrl(); + String urlString = actURLMessage.getURLString(); + String host = contextUrl.getHost().toLowerCase(); + HostInfo hi = hostManager.getHostInfo(host); +// System.out.println("FetcherTask with " + urlString + " started"); + if(actURLMessage.linkType == URLMessage.LINKTYPE_REDIRECT) + { + taskState.setState(FT_READY, null); + hi.releaseLock(); + return; // we've already crawled that (see below) + } + NVPair[] headers = ((FetcherThread) thread).getDefaultHeaders(); + int numHeaders = ((FetcherThread) thread).getUsedDefaultHeaders(); + boolean isIncremental = false; + if (actURLMessage instanceof WebDocument) + { + // this is an incremental crawl where we only have to check whether the doc crawled + // is newer + isIncremental = true; + headers[numHeaders] = new NVPair("If-Modified-Since", HTTPClient.Util.httpDate(((WebDocument) actURLMessage).getLastModified())); + } //HostManager hm = ((FetcherThread)thread).getHostManager(); errorLog = thread.getErrorLog(); @@ -297,21 +346,19 @@ public class FetcherTask int threadNr = ((FetcherThread) thread).getThreadNumber(); log.log("start"); - base = contextUrl = actURLMessage.getUrl(); - String urlString = actURLMessage.getURLString(); - String host = contextUrl.getHost().toLowerCase(); int hostPos = urlString.indexOf(host); int hostLen = host.length(); - HostInfo hi = hostManager.getHostInfo(host); // get and create + // get and create - if(!hi.isHealthy()) + if (!hi.isHealthy()) { // we make this check as late as possible to get the most current information log.log("Bad Host: " + contextUrl + "; returning"); - System.out.println("[" + threadNr + "] bad host: " + this.actURLMessage.getUrl()); +// System.out.println("[" + threadNr + "] bad host: " + this.actURLMessage.getUrl()); taskState.setState(FT_READY, null); + hi.releaseLock(); return; } @@ -319,14 +366,13 @@ public class FetcherTask HTTPConnection conn = null; - title = "*untitled*"; + title = ""; int size = 1; InputStream in = null; bytesRead = 0; - try { @@ -339,6 +385,7 @@ public class FetcherTask conn = new HTTPConnection(host); conn.setDefaultTimeout(75000); + // 75 s conn.setDefaultAllowUserInteraction(false); @@ -353,83 +400,199 @@ public class FetcherTask int contentLength = 0; Date date = null; - if (statusCode != 404 && statusCode != 403) + if (isIncremental) { - // read up to Constants.FETCHERTASK_MAXFILESIZE bytes into a byte array - taskState.setState(FT_READING, ipURL); - contentType = response.getHeader("Content-Type"); - String length = response.getHeader("Content-Length"); - date = response.getHeaderAsDate("Last-Modified"); - - if (length != null) - { - contentLength = Integer.parseInt(length); - } - log.log("reading"); - - fullBuffer = response.getData(Constants.FETCHERTASK_MAXFILESIZE); // max. 2 MB - base = contextUrl = response.getEffectiveURI().toURL(); - // may have changed after a 30x result code - // to do: record the link between original and effective URL - // like this the effectiveURL may be crawled twice - - - if (fullBuffer != null) - { - contentLength = fullBuffer.length; - this.bytesRead += contentLength; - } + // experimental + System.out.println("ftask: if modified since: " + HTTPClient.Util.httpDate(((WebDocument) actURLMessage).getLastModified())); } - //conn.stop(); // close connection. todo: Do some caching... + URL realURL; - /* - * conn.disconnect(); - */ - if (isInterrupted) + switch (statusCode) { - System.out.println("FetcherTask: interrupted while reading. File truncated"); - log.log("interrupted while reading. File truncated"); - } - else - { - if (fullBuffer != null) - { - taskState.setState(FT_SCANNING, ipURL); + case 404: // file not found + case 403: // access forbidden - log.log("read file (" + fullBuffer.length + " bytes). Now scanning."); - - if (contentType.startsWith("text/html")) + // if this is an incremental crawl, remove the doc from the repository + if (isIncremental) { - - // ouch. I haven't found a better solution yet. just slower ones. - char[] fullCharBuffer = new char[contentLength]; - new InputStreamReader(new ByteArrayInputStream(fullBuffer)).read(fullCharBuffer); - Tokenizer tok = new Tokenizer(); - tok.setLinkHandler(this); - tok.parse(new SimpleCharArrayReader(fullCharBuffer)); - - taskState.setState(FT_STORING, ipURL); - linkStorage.storeLinks(foundUrls); - WebDocument d = new WebDocument(contextUrl, contentType, statusCode, actURLMessage.getReferer(), contentLength, title, date, hostManager); - d.addField("content", fullCharBuffer); + WebDocument d = (WebDocument) actURLMessage; + d.setResultCode(statusCode); + // the repository will remove the doc if this statuscode is matched docStorage.store(d); } + // otherwise, do nothing + // Todo: we could add an error marker to the referal link + break; + case 304: + // not modified + System.out.println("ftask: -> not modified"); + // "not modified since" + taskState.setState(FT_STORING, ipURL); + // let the repository take care of the links + // it will determine that this is the old document (because it already + // has a docId), and will put back the links associated with it + try + { + WebDocument doc = (WebDocument) this.actURLMessage; + doc.setModified(false); + docStorage.store(doc); + this.bytesRead += doc.getSize(); + } + catch (ClassCastException e) + { + System.out.println("error while casting to WebDoc: " + actURLMessage.getInfo()); + } + break; + case 301: // moved permanently + case 302: // moved temporarily + case 303: // see other + case 307: // temporary redirect + /* + * this is a redirect. save it as a link and return. + * note that we could read the doc from the open connection here, but this could mean + * the filters were useless + */ + realURL = response.getEffectiveURI().toURL(); + foundUrls.add(new URLMessage(realURL, contextUrl, URLMessage.LINKTYPE_REDIRECT, "", hostResolver)); + linkStorage.storeLinks(foundUrls); + break; + default: + // this can be a 30x code that was resolved by the HTTPClient and is passed to us as 200 + // we could turn this off and do it ourselves. But then we'd have to take care that + // we don't get into an endless redirection loop -> i.e. extend URLMessage by a counter + // at the moment we add the real URL to the message queue and mark it as a REDIRECT link + // that way it is added to the visited filter. Then we take care that we don't crawl it again + + // the other possibility is that we receive a "Location:" header along with a 200 status code + // I have experienced that HTTPClient has an error with parsing this, so we do it ourselves + //String location = response.getHeader("Location"); + realURL = response.getEffectiveURI().toURL(); + + /*if(location != null) + { + //System.out.println("interesting: location header with url " + location); + foundUrls.add(new URLMessage(new URL(location), contextUrl, URLMessage.LINKTYPE_REDIRECT, "", hostManager)); + this.base = this.contextUrl = location; + } + else*/ + if(!(realURL.equals(contextUrl))) + { + //System.out.println("interesting: redirect with url " + realURL + " -context: " + contextUrl); + foundUrls.add(new URLMessage(realURL, contextUrl, URLMessage.LINKTYPE_REDIRECT, "", hostResolver)); + this.base = this.contextUrl = realURL; + //System.out.println(response); + + } + + + + + if (isIncremental) + { + // experimental + System.out.println("ftask: -> was modified at " + response.getHeaderAsDate("Last-Modified")); + } + // read up to Constants.FETCHERTASK_MAXFILESIZE bytes into a byte array + taskState.setState(FT_READING, ipURL); + contentType = response.getHeader("Content-Type"); + String length = response.getHeader("Content-Length"); + date = response.getHeaderAsDate("Last-Modified"); + + if (length != null) + { + contentLength = Integer.parseInt(length); + } + log.log("reading"); + realURL = response.getEffectiveURI().toURL(); + if (contentType != null && contentType.startsWith("text/html")) + { + fullBuffer = response.getData(Constants.FETCHERTASK_MAXFILESIZE); + hi.releaseLock(); + // max. 2 MB + if (fullBuffer != null) + { + contentLength = fullBuffer.length; + this.bytesRead += contentLength; + } + + /* + * conn.disconnect(); + */ + if (isInterrupted) + { + System.out.println("FetcherTask: interrupted while reading. File truncated"); + log.log("interrupted while reading. File truncated"); + } + else + { + if (fullBuffer != null) + { + taskState.setState(FT_SCANNING, ipURL); + + log.log("read file (" + fullBuffer.length + " bytes). Now scanning."); + + // convert the bytes to Java characters + // ouch. I haven't found a better solution yet. just slower ones. + // remember: for better runtime performance avoid decorators, since they + // multiply function calls + char[] fullCharBuffer = new char[contentLength]; + new InputStreamReader(new ByteArrayInputStream(fullBuffer)).read(fullCharBuffer); + Tokenizer tok = new Tokenizer(); + tok.setLinkHandler(this); + tok.parse(new SimpleCharArrayReader(fullCharBuffer)); + + taskState.setState(FT_STORING, ipURL); + linkStorage.storeLinks(foundUrls); + WebDocument d; + if (isIncremental) + { + d = ((WebDocument) this.actURLMessage); + d.setModified(true); + // file is new or newer + d.setUrl(contextUrl); + d.setMimeType(contentType); + d.setResultCode(statusCode); + d.setSize(contentLength); + d.setTitle(title); + d.setLastModified(date); + } + else + { + d = new WebDocument(contextUrl, contentType, statusCode, actURLMessage.getReferer(), contentLength, title, date, hostResolver); + } + d.addField("content", fullCharBuffer); + d.addField("contentBytes", fullBuffer); + docStorage.store(d); + } + + log.log("scanned"); + } + + log.log("stored"); + } else { // System.out.println("Discovered unknown content type: " + contentType + " at " + urlString); //errorLog.log("[" + threadNr + "] Discovered unknown content type at " + urlString + ": " + contentType + ". just storing"); taskState.setState(FT_STORING, ipURL); linkStorage.storeLinks(foundUrls); - WebDocument d = new WebDocument(contextUrl, contentType, statusCode, actURLMessage.getReferer(), contentLength, title, date, hostManager); - d.addField("content", fullBuffer); + WebDocument d = new WebDocument(contextUrl, contentType, statusCode, actURLMessage.getReferer(), + /* + * contentLength + */ + 0, title, date, hostResolver); + //d.addField("content", fullBuffer); + //d.addField("content", null); docStorage.store(d); } - log.log("scanned"); - } - - log.log("stored"); + break; } + /* + * switch + */ + //conn.stop(); // close connection. todo: Do some caching... + } catch (InterruptedIOException e) { @@ -444,7 +607,7 @@ public class FetcherTask //System.out.println("[" + threadNr + "] FetcherTask: File not Found: " + this.actURLMessage.getUrl()); errorLog.log("error: File not Found: " + this.actURLMessage.getUrl()); } - catch(NoRouteToHostException e) + catch (NoRouteToHostException e) { // router is down or firewall prevents to connect hi.setReachable(false); @@ -453,7 +616,7 @@ public class FetcherTask // e.printStackTrace(); errorLog.log("error: " + e.getClass().getName() + ": " + e.getMessage()); } - catch(ConnectException e) + catch (ConnectException e) { // no server is listening at this port hi.setReachable(false); @@ -461,6 +624,7 @@ public class FetcherTask //System.out.println("[" + threadNr + "] " + e.getClass().getName() + ": " + e.getMessage()); // e.printStackTrace(); errorLog.log("error: " + e.getClass().getName() + ": " + e.getMessage()); + } catch (SocketException e) { @@ -469,7 +633,7 @@ public class FetcherTask errorLog.log("error: " + e.getClass().getName() + ": " + e.getMessage()); } - catch(UnknownHostException e) + catch (UnknownHostException e) { // IP Address not to be determined hi.setReachable(false); @@ -500,10 +664,10 @@ public class FetcherTask e.printStackTrace(); System.out.println("[" + threadNr + "]: stopping"); errorLog.log("error: " + e.getClass().getName() + ": " + e.getMessage() + "; stopping"); - } finally { + hi.releaseLock(); if (isInterrupted) { @@ -521,7 +685,6 @@ public class FetcherTask */ taskState.setState(FT_CLOSING); conn.stop(); - taskState.setState(FT_READY); foundUrls = null; } @@ -529,7 +692,8 @@ public class FetcherTask /** * the interrupt method. not in use since the change to HTTPClient - * @TODO decide if we need this anymore + * + * @TODO decide if we need this anymore */ public void interrupt() { @@ -563,11 +727,12 @@ public class FetcherTask /** - * this is called whenever a link was found in the current document, - * Don't create too many objects here, as this will be called - * millions of times + * this is called whenever a link was found in the current document, Don't + * create too many objects here, as this will be called millions of times * - * @param link Description of the Parameter + * @param link Description of the Parameter + * @param anchor Description of the Parameter + * @param isFrame Description of the Parameter */ public void handleLink(String link, String anchor, boolean isFrame) { @@ -599,8 +764,11 @@ public class FetcherTask // relative url url = new URL(base, link); } - - URLMessage urlMessage = new URLMessage(url, contextUrl, isFrame, anchor, hostManager); + if(url.getPath() == null || url.getPath().length() == 0) + { + url = new URL(url.getProtocol(), url.getHost(), url.getPort(), "/" + url.getFile()); + } + URLMessage urlMessage = new URLMessage(url, contextUrl, isFrame ? URLMessage.LINKTYPE_FRAME : URLMessage.LINKTYPE_ANCHOR, anchor, hostResolver); //String urlString = urlMessage.getURLString(); @@ -670,6 +838,11 @@ public class FetcherTask * /System.out.println("Task " + this.taskNr + " finished (" + totalRead + " bytes in " + timeElapsed + " ms with " + totalRead / (timeElapsed / 1000.0) + " bytes/s)"); * } */ + /** + * Gets the bytesRead attribute of the FetcherTask object + * + * @return The bytesRead value + */ public long getBytesRead() { return bytesRead;