import hashlib
import logging
import os
import re
from pathlib import Path

logger = logging.getLogger()


def get_fname(url):
    # Bookmark files are named after the MD5 hex digest of their URL, so the
    # same URL always maps to the same file name.
    m = hashlib.md5()
    m.update(url.encode("utf8"))
    return m.hexdigest()


class Store:
    def __init__(self, basedir: Path = None):
        if basedir is None:
            basedir = Path("~/.local/share/marxbook/bookmarks/").expanduser()
        self.basedir = basedir
        self.serializer = Serializer()

    def add(self, url: str, title=None, tag=None, description=""):
        # "tag" is a list of tags; avoid a mutable default argument.
        if tag is None:
            tag = []
        dest = self.basedir
        dest.mkdir(parents=True, exist_ok=True)
        fname = get_fname(url)
        fpath = dest / fname
        # The serializer pops the "tag" key and writes one "Tag:" line per entry.
        content = self.serializer.encode(
            dict(url=url, title=title, tag=tag, description=description)
        )
        with fpath.open("w") as buf:
            buf.write(content)

    def get(self, path: str):
        # path is normally relative to basedir; an absolute path (as returned
        # by find()) also works, since joining it onto basedir leaves it unchanged.
        fpath = self.basedir / path
        with fpath.open() as buf:
            return self.serializer.decode(buf.read())

    def find(self, prefix_path: str) -> Path:
        """
        prefix_path is a special form of contraction. Say prefix_path=utils/time/5e:
        if there is a single file inside utils/time whose name starts with 5e,
        then that's the file that is found.
        """
        if os.path.exists(prefix_path):
            return Path(prefix_path)

        candidates = list(self.basedir.glob(prefix_path + "*"))
        if not candidates:
            raise FileNotFoundError("%s not found" % prefix_path)
        if len(candidates) > 1:
            raise ValueError("Ambiguous prefix %s" % prefix_path)
        return candidates[0]

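    # A minimal usage sketch for the prefix contraction above, assuming a store
    # that already holds one bookmark under "utils/time/" whose hashed file name
    # starts with "5e" (folder name and prefix are invented for illustration):
    #
    #     store = Store()
    #     path = store.find("utils/time/5e")   # unique match -> Path to the file
    #     bookmark = store.get(path)           # decoded dict: Url, Title, Tag, ...
    #
    # find() raises FileNotFoundError when nothing matches and ValueError when
    # the prefix is ambiguous.
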
    def __iter__(self):
        # Walk every bookmark file under basedir and yield its decoded content,
        # together with its path relative to the store root.
        for urlfile in self.basedir.glob("**/*"):
            if not urlfile.is_file():
                continue
            data = self.get(urlfile)
            ret = {"Path": str(urlfile.relative_to(self.basedir))}
            ret.update(data)
            yield ret

    def folder(self, folder: str):
        return Store(self.basedir / folder)

    def move(self, path: str, dest_dir: str):
        dest = self.basedir / dest_dir
        if dest.exists() and not dest.is_dir():
            raise ValueError(
                "destination '%s' already exists and is not a directory" % dest_dir
            )
        dest.mkdir(parents=True, exist_ok=True)
        fpath = self.basedir / path
        fpath.rename(dest / fpath.name)


HEADER_LINE = re.compile(r"^([^:]+): (.*)$")


class Serializer:
    def __init__(self):
        pass

    def encode(self, data: dict) -> str:
        m = ""
        tags = data.pop("tag", [])  # those are special: one "Tag:" line each
        for key in data:
            m += "%s: %s\n" % (key.title(), str(data[key]).replace("\n", " "))
        for tag in tags:
            m += "%s: %s\n" % ("Tag", tag)
        return m

    def decode(self, content: str) -> dict:
        d: dict = {"Tag": []}
        for num, line in enumerate(content.split("\n"), 1):
            if not line.strip():
                continue
            m = HEADER_LINE.match(line)
            if m is None:
                logger.error("Invalid line %d", num)
                continue
            key, value = m.groups()
            key = key.title()
            if key == "Tag":
                d[key].append(value)
            else:
                d[key] = value
        return d


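# For reference, a bookmark file produced by Serializer.encode() is a short
# header-style text block: one "Key: value" line per field, plus one "Tag:"
# line per tag, which Serializer.decode() collects back into a list under the
# "Tag" key. A sample file (URL, title and tags are invented):
#
#     Url: https://example.com/article
#     Title: An example article
#     Description: saved for later
#     Tag: reading
#     Tag: example

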
if __name__ == "__main__":
    import sys

    s = Store()
    # print(s.get(sys.argv[1]))
    # Store has no list() method; listing a folder is done by iterating over a
    # sub-store, so treat the first argument as a folder name.
    for line in s.folder(sys.argv[1]):
        print(line)