123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import hashlib
- import os
- import re
- from pathlib import Path
- import logging
# Module-level logger. getLogger() without a name returns the root logger,
# so records emitted here go through whatever handlers the root has.
logger = logging.getLogger()
def get_fname(url):
    """Return the hexadecimal MD5 digest of *url* encoded as UTF-8."""
    return hashlib.md5(url.encode("utf8")).hexdigest()
class Store:
    """Directory of bookmark files, one file per URL.

    Every bookmark is stored under ``basedir`` in a file named
    ``get_fname(url)``; file contents are produced and parsed by
    ``Serializer``.
    """

    def __init__(self, basedir: Path = None):
        """Create a store rooted at *basedir*.

        When *basedir* is omitted, ``~/.local/share/marxbook/bookmarks/``
        (with ``~`` expanded) is used.
        """
        if basedir is None:
            basedir = Path("~/.local/share/marxbook/bookmarks/").expanduser()
        self.basedir = basedir
        self.serializer = Serializer()

    def add(self, url: str, title=None, tag=None, description=""):
        """Write (or overwrite) the bookmark file for *url*.

        Args:
            url: bookmarked address; also determines the file name.
            title: optional title.
            tag: iterable of tags; ``None`` means no tags.
            description: free-form description.
        """
        # A fresh list per call: a ``tag=[]`` default would be shared
        # between every invocation.
        tags = [] if tag is None else tag
        dest = self.basedir
        dest.mkdir(parents=True, exist_ok=True)
        fpath = dest / get_fname(url)
        # The serializer special-cases the "tag" key (one "Tag:" line per
        # tag); any other key would be written as a single stringified line.
        content = self.serializer.encode(
            dict(url=url, title=title, tag=tags, description=description)
        )
        with fpath.open("w") as buf:
            buf.write(content)

    def get(self, path: str):
        """Read the file at *path* (relative to ``basedir``) and decode it."""
        fpath = self.basedir / path
        with fpath.open() as buf:
            return self.serializer.decode(buf.read())

    def find(self, prefix_path: str) -> Path:
        """Resolve a possibly abbreviated bookmark path.

        prefix_path is a special form of contraction. Let's say
        prefix_path=utils/time/5e: if there is a single file starting with
        5e inside utils/time, then that's found!

        Raises:
            FileNotFoundError: nothing under ``basedir`` matches the prefix.
            ValueError: more than one entry matches the prefix.
        """
        # NOTE(review): this existence check uses prefix_path as given, i.e.
        # relative to the current working directory rather than ``basedir``.
        if os.path.exists(prefix_path):
            # Wrap in Path so both branches honour the declared return type.
            return Path(prefix_path)
        candidates = list(self.basedir.glob(prefix_path + "*"))
        if not candidates:
            raise FileNotFoundError("%s not found" % prefix_path)
        if len(candidates) > 1:
            raise ValueError("Ambiguous prefix %s" % prefix_path)
        return candidates[0]

    def __iter__(self):
        """Yield one dict per stored file, recursively.

        Each dict holds the decoded file content plus a ``"Path"`` entry with
        the file's path relative to ``basedir``; a decoded ``Path`` header,
        if any, overrides that entry.
        """
        for urlfile in self.basedir.glob("**/*"):
            if not urlfile.is_file():
                continue
            ret = {"Path": str(urlfile.relative_to(self.basedir))}
            ret.update(self.get(urlfile))
            yield ret

    def folder(self, folder: str):
        """Return a new ``Store`` rooted at the sub-directory *folder*."""
        return Store(self.basedir / folder)

    def move(self, path: str, dest_dir: str):
        """Move the file *path* into directory *dest_dir* (both relative to ``basedir``).

        The destination directory is created when missing.

        Raises:
            ValueError: *dest_dir* exists but is not a directory.
        """
        dest = self.basedir / dest_dir
        if dest.exists() and not dest.is_dir():
            raise ValueError(
                "destination '%s' already exists and is not a directory" % dest_dir
            )
        # exist_ok=True makes a separate "does it exist" check unnecessary.
        dest.mkdir(parents=True, exist_ok=True)
        fpath = self.basedir / path
        fpath.rename(dest / fpath.name)
# One "Key: value" header per line; the key is everything before the first
# colon and must be followed by ": ".
HEADER_LINE = re.compile(r"^([^:]+): (.*)$")


class Serializer:
    """Convert bookmark dicts to and from a ``Key: value`` line format.

    Tags are special: they are written as one ``Tag: <tag>`` line each and
    are collected back into the list stored under the ``"Tag"`` key.
    """

    def __init__(self):
        pass

    @staticmethod
    def _one_line(value) -> str:
        """Stringify *value*, replacing newlines so it fits on one header line."""
        return str(value).replace("\n", " ")

    def encode(self, data: dict) -> str:
        """Serialize *data* into header lines; *data* itself is left untouched.

        A ``tag``/``tags`` key (any letter case, so the ``"Tag"`` key produced
        by :meth:`decode` is covered too) holding a list, tuple or set is
        written as one ``Tag:`` line per element, after all other headers.
        A ``None`` tag value is skipped. Every other entry becomes a single
        ``Key: value`` line with the key title-cased.
        """
        headers = []
        tags = []
        for key, value in data.items():
            if key.lower() in ("tag", "tags"):
                if value is None:
                    continue
                if isinstance(value, (list, tuple, set, frozenset)):
                    tags.extend(value)
                    continue
                # A plain string falls through and is written as an ordinary
                # header, so it is never split into characters.
            headers.append("%s: %s\n" % (key.title(), self._one_line(value)))
        tag_lines = ["%s: %s\n" % ("Tag", self._one_line(tag)) for tag in tags]
        return "".join(headers + tag_lines)

    def decode(self, content: str) -> dict:
        """Parse header lines into a dict with title-cased keys.

        ``Tag`` lines accumulate into the list under ``"Tag"`` (always
        present, possibly empty); for any other key the last occurrence wins.
        Blank lines are ignored; lines that do not match ``HEADER_LINE`` are
        logged as errors and skipped.
        """
        d: dict = {"Tag": []}
        for num, line in enumerate(content.split("\n"), 1):
            if not line.strip():
                continue
            m = HEADER_LINE.match(line)
            if m is None:
                logger.error("Invalid line %d", num)
                continue
            key, value = m.groups()
            key = key.title()
            if key == "Tag":
                d[key].append(value)
            else:
                d[key] = value
        return d
if __name__ == "__main__":
    import sys

    # Store defines no ``list`` method; iterating a store yields one dict per
    # bookmark. An optional first argument restricts the listing to that
    # sub-folder instead of failing when no argument is given.
    store = Store()
    if len(sys.argv) > 1:
        store = store.folder(sys.argv[1])
    for line in store:
        print(line)
|