'''
Extract relevant information from a URL.

Most of the code comes from jarun/buku, licensed under GPLv3.
'''
import os
import certifi
import cgi
from logging import getLogger
import re
import urllib3
from urllib3.exceptions import LocationParseError
from urllib3.util import parse_url, make_headers
from bs4 import BeautifulSoup

logger = getLogger()

MYHEADERS = None  # Default dictionary of headers
MYPROXY = None  # Proxy URL, set from the https_proxy environment variable
USER_AGENT = (
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0"
)
SKIP_MIMES = {".pdf", ".txt"}


def parse_decoded_page(page):
    """Fetch title, description and keywords from decoded HTML page.

    Parameters
    ----------
    page : str
        Decoded HTML page.

    Returns
    -------
    tuple
        (title, description, keywords).
    """
    title = None
    desc = None
    keys = None
    soup = BeautifulSoup(page, "html5lib")

    try:
        title = soup.find("title").text.strip().replace("\n", " ")
        if title:
            title = re.sub(r"\s{2,}", " ", title)
    except Exception as e:
        logger.debug(e)

    description = (
        soup.find("meta", attrs={"name": "description"})
        or soup.find("meta", attrs={"name": "Description"})
        or soup.find("meta", attrs={"property": "description"})
        or soup.find("meta", attrs={"property": "Description"})
        or soup.find("meta", attrs={"name": "og:description"})
        or soup.find("meta", attrs={"name": "og:Description"})
        or soup.find("meta", attrs={"property": "og:description"})
        or soup.find("meta", attrs={"property": "og:Description"})
    )
    try:
        if description:
            desc = description.get("content").strip()
            if desc:
                desc = re.sub(r"\s{2,}", " ", desc)
    except Exception as e:
        logger.debug(e)

    keywords = soup.find("meta", attrs={"name": "keywords"}) or soup.find(
        "meta", attrs={"name": "Keywords"}
    )
    try:
        if keywords:
            keys = keywords.get("content").strip().replace("\n", " ")
            keys = re.sub(r"\s{2,}", " ", keys)
            if is_unusual_tag(keys):
                if keys not in (title, desc):
                    logger.debug("keywords to description: %s", keys)
                    if desc:
                        desc = desc + "\n## " + keys
                    else:
                        desc = "* " + keys
                keys = None
    except Exception as e:
        logger.debug(e)

    logger.debug("title: %s", title)
    logger.debug("desc : %s", desc)
    logger.debug("keys : %s", keys)

    return (title, desc, keys)
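

# Illustrative sketch (comments only, nothing executed): for a small
# hand-written HTML snippet, parse_decoded_page() would behave roughly as
# below. The snippet and the expected values are assumptions for illustration,
# not taken from the original code.
#
#   html = (
#       '<html><head><title>Example   Title</title>'
#       '<meta name="description" content="A short   description.">'
#       '<meta name="keywords" content="python, bookmarks">'
#       '</head><body></body></html>'
#   )
#   parse_decoded_page(html)
#   # expected: ("Example Title", "A short description.", "python, bookmarks")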


def get_data_from_page(resp):
    """Detect HTTP response encoding and invoke parser with decoded data.

    Parameters
    ----------
    resp : HTTP response
        Response from GET request.

    Returns
    -------
    tuple
        (title, description, keywords).
    """
    soup = None  # stays None if the HTML cannot be parsed at all
    try:
        soup = BeautifulSoup(resp.data, "html.parser")
    except Exception as e:
        logger.error("get_data_from_page(): %s", e)

    try:
        charset = None

        if soup and soup.meta and soup.meta.get("charset") is not None:
            charset = soup.meta.get("charset")
        elif "content-type" in resp.headers:
            _, params = cgi.parse_header(resp.headers["content-type"])
            if params.get("charset") is not None:
                charset = params.get("charset")

        if not charset and soup:
            meta_tag = soup.find("meta", attrs={"http-equiv": "Content-Type"})
            if meta_tag:
                _, params = cgi.parse_header(meta_tag.attrs["content"])
                charset = params.get("charset", charset)

        if charset:
            logger.debug("charset: %s", charset)
            title, desc, keywords = parse_decoded_page(
                resp.data.decode(charset, errors="replace")
            )
        else:
            title, desc, keywords = parse_decoded_page(
                resp.data.decode(errors="replace")
            )

        return (title, desc, keywords)
    except Exception as e:
        logger.error(e)
        return (None, None, None)
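

# Note on the charset lookup above (header values illustrative, not from the
# code's own tests): detection prefers, in order, a <meta charset="..."> tag,
# the Content-Type response header, then a <meta http-equiv="Content-Type">
# tag; with no charset found, resp.data is decoded as UTF-8 (the bytes.decode
# default) with errors="replace".
#
#   <meta charset="utf-8">                       -> decode as "utf-8"
#   Content-Type: text/html; charset=ISO-8859-1  -> decode as "ISO-8859-1"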


def get_PoolManager(MYPROXY=None):
    """Create a pool manager with proxy support, if applicable.

    Parameters
    ----------
    MYPROXY : str, optional
        Proxy URL to route requests through. Default is None.

    Returns
    -------
    ProxyManager or PoolManager
        ProxyManager if a proxy URL is given, PoolManager otherwise.
    """
    if MYPROXY:
        return urllib3.ProxyManager(
            MYPROXY,
            num_pools=1,
            headers=MYHEADERS,
            timeout=15,
            cert_reqs="CERT_REQUIRED",
            ca_certs=certifi.where(),
        )

    return urllib3.PoolManager(
        num_pools=1,
        headers=MYHEADERS,
        timeout=15,
        cert_reqs="CERT_REQUIRED",
        ca_certs=certifi.where(),
    )
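

# Usage sketch (illustrative, nothing executed): with the https_proxy
# environment variable set, gen_headers() records the proxy URL in MYPROXY and
# moves any user:password from it into a Basic authorization header, so the
# two functions combine as:
#
#   gen_headers()                        # fills MYHEADERS and MYPROXY
#   manager = get_PoolManager(MYPROXY)   # ProxyManager when a proxy is set
#   resp = manager.request("GET", "https://example.com/")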


def network_handler(url, http_head=False):
    """Handle server connection and redirections.

    Parameters
    ----------
    url : str
        URL to fetch.
    http_head : bool
        If True, send only an HTTP HEAD request. Default is False.

    Returns
    -------
    tuple
        (title, description, tags, recognized mime, bad url).
    """
    page_title = None
    page_desc = None
    page_keys = None
    exception = False
    manager = None  # keeps the finally block safe if pool creation fails

    if is_nongeneric_url(url) or is_bad_url(url):
        return (None, None, None, 0, 1)

    if is_ignored_mime(url) or http_head:
        method = "HEAD"
    else:
        method = "GET"

    if not MYHEADERS:
        gen_headers()

    try:
        manager = get_PoolManager(MYPROXY)

        while True:
            resp = manager.request(method, url)

            if resp.status == 200:
                if method == "GET":
                    page_title, page_desc, page_keys = get_data_from_page(resp)
            elif resp.status == 403 and url.endswith("/"):
                # HTTP response Forbidden.
                # Handle URLs in the form of https://www.domain.com/
                # which fail when trying to fetch resource '/':
                # retry without the trailing '/'.
                logger.debug("Received status 403: retrying...")
                # Remove trailing /
                url = url[:-1]
                resp.close()
                continue
            else:
                logger.error("[%s] %s", resp.status, resp.reason)

            if resp:
                resp.close()

            break
    except Exception as e:
        logger.error("network_handler(): %s", e)
        exception = True
    finally:
        if manager:
            manager.clear()

    if exception:
        return (None, None, None, 0, 0)
    if method == "HEAD":
        return ("", "", "", 1, 0)
    if page_title is None:
        return ("", page_desc, page_keys, 0, 0)
    return (page_title, page_desc, page_keys, 0, 0)
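

# Return value sketch (illustrative; example.com is a placeholder URL): the
# 5-tuple from network_handler() can be unpacked as
#
#   title, desc, keys, mime, bad = network_handler("https://example.com/")
#   # bad == 1  -> the URL was non-generic or malformed, nothing was fetched
#   # mime == 1 -> only a HEAD request was made (ignored MIME or http_head=True)
#   # otherwise title/desc/keys hold whatever metadata could be parsed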


def is_bad_url(url):
    """Check if URL is malformed.

    .. Note:: This API is not bulletproof but works in most cases.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL is malformed, False otherwise.
    """
    # Get the netloc token
    try:
        netloc = parse_url(url).netloc
    except LocationParseError as e:
        logger.error("%s, URL: %s", e, url)
        return True

    if not netloc:
        # Try to prepend '//' and get the netloc again
        netloc = parse_url("//" + url).netloc
        if not netloc:
            return True

    logger.debug("netloc: %s", netloc)

    # netloc cannot start or end with a '.'
    if netloc.startswith(".") or netloc.endswith("."):
        return True

    # netloc should have at least one '.'
    if netloc.rfind(".") < 0:
        return True

    return False
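

# Examples (assumed, not from the original code) of how is_bad_url()
# classifies input:
#
#   is_bad_url("https://example.com/page")   # False: well-formed netloc
#   is_bad_url("https://localhost/")         # True: netloc has no '.'
#   is_bad_url("https://.example.com/")      # True: netloc starts with '.'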


def is_nongeneric_url(url):
    """Returns True for URLs which are non-http and non-generic.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL is a non-generic URL, False otherwise.
    """
    ignored_prefix = ["about:", "apt:", "chrome://", "file://", "place:"]

    for prefix in ignored_prefix:
        if url.startswith(prefix):
            return True

    return False


def is_unusual_tag(tagstr):
    """Identify unusual tags with a word to comma ratio > 3.

    Parameters
    ----------
    tagstr : str
        Tag string to check.

    Returns
    -------
    bool
        True if the tag string looks unusual (free text rather than a
        comma-separated tag list), False otherwise.
    """
    if not tagstr:
        return False

    nwords = len(tagstr.split())
    ncommas = tagstr.count(",") + 1

    if nwords / ncommas > 3:
        return True

    return False
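

# Examples (assumed) of the word-to-comma ratio used above:
#
#   is_unusual_tag("python, bookmarks, cli")
#   # False: 3 words / 3 comma-separated parts = 1
#   is_unusual_tag("a long sentence that is clearly not a tag list")
#   # True: 10 words / 1 part = 10 > 3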


def is_ignored_mime(url):
    """Check if URL links to ignored MIME.

    .. Note:: Only a 'HEAD' request is made for these URLs.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL links to ignored MIME, False otherwise.
    """
    for mime in SKIP_MIMES:
        if url.lower().endswith(mime):
            logger.debug("matched MIME: %s", mime)
            return True

    return False


def gen_headers():
    """Generate headers for network connection."""
    global MYHEADERS, MYPROXY

    MYHEADERS = {
        "Accept-Encoding": "gzip,deflate",
        "User-Agent": USER_AGENT,
        "Accept": "*/*",
        "Cookie": "",
        "DNT": "1",
    }

    MYPROXY = os.environ.get("https_proxy")
    if MYPROXY:
        try:
            url = parse_url(MYPROXY)
        except Exception as e:
            logger.error(e)
            return

        # Strip username and password (if present) and update headers
        if url.auth:
            MYPROXY = MYPROXY.replace(url.auth + "@", "")
            auth_headers = make_headers(basic_auth=url.auth)
            MYHEADERS.update(auth_headers)

        logger.debug("proxy: [%s]", MYPROXY)
|