initial commit

boyska 2021-03-21 12:09:07 +01:00
commit 433ef46440
9 changed files with 837 additions and 0 deletions

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
*.egg-info/
/build/
/dist/
__pycache__/
.mypy_cache/

96
README.md Normal file

@@ -0,0 +1,96 @@
What
=======
- keeps track of bookmarks
- has the concept of folders well integrated
- supports tags and descriptions, too
- one file per bookmark
- CLI-first
- python3
Storage
-----------
### One file per bookmark
If you want to store a tree, let's just rely on the filesystem!
Let's learn the lesson of [pass](https://www.passwordstore.org/): when you keep things simple, people can do crazy
shit.
One quick example: if you want to share some folder with a friend of yours, having a REAL folder is the best
way to keep that simple. Easy sharing, few conflicts, easy review.
You can keep some folder under git control, or rsync it, or share it through Nextcloud.
### Filename
What about the filename? Some approaches (see the sketch below):
- GUID
- shasum of the URL ⇒ you can't modify the URL without the filename changing!
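
A minimal sketch of the hash-based naming, mirroring what `store.py` in this commit does (an md5 hexdigest of the URL):

```
import hashlib

def get_fname(url):
    # name the bookmark file after a hash of its URL: stable and
    # filesystem-safe, but editing the URL means writing a new file
    return hashlib.md5(url.encode('utf8')).hexdigest()

print(get_fname("https://riseup.net/"))  # a 32-character hex name
```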
### File content
The first line is the URL; now we also need to store a description and tags.
We want to be grep-friendly, so let's use a format borrowed from email (or from debian/control, if you want!):

    URI: https://riseup.net/
    Description: They are nice folks
    Tag: privacy
    Tag: change the world
    Tag: email

Just avoid newlines in the fields and you can run good queries with plain grep.
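
A minimal sketch of reading that format back into a dict, along the lines of the `Serializer` in `store.py` below:

```
import re

HEADER_LINE = re.compile(r'^([^:]+): (.*)$')

def decode(content):
    data = {'Tag': []}          # Tag can repeat, so collect it in a list
    for line in content.splitlines():
        m = HEADER_LINE.match(line)
        if not m:
            continue            # skip blank or malformed lines
        key, value = m.groups()
        if key == 'Tag':
            data['Tag'].append(value)
        else:
            data[key] = value
    return data
```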
### Performance
Won't this be very slow? Maybe! If that happens, we'll work around it by adding an index, not by changing the
storage format.
Interface
-------------
### Searching/opening
Though not optimal, fzf, peco, or similar tools can be a very good start: a nice interface and support for
opening multiple URLs at once.
Dreaming a bit more, it would be nice to:
- be able to both navigate the tree and filter results
- include frecency!
### Moving stuff
If we have filtering, it's nice to be able to use it to move things around, or delete, or whatever.
### Mass tagging
Having tools that can automatically apply or remove tags on the returned bookmarks would be very nice.
### Examples
```
mxb list
mxb list coding/python
firefox "$(mxb list | peco | awk '{ print $NF }')"
mxb list | peco | cut -f 1 | mxb tag +urgent
mxb list | peco | cut -f 1 | mxb mv work/todo
mxb mv coding/python/exercises/dfkljedua work/todo/
xsel -b | mxb add
```
TODO
=========
- Core:
- move()
- tag()
- CLI: write a cli!
- helper:
- write a marxbook-search helper based on peco

1
marxbook/__init__.py Normal file

@@ -0,0 +1 @@
from .store import Store, Serializer

99
marxbook/cli.py Executable file

@@ -0,0 +1,99 @@
#!/usr/bin/env python3
from argparse import ArgumentParser
import os
import subprocess
import sys
import tempfile

import marxbook
import marxbook.extract  # the package __init__ only re-exports Store/Serializer


def get_parser():
    p = ArgumentParser()
    p.add_argument("--store-dir")
    p.add_argument("--editor", default="sensible-editor")
    p.add_argument("--batch", default=False, action="store_true")
    p.set_defaults(func=None)
    subcommands = p.add_subparsers(help="Sub-commands")
    list_p = subcommands.add_parser("list")
    list_p.add_argument("folder", nargs="?", default="")
    list_p.set_defaults(func=main_list)
    add_p = subcommands.add_parser("add")
    add_p.add_argument("--folder", default="")
    add_p.add_argument("--tag", help="Comma-separated list of tags", default="")
    add_p.add_argument("--title", help="If omitted, auto-fetch")
    add_p.add_argument("--description", help="If omitted, auto-fetch")
    add_p.add_argument("url", nargs="?")
    add_p.set_defaults(func=main_add)
    return p


def main():
    p = get_parser()
    args = p.parse_args()
    if args.func is None:
        print("Must specify a subcommand", file=sys.stderr)
        return 2
    store = marxbook.Store(args.store_dir)
    args.func(store, args)


def main_list(store, args):
    for mark in store.folder(args.folder):
        tag = ",".join(mark["Tag"])
        line = [mark["Path"], tag, mark["Title"], mark["Url"]]
        print("\t".join(line))


def edit_before_add(data: dict, args) -> dict:
    ser = marxbook.Serializer()
    fd, fpath = tempfile.mkstemp()
    buf = os.fdopen(fd, "w")
    buf.write(ser.encode(data))
    buf.close()
    proc = subprocess.Popen([args.editor, fpath])
    proc.communicate()
    with open(fpath) as buf:
        read_data = ser.decode(buf.read())
    os.unlink(fpath)
    data = {}
    for key in read_data:
        data[key.lower()] = read_data[key]
    return data


def main_add(store, args):
    store = store.folder(args.folder)
    batch = args.batch
    if args.url is not None:
        urls = [args.url]
    else:
        batch = True
        urls = []
        for line in sys.stdin.readlines():
            urls.append(line.strip())
    for url in urls:
        data = dict(title=args.title, description=args.description, url=url)
        # skip empty tags (e.g. when --tag is not given)
        data['tag'] = [t.strip() for t in args.tag.split(",") if t.strip()]
        if args.title is None or args.description is None:
            _title, _description, _keys, mime, bad = marxbook.extract.network_handler(url)
            if not args.title:
                data["title"] = _title
            if not args.description:
                data["description"] = _description
        if not batch:
            data = edit_before_add(data, args)
        store.add(**data)
    print(urls)


if __name__ == "__main__":
    ret = main()
    if type(ret) is int:
        sys.exit(ret)

394
marxbook/extract.py Normal file

@@ -0,0 +1,394 @@
'''
Extract relevant information from a URL.

Most of the code comes from jarun/buku, licensed under GPLv3.
'''
import os
import certifi
import cgi
from logging import getLogger
import re

import urllib3
from urllib3.exceptions import LocationParseError
from urllib3.util import parse_url, make_headers
from bs4 import BeautifulSoup

logger = getLogger()

MYHEADERS = None  # Default dictionary of headers
USER_AGENT = (
    "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:61.0) Gecko/20100101 Firefox/61.0"
)
SKIP_MIMES = {".pdf", ".txt"}


def parse_decoded_page(page):
    """Fetch title, description and keywords from decoded HTML page.

    Parameters
    ----------
    page : str
        Decoded HTML page.

    Returns
    -------
    tuple
        (title, description, keywords).
    """
    title = None
    desc = None
    keys = None

    soup = BeautifulSoup(page, "html5lib")

    try:
        title = soup.find("title").text.strip().replace("\n", " ")
        if title:
            title = re.sub(r"\s{2,}", " ", title)
    except Exception as e:
        logger.debug(e)

    description = (
        soup.find("meta", attrs={"name": "description"})
        or soup.find("meta", attrs={"name": "Description"})
        or soup.find("meta", attrs={"property": "description"})
        or soup.find("meta", attrs={"property": "Description"})
        or soup.find("meta", attrs={"name": "og:description"})
        or soup.find("meta", attrs={"name": "og:Description"})
        or soup.find("meta", attrs={"property": "og:description"})
        or soup.find("meta", attrs={"property": "og:Description"})
    )
    try:
        if description:
            desc = description.get("content").strip()
            if desc:
                desc = re.sub(r"\s{2,}", " ", desc)
    except Exception as e:
        logger.debug(e)

    keywords = soup.find("meta", attrs={"name": "keywords"}) or soup.find(
        "meta", attrs={"name": "Keywords"}
    )
    try:
        if keywords:
            keys = keywords.get("content").strip().replace("\n", " ")
            keys = re.sub(r"\s{2,}", " ", keys)
            if is_unusual_tag(keys):
                if keys not in (title, desc):
                    logger.debug("keywords to description: %s", keys)
                    if desc:
                        desc = desc + "\n## " + keys
                    else:
                        desc = "* " + keys
                keys = None
    except Exception as e:
        logger.debug(e)

    logger.debug("title: %s", title)
    logger.debug("desc : %s", desc)
    logger.debug("keys : %s", keys)

    return (title, desc, keys)


def get_data_from_page(resp):
    """Detect HTTP response encoding and invoke parser with decoded data.

    Parameters
    ----------
    resp : HTTP response
        Response from GET request.

    Returns
    -------
    tuple
        (title, description, keywords).
    """
    try:
        soup = BeautifulSoup(resp.data, "html.parser")
    except Exception as e:
        logger.error("get_data_from_page(): %s", e)

    try:
        charset = None

        if soup.meta and soup.meta.get("charset") is not None:
            charset = soup.meta.get("charset")
        elif "content-type" in resp.headers:
            _, params = cgi.parse_header(resp.headers["content-type"])
            if params.get("charset") is not None:
                charset = params.get("charset")

        if not charset and soup:
            meta_tag = soup.find("meta", attrs={"http-equiv": "Content-Type"})
            if meta_tag:
                _, params = cgi.parse_header(meta_tag.attrs["content"])
                charset = params.get("charset", charset)

        if charset:
            logger.debug("charset: %s", charset)
            title, desc, keywords = parse_decoded_page(
                resp.data.decode(charset, errors="replace")
            )
        else:
            title, desc, keywords = parse_decoded_page(
                resp.data.decode(errors="replace")
            )

        return (title, desc, keywords)
    except Exception as e:
        logger.error(e)
        return (None, None, None)


def get_PoolManager(MYPROXY=None):
    """Creates a pool manager with proxy support, if applicable.

    Returns
    -------
    ProxyManager or PoolManager
        ProxyManager if https_proxy is defined, PoolManager otherwise.
    """
    if MYPROXY:
        return urllib3.ProxyManager(
            MYPROXY,
            num_pools=1,
            headers=MYHEADERS,
            timeout=15,
            cert_reqs="CERT_REQUIRED",
            ca_certs=certifi.where(),
        )

    return urllib3.PoolManager(
        num_pools=1,
        headers=MYHEADERS,
        timeout=15,
        cert_reqs="CERT_REQUIRED",
        ca_certs=certifi.where(),
    )


def network_handler(url, http_head=False):
    """Handle server connection and redirections.

    Parameters
    ----------
    url : str
        URL to fetch.
    http_head : bool
        If True, send only HTTP HEAD request. Default is False.

    Returns
    -------
    tuple
        (title, description, tags, recognized mime, bad url).
    """
    page_title = None
    page_desc = None
    page_keys = None
    exception = False

    if is_nongeneric_url(url) or is_bad_url(url):
        return (None, None, None, 0, 1)

    if is_ignored_mime(url) or http_head:
        method = "HEAD"
    else:
        method = "GET"

    if not MYHEADERS:
        gen_headers()

    try:
        manager = get_PoolManager()

        while True:
            resp = manager.request(method, url)

            if resp.status == 200:
                if method == "GET":
                    page_title, page_desc, page_keys = get_data_from_page(resp)
            elif resp.status == 403 and url.endswith("/"):
                # HTTP response Forbidden
                # Handle URLs in the form of https://www.domain.com/
                # which fail when trying to fetch resource '/'
                # retry without trailing '/'
                logger.debug("Received status 403: retrying...")
                # Remove trailing /
                url = url[:-1]
                resp.close()
                continue
            else:
                logger.error("[%s] %s", resp.status, resp.reason)

            if resp:
                resp.close()

            break
    except Exception as e:
        logger.error("network_handler(): %s", e)
        exception = True
    finally:
        if manager:
            manager.clear()

    if exception:
        return (None, None, None, 0, 0)

    if method == "HEAD":
        return ("", "", "", 1, 0)

    if page_title is None:
        return ("", page_desc, page_keys, 0, 0)

    return (page_title, page_desc, page_keys, 0, 0)


def is_bad_url(url):
    """Check if URL is malformed.

    .. Note:: This API is not bulletproof but works in most cases.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL is malformed, False otherwise.
    """
    # Get the netloc token
    try:
        netloc = parse_url(url).netloc
    except LocationParseError as e:
        logger.error("%s, URL: %s", e, url)
        return True

    if not netloc:
        # Try to prepend '//' and get netloc
        netloc = parse_url("//" + url).netloc
        if not netloc:
            return True

    logger.debug("netloc: %s", netloc)

    # netloc cannot start or end with a '.'
    if netloc.startswith(".") or netloc.endswith("."):
        return True

    # netloc should have at least one '.'
    if netloc.rfind(".") < 0:
        return True

    return False


def is_nongeneric_url(url):
    """Returns True for URLs which are non-http and non-generic.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL is a non-generic URL, False otherwise.
    """
    ignored_prefix = ["about:", "apt:", "chrome://", "file://", "place:"]

    for prefix in ignored_prefix:
        if url.startswith(prefix):
            return True

    return False


def is_unusual_tag(tagstr):
    """Identify unusual tags with word to comma ratio > 3.

    Parameters
    ----------
    tagstr : str
        tag string to check.

    Returns
    -------
    bool
        True if the tag string looks unusual, False otherwise.
    """
    if not tagstr:
        return False

    nwords = len(tagstr.split())
    ncommas = tagstr.count(",") + 1

    if nwords / ncommas > 3:
        return True

    return False


def is_ignored_mime(url):
    """Check if URL links to ignored MIME.

    .. Note:: Only a 'HEAD' request is made for these URLs.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL links to ignored MIME, False otherwise.
    """
    for mime in SKIP_MIMES:
        if url.lower().endswith(mime):
            logger.debug("matched MIME: %s", mime)
            return True

    return False


def gen_headers():
    """Generate headers for network connection."""
    global MYHEADERS, MYPROXY

    MYHEADERS = {
        "Accept-Encoding": "gzip,deflate",
        "User-Agent": USER_AGENT,
        "Accept": "*/*",
        "Cookie": "",
        "DNT": "1",
    }

    MYPROXY = os.environ.get("https_proxy")
    if MYPROXY:
        try:
            url = parse_url(MYPROXY)
        except Exception as e:
            logger.error(e)
            return

        # Strip username and password (if present) and update headers
        if url.auth:
            MYPROXY = MYPROXY.replace(url.auth + "@", "")
            auth_headers = make_headers(basic_auth=url.auth)
            MYHEADERS.update(auth_headers)

        logger.debug("proxy: [%s]", MYPROXY)

89
marxbook/store.py Normal file

@@ -0,0 +1,89 @@
import hashlib
import re
from pathlib import Path
import logging

logger = logging.getLogger()


def get_fname(url):
    m = hashlib.md5()
    m.update(url.encode('utf8'))
    return m.hexdigest()


class Store:
    def __init__(self, basedir: Path = None):
        if basedir is None:
            basedir = Path('~/.local/share/marxbook/bookmarks/')
        # accept both str and Path (the CLI passes a plain string)
        self.basedir = Path(basedir).expanduser()
        self.serializer = Serializer()

    def add(self, url: str, title=None, tag=[], description=''):
        dest = self.basedir
        dest.mkdir(parents=True, exist_ok=True)
        fname = get_fname(url)
        fpath = dest / fname
        content = self.serializer.encode(dict(
            url=url, title=title, tags=tag, description=description))
        with fpath.open('w') as buf:
            buf.write(content)

    def get(self, path: str):
        fpath = self.basedir / path
        with fpath.open() as buf:
            return self.serializer.decode(buf.read())

    def __iter__(self):
        for urlfile in self.basedir.glob('**/*'):
            if not urlfile.is_file():
                continue
            data = self.get(urlfile)
            ret = {'Path': str(urlfile.relative_to(self.basedir))}
            ret.update(data)
            yield ret

    def folder(self, folder: str):
        return Store(self.basedir / folder)


HEADER_LINE = re.compile(r'^([^:]+): (.*)$')


class Serializer:
    def __init__(self):
        pass

    def encode(self, data: dict) -> str:
        m = ''
        tags = data.pop('tags', [])  # those are special!
        for key in data:
            m += '%s: %s\n' % (key.title(), str(data[key]).replace('\n', ' '))
        for tag in tags:
            m += '%s: %s\n' % ('Tag', tag)
        return m

    def decode(self, content: str) -> dict:
        d: dict = {'Tag': []}
        for num, line in enumerate(content.split('\n'), 1):
            if not line.strip():
                continue
            m = HEADER_LINE.match(line)
            if m is None:
                logger.error("Invalid line %d" % num)
                continue
            key, value = m.groups()
            key = key.title()
            if key == 'Tag':
                d[key].append(value)
            else:
                d[key] = value
        return d


if __name__ == '__main__':
    import sys

    s = Store()
    # print(s.get(sys.argv[1]))
    # Store has no list(); iterate over the requested folder instead
    for line in s.folder(sys.argv[1]):
        print(line)
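
A quick usage sketch of the `Store`/`Serializer` API above; the folder name and URL are just examples:

```
from marxbook import Store

store = Store()  # defaults to ~/.local/share/marxbook/bookmarks/
store.folder("coding/python").add(
    url="https://docs.python.org/3/",
    title="Python docs",
    tag=["python", "reference"],
)
for mark in store.folder("coding"):
    print(mark["Path"], mark["Url"])
```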

13
misc/buku_import.py Executable file

@@ -0,0 +1,13 @@
from pathlib import Path
import sqlite3


def import_from_buku(store, buku_path: Path = None):
    if buku_path is None:
        buku_path = Path('~/.local/share/buku/bookmarks.db').expanduser()
    conn = sqlite3.connect(str(buku_path))
    cur = conn.cursor()
    query = '''SELECT URL, metadata, tags, desc FROM bookmarks'''
    for url, title, tags, desc in cur.execute(query):
        # buku wraps the tag set in commas, so drop the empty items
        tags = [t.strip() for t in tags.split(',') if t.strip()]
        # Store.add() takes the URL first and a 'tag' list (see store.py)
        store.add(url=url, title=title, tag=tags, description=desc)
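
A hedged usage sketch of the importer above, assuming it is run from the `misc/` directory with marxbook installed:

```
from marxbook import Store
from buku_import import import_from_buku

# reads buku's default ~/.local/share/buku/bookmarks.db
import_from_buku(Store())
```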

104
misc/ff_import.py Executable file

@@ -0,0 +1,104 @@
import sqlite3
from marxbook import Store
from pathlib import Path
import sys
import logging

logger = logging.getLogger()


def is_nongeneric_url(url):
    """Returns True for URLs which are non-http and non-generic.

    Parameters
    ----------
    url : str
        URL to scan.

    Returns
    -------
    bool
        True if URL is a non-generic URL, False otherwise.
    """
    ignored_prefix = ["about:", "apt:", "chrome://", "file://", "place:"]

    for prefix in ignored_prefix:
        if url.startswith(prefix):
            return True

    return False


def load_firefox_database(store: Store, path):
    """Connect to Firefox sqlite db and import bookmarks into a marxbook Store.

    Parameters
    ----------
    store : Store
        Target marxbook store.
    path : str
        Path to Firefox bookmarks sqlite database.
    """
    path = Path(path).expanduser()
    # Connect to input DB
    if sys.version_info >= (3, 4, 4):
        # Python 3.4.4 and above
        conn = sqlite3.connect("file:%s?mode=ro" % path, uri=True)
    else:
        conn = sqlite3.connect(path)
    cur = conn.cursor()
    res = cur.execute(
        "SELECT DISTINCT fk, parent, title FROM moz_bookmarks WHERE type=1"
    )
    # get id's and remove duplicates
    for fk, parent_id, bm_title in res.fetchall():
        # get the url
        res = cur.execute("SELECT url FROM moz_places where id={}".format(fk))
        url = res.fetchone()[0]
        if is_nongeneric_url(url):
            continue
        # get tags
        res = cur.execute(
            "SELECT parent FROM moz_bookmarks WHERE "
            "fk={} AND title IS NULL".format(fk)
        )
        bm_tag_ids = [tid for item in res.fetchall() for tid in item]
        bookmark_tags = []
        for bm_tag_id in bm_tag_ids:
            res = cur.execute(
                "SELECT title FROM moz_bookmarks WHERE id={}".format(bm_tag_id)
            )
            bookmark_tags.append(res.fetchone()[0])
        # add folder name
        folder: list = []
        while parent_id:
            res = cur.execute(
                "SELECT title,parent FROM moz_bookmarks "
                "WHERE id={}".format(parent_id)
            )
            parent = res.fetchone()
            if parent:
                title, parent_id = parent
                if title:
                    folder.insert(0, title)
            else:
                # guard against orphaned parents
                break
        folder_name = "/".join(folder).lstrip("/")
        # get the title
        if not bm_title:
            bm_title = ""
        print(f'store.add({folder_name}, url={url}, title={bm_title}, tags={bookmark_tags})')
        # Store.add() has no folder parameter: descend into the folder, then add
        store.folder(folder_name).add(url=url, title=bm_title, tag=bookmark_tags)
    try:
        cur.close()
        conn.close()
    except Exception:
        logger.exception("Couldn't close FF db")


if __name__ == "__main__":
    s = Store("~/.local/share/marxbook/bookmarks/")
    load_firefox_database(s, sys.argv[1])

36
setup.py Normal file

@@ -0,0 +1,36 @@
import os
from setuptools import setup


def read(fname):
    with open(os.path.join(os.path.dirname(__file__), fname)) as buf:
        return buf.read()


setup(
    name="marxbook",
    version="0.0.1",
    description="A flat-file bookmark manager",
    long_description=read("README.md"),
    long_description_content_type="text/markdown",
    author="boyska",
    author_email="piuttosto@logorroici.org",
    license="AGPL",
    packages=["marxbook"],
    install_requires=[
        "beautifulsoup4==4.7.1",
    ],
    python_requires=">=3.5",
    zip_safe=True,
    include_package_data=False,
    entry_points={
        "console_scripts": [
            "mxb=marxbook.cli:main",
        ],
    },
    classifiers=[
        "License :: OSI Approved :: GNU Affero General Public License v3",
        "Programming Language :: Python :: 3.5",
    ],
)
)