Source code for linkcheck.director.aggregator

# Copyright (C) 2006-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Aggregate needed object instances for checker threads.
"""
import threading
import time
import urllib.parse
import random

import requests

from .. import log, LOG_CHECK, strformat, LinkCheckerError
from ..decorators import synchronized
from ..cache import urlqueue
from ..htmlutil import loginformsearch
from ..cookies import from_file
from . import logger, status, checker, interrupter


_threads_lock = threading.RLock()
_hosts_lock = threading.RLock()
_downloadedbytes_lock = threading.RLock()


def new_request_session(config, cookies):
    """Create a new request session."""
    session = requests.Session()
    if cookies:
        session.cookies = cookies
    session.max_redirects = config["maxhttpredirects"]
    session.headers.update(
        {"User-Agent": config["useragent"]}
    )
    if config["cookiefile"]:
        try:
            for cookie in from_file(config["cookiefile"]):
                session.cookies.set_cookie(cookie)
        except Exception as msg:
            log.error(
                LOG_CHECK,
                _("Could not parse cookie file: %s. %s"),
                config["cookiefile"],
                msg,
            )
    return session

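# Illustrative usage sketch (not part of the original module): since
# new_request_session() only reads the three configuration keys shown below,
# a plain dict can stand in for the configuration object when experimenting.
# The values are hypothetical.
#
#     config = {
#         "maxhttpredirects": 10,
#         "useragent": "Mozilla/5.0 (compatible; LinkChecker)",
#         "cookiefile": None,
#     }
#     session = new_request_session(config, cookies=None)
#     response = session.get("https://example.com/", timeout=10)
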
class Aggregate:
    """Store thread-safe data collections for checker threads."""

    wait_time_min_default = 0.1
    wait_time_max_default = 0.6

    def __init__(self, config, urlqueue, robots_txt, plugin_manager, result_cache):
        """Store given link checking objects."""
        self.config = config
        self.urlqueue = urlqueue
        self.logger = logger.Logger(config)
        self.threads = []
        self.request_sessions = {}
        self.robots_txt = robots_txt
        self.plugin_manager = plugin_manager
        self.result_cache = result_cache
        self.times = {}
        self.maxrated = {}
        self.cookies = None
        requests_per_second = config["maxrequestspersecond"]
        self.wait_time_min = 1.0 / requests_per_second
        self.wait_time_max = 6 * self.wait_time_min
        self.downloaded_bytes = 0
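
    # Worked example (illustrative, not part of the original module): with
    # config["maxrequestspersecond"] = 10, wait_time_min = 1.0 / 10 = 0.1
    # seconds and wait_time_max = 6 * 0.1 = 0.6 seconds, so wait_for_host()
    # below spaces consecutive requests to one host by a random delay drawn
    # from that [0.1, 0.6] second window.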

    def visit_loginurl(self):
        """Check for a login URL and visit it."""
        url = self.config["loginurl"]
        if not url:
            return
        user, password = self.config.get_user_password(url)
        if not user and not password:
            raise LinkCheckerError(
                "loginurl is configured but neither user nor password are set"
            )
        session = new_request_session(self.config, self.cookies)
        log.debug(LOG_CHECK, "Getting login form %s", url)
        kwargs = dict(timeout=self.config["timeout"])
        # XXX: sslverify? can we reuse HttpUrl.get_request_kwargs() somehow?
        response = session.get(url, **kwargs)
        response.raise_for_status()
        cgiuser = self.config["loginuserfield"] if user else None
        cgipassword = self.config["loginpasswordfield"] if password else None
        form = loginformsearch.search_form(response.text, cgiuser, cgipassword)
        if not form:
            raise LinkCheckerError("Login form not found at %s" % url)
        if user:
            form.data[cgiuser] = user
        if password:
            form.data[cgipassword] = password
        for key, value in self.config["loginextrafields"].items():
            form.data[key] = value
        formurl = urllib.parse.urljoin(url, form.url)
        log.debug(LOG_CHECK, "Posting login data to %s", formurl)
        response = session.post(formurl, data=form.data, **kwargs)
        response.raise_for_status()
        self.cookies = session.cookies
        if len(self.cookies) == 0:
            raise LinkCheckerError("No cookies set by login URL %s" % url)

    @synchronized(_threads_lock)
    def start_threads(self):
        """Spawn threads for URL checking and status printing."""
        if self.config["status"]:
            t = status.Status(self, self.config["status_wait_seconds"])
            t.start()
            self.threads.append(t)
        if self.config["maxrunseconds"]:
            t = interrupter.Interrupt(self.config["maxrunseconds"])
            t.start()
            self.threads.append(t)
        num = self.config["threads"]
        if num > 0:
            for dummy in range(num):
                t = checker.Checker(
                    self.urlqueue, self.logger, self.add_request_session
                )
                self.threads.append(t)
                t.start()
        else:
            self.request_sessions[threading.get_ident()] = new_request_session(
                self.config, self.cookies
            )
            checker.check_urls(self.urlqueue, self.logger)

    @synchronized(_threads_lock)
    def add_request_session(self):
        """Add a request session for the current thread."""
        session = new_request_session(self.config, self.cookies)
        self.request_sessions[threading.get_ident()] = session

    @synchronized(_threads_lock)
    def get_request_session(self):
        """Get the request session for the current thread."""
        return self.request_sessions[threading.get_ident()]

    @synchronized(_hosts_lock)
    def wait_for_host(self, host):
        """Throttle requests to one host."""
        t = time.time()
        if host in self.times:
            # A request to this host is already scheduled for a later time;
            # sleep until that time has passed.
            due_time = self.times[host]
            if due_time > t:
                wait = due_time - t
                time.sleep(wait)
                t = time.time()
        if host in self.maxrated:
            wait_time_min, wait_time_max = self.wait_time_min, self.wait_time_max
        else:
            wait_time_min = max(self.wait_time_min, self.wait_time_min_default)
            wait_time_max = max(self.wait_time_max, self.wait_time_max_default)
        log.debug(
            LOG_CHECK,
            "Min wait time: %s Max wait time: %s for host: %s",
            wait_time_min,
            wait_time_max,
            host,
        )
        # Schedule the earliest time for the next request to this host.
        wait_time = random.uniform(wait_time_min, wait_time_max)
        self.times[host] = t + wait_time

    @synchronized(_hosts_lock)
    def set_maxrated_for_host(self, host):
        """Remove the limit on the maximum request rate for a host."""
        self.maxrated[host] = True

    @synchronized(_threads_lock)
    def print_active_threads(self):
        """Log all currently active threads."""
        debug = log.is_debug(LOG_CHECK)
        if debug:
            first = True
            for name in self.get_check_threads():
                if first:
                    log.info(LOG_CHECK, _("These URLs are still active:"))
                    first = False
                # Strip the "CheckThread-" prefix (12 characters) so that
                # only the URL being checked is logged.
                log.info(LOG_CHECK, name[12:])
        args = dict(
            num=len(
                [x for x in self.threads if x.name.startswith("CheckThread-")]
            ),
            timeout=strformat.strduration_long(self.config["aborttimeout"]),
        )
        log.info(
            LOG_CHECK,
            _(
                "%(num)d URLs are still active. After a timeout of %(timeout)s"
                " the active URLs will stop."
            )
            % args,
        )

    @synchronized(_threads_lock)
    def get_check_threads(self):
        """Return an iterator over checker thread names."""
        for t in self.threads:
            if t.name.startswith("CheckThread-"):
                yield t.name

    def cancel(self):
        """Empty the URL queue."""
        self.urlqueue.do_shutdown()

    def abort(self):
        """Print still-active URLs and empty the URL queue."""
        self.print_active_threads()
        self.cancel()
        timeout = self.config["aborttimeout"]
        try:
            self.urlqueue.join(timeout=timeout)
        except urlqueue.Timeout:
            log.warn(
                LOG_CHECK,
                "Abort timed out after %d seconds, stopping application." % timeout,
            )
            raise KeyboardInterrupt()

    @synchronized(_threads_lock)
    def remove_stopped_threads(self):
        """Remove the stopped threads from the internal thread list."""
        self.threads = [t for t in self.threads if t.is_alive()]

    @synchronized(_threads_lock)
    def finish(self):
        """Wait for checker threads to finish."""
        if not self.urlqueue.empty():
            # This happens when all checker threads died.
            self.cancel()
        for t in self.threads:
            t.stop()
        for t in self.threads:
            t.join(timeout=1.0)

    @synchronized(_threads_lock)
    def is_finished(self):
        """Determine if checking is finished."""
        self.remove_stopped_threads()
        return self.urlqueue.empty() and not self.threads

    @synchronized(_downloadedbytes_lock)
    def add_downloaded_bytes(self, numbytes):
        """Add to number of downloaded bytes."""
        self.downloaded_bytes += numbytes

    def end_log_output(self, **kwargs):
        """Print ending output to log."""
        kwargs.update(
            dict(
                downloaded_bytes=self.downloaded_bytes,
                num_urls=len(self.result_cache),
            )
        )
        self.logger.end_log_output(**kwargs)
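
# Illustrative lifecycle sketch (not part of the original module). The call
# order below is an assumption drawn from the method names and the way the
# director package is expected to drive an Aggregate; construction of the
# collaborators (config, urlqueue, robots_txt, plugin_manager, result_cache)
# is elided.
#
#     aggregate = Aggregate(config, urlqueue, robots_txt, plugin_manager,
#                           result_cache)
#     aggregate.visit_loginurl()    # optional, only if a loginurl is configured
#     aggregate.start_threads()     # spawn checker/status threads
#     ...                           # wait until aggregate.is_finished()
#     aggregate.finish()            # stop and join the threads
#     aggregate.end_log_output()    # write the final logger summary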