# marxbook/marxbook/store.py
#
# 118 lines
# 3.4 KiB
# Python

import hashlib
import os
import re
from pathlib import Path
import logging
# Module-scoped logger: records carry this module's name, so they can be
# filtered or configured separately while still propagating to the root
# logger's handlers.
logger = logging.getLogger(__name__)
def get_fname(url):
    """Return the hexadecimal MD5 digest of *url*, encoded as UTF-8."""
    digest = hashlib.md5(url.encode("utf8"))
    return digest.hexdigest()
class Store:
    """Directory-backed collection of bookmarks, one file per URL.

    Each bookmark is written to a file whose name is ``get_fname(url)`` and
    whose content is produced/parsed by a ``Serializer``.
    """

    def __init__(self, basedir: Path = None):
        """Create a store rooted at *basedir*.

        When *basedir* is omitted, ``~/.local/share/marxbook/bookmarks/``
        (with ``~`` expanded) is used.
        """
        if basedir is None:
            basedir = Path("~/.local/share/marxbook/bookmarks/").expanduser()
        self.basedir = Path(basedir)
        self.serializer = Serializer()

    def add(self, url: str, title=None, tag=None, description=""):
        """Write the bookmark for *url* into the base directory.

        The base directory is created if missing; an existing file for the
        same URL is overwritten.

        :param url: bookmark URL; also determines the file name
        :param title: optional title
        :param tag: iterable of tags (defaults to no tags)
        :param description: free-text description
        """
        # A fresh list per call: a mutable default would be shared between calls.
        tags = [] if tag is None else tag
        self.basedir.mkdir(parents=True, exist_ok=True)
        fpath = self.basedir / get_fname(url)
        # The serializer treats the "tag" key specially (one "Tag:" line per
        # tag), so the list must be passed under exactly that key.
        content = self.serializer.encode(
            dict(url=url, title=title, tag=tags, description=description)
        )
        with fpath.open("w") as buf:
            buf.write(content)

    def get(self, path: str):
        """Return the decoded content of the bookmark file *path*.

        *path* is joined onto the base directory.
        """
        fpath = self.basedir / path
        with fpath.open() as buf:
            return self.serializer.decode(buf.read())

    def find(self, prefix_path: str) -> Path:
        """Resolve a (possibly abbreviated) bookmark path.

        prefix_path is a special form of contraction. Let's say
        prefix_path=utils/time/5e: if there is a single file starting with 5e
        inside utils/time, then that's found!

        :returns: *prefix_path* itself (as a ``Path``) when it already exists,
            otherwise the single entry under the base directory whose path
            starts with *prefix_path*
        :raises FileNotFoundError: if nothing matches
        :raises ValueError: if more than one entry matches
        """
        # NOTE(review): this check is resolved against the current working
        # directory, not the base directory - confirm that is intended.
        if os.path.exists(prefix_path):
            # Always hand back a Path, as annotated and as the glob branch does.
            return Path(prefix_path)
        candidates = list(self.basedir.glob(prefix_path + "*"))
        if not candidates:
            raise FileNotFoundError("%s not found" % prefix_path)
        if len(candidates) > 1:
            raise ValueError("Ambiguous prefix %s" % prefix_path)
        return candidates[0]

    def __iter__(self):
        """Yield one dict per bookmark file found recursively in the store.

        Each dict holds ``"Path"`` (relative to the base directory) plus the
        decoded fields of the file.
        """
        for urlfile in self.basedir.glob("**/*"):
            if not urlfile.is_file():
                continue
            # glob() results already include the base directory; get() joins
            # it again, so pass the relative path to avoid doubling the prefix
            # when the base directory is itself relative.
            relpath = urlfile.relative_to(self.basedir)
            data = self.get(relpath)
            ret = {"Path": str(relpath)}
            ret.update(data)
            yield ret

    def folder(self, folder: str):
        """Return a new ``Store`` rooted at the sub-directory *folder*."""
        return Store(self.basedir / folder)

    def move(self, path: str, dest_dir: str):
        """Move the bookmark file *path* into the directory *dest_dir*.

        Both arguments are joined onto the base directory; the destination
        directory is created when missing.

        :raises ValueError: if *dest_dir* exists and is not a directory
        """
        dest = self.basedir / dest_dir
        if dest.exists() and not dest.is_dir():
            raise ValueError(
                "destination '%s' already exists and is not a directory" % dest_dir
            )
        dest.mkdir(parents=True, exist_ok=True)
        fpath = self.basedir / path
        fpath.rename(dest / fpath.name)
# One "Key: value" header per line; the key is everything before the first colon.
HEADER_LINE = re.compile(r"^([^:]+): (.*)$")


class Serializer:
    """Convert bookmark dicts to and from a line-oriented ``Key: value`` text."""

    def __init__(self):
        pass

    def encode(self, data: dict) -> str:
        """Serialize *data* to text, one ``Key: value`` line per field.

        Keys are title-cased and newlines inside values are replaced by
        spaces so that every field stays on a single line. The ``"tag"``
        entry is special: it is an iterable, emitted after all other fields
        as one ``Tag: ...`` line per element.

        *data* is not modified.
        """
        # Work on a shallow copy so that popping "tag" does not alter the
        # caller's dict.
        fields = dict(data)
        tags = fields.pop("tag", [])  # those are special!
        lines = [
            "%s: %s\n" % (key.title(), str(value).replace("\n", " "))
            for key, value in fields.items()
        ]
        # Tags get the same newline flattening as ordinary values; a raw
        # newline would otherwise break the one-header-per-line format.
        lines.extend("Tag: %s\n" % str(tag).replace("\n", " ") for tag in tags)
        return "".join(lines)

    def decode(self, content: str) -> dict:
        """Parse text produced by :meth:`encode` back into a dict.

        Keys are title-cased. Every ``Tag`` line is appended to the list
        stored under ``"Tag"`` (always present, possibly empty); for any
        other key the last occurrence wins. Blank lines are skipped; lines
        that do not match ``Key: value`` are logged as errors and skipped.
        """
        d: dict = {"Tag": []}
        for num, line in enumerate(content.split("\n"), 1):
            if not line.strip():
                continue
            m = HEADER_LINE.match(line)
            if m is None:
                logger.error("Invalid line %d", num)
                continue
            key, value = m.groups()
            key = key.title()
            if key == "Tag":
                d[key].append(value)
            else:
                d[key] = value
        return d
if __name__ == "__main__":
    import sys

    # Print every bookmark of the default store, or only those inside the
    # sub-folder named by the optional first command-line argument.
    s = Store()
    if len(sys.argv) > 1:
        s = s.folder(sys.argv[1])
    # Store has no list() method; iterating the store yields the bookmarks.
    for line in s:
        print(line)