123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- #!/usr/bin/python3
- # coding=utf-8
- """Calippo - much less than a clipboard manager"""
- # lot of code has been stolen from clipster. thanks!
- # pylint: disable=line-too-long
- import subprocess
- import signal
- import argparse
- import json
- import socket
- import os
- import errno
- import sys
- import logging
- import tempfile
- import re
- import stat
- from contextlib import closing
- from gi import require_version
- require_version("Gtk", "3.0")
- from gi.repository import (
- Gtk,
- Gdk,
- GLib,
- GObject,
- ) # pylint:disable=wrong-import-position
- try:
- require_version("Wnck", "3.0")
- from gi.repository import Wnck
- except (ImportError, ValueError):
- Wnck = None
- if sys.version_info.major == 3:
- # py 3.x
- from configparser import ConfigParser as SafeConfigParser
- else:
- # py 2.x
- from ConfigParser import SafeConfigParser # pylint:disable=import-error
- # In python 2, ENOENT is sometimes IOError and sometimes OSError. Catch
- # both by catching their immediate superclass exception EnvironmentError.
- FileNotFoundError = EnvironmentError # pylint: disable=redefined-builtin
- FileExistsError = ProcessLookupError = OSError # pylint: disable=redefined-builtin
- class suppress_if_errno(object):
- """A context manager which suppresses exceptions with an errno attribute which matches the given value.
- Allows things like:
- try:
- os.makedirs(dirs)
- except OSError as exc:
- if exc.errno != errno.EEXIST:
- raise
- to be expressed as:
- with suppress_if_errno(OSError, errno.EEXIST):
- os.makedirs(dir)
- This is a fork of contextlib.suppress.
- """
- def __init__(self, exceptions, exc_val):
- self._exceptions = exceptions
- self._exc_val = exc_val
- def __enter__(self):
- pass
- def __exit__(self, exctype, excinst, exctb):
- # Unlike isinstance and issubclass, CPython exception handling
- # currently only looks at the concrete type hierarchy (ignoring
- # the instance and subclass checking hooks). While Guido considers
- # that a bug rather than a feature, it's a fairly hard one to fix
- # due to various internal implementation details. suppress provides
- # the simpler issubclass based semantics, rather than trying to
- # exactly reproduce the limitations of the CPython interpreter.
- #
- # See http://bugs.python.org/issue12029 for more details
- return (
- exctype is not None
- and issubclass(exctype, self._exceptions)
- and excinst.errno == self._exc_val
- )
- class ClipsterError(Exception):
- """Errors specific to Clipster."""
- def __init__(self, args="Clipster Error."):
- Exception.__init__(self, args)
- class Daemon(object):
- """Handles clipboard events, client requests, stores history."""
- # pylint: disable=too-many-instance-attributes
- def __init__(self, config, command):
- """Set up clipboard objects and history dict."""
- self.config = config
- self.patterns = []
- self.ignore_patterns = []
- self.window = self.p_id = self.c_id = self.sock = None
- self.primary = Gtk.Clipboard.get(Gdk.SELECTION_PRIMARY)
- self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
- self.boards = {"PRIMARY": [], "CLIPBOARD": []}
- self.pid_file = self.config.get("calippo", "pid_file")
- self.client_msgs = {}
- # Flag to indicate that the in-memory history should be flushed to disk
- self.update_history_file = False
- # Flag whether next clipboard change should be ignored
- self.ignore_next = {"PRIMARY": False, "CLIPBOARD": False}
- self.whitelist_classes = self.blacklist_classes = []
- self.command = command
- def exit(self):
- """Clean up things before exiting."""
- logging.debug("Daemon exiting...")
- try:
- os.unlink(self.pid_file)
- except FileNotFoundError:
- logging.warning("Failed to remove pid file: %s", self.pid_file)
- Gtk.main_quit()
- def run(self):
- """Launch the clipboard manager daemon.
- Listen for clipboard events & client socket connections."""
- # Set up socket, pid file etc
- self.prepare_files()
- # We need to get the display instance from the window
- # for use in obtaining mouse state.
- # POPUP windows can do this without having to first show the window
- self.window = Gtk.Window(type=Gtk.WindowType.POPUP)
- # Handle clipboard changes
- self.p_id = self.primary.connect("owner-change", self.owner_change)
- self.c_id = self.clipboard.connect("owner-change", self.owner_change)
- # Handle unix signals
- GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self.exit)
- GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self.exit)
- GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, self.exit)
- Gtk.main()
- def prepare_files(self):
- """Ensure that all files and sockets used
- by the daemon are available."""
- # Create the calippo dir if necessary
- with suppress_if_errno(FileExistsError, errno.EEXIST):
- os.makedirs(self.config.get("calippo", "data_dir"))
- # check for existing pid_file, and tidy up if appropriate
- with suppress_if_errno(FileNotFoundError, errno.ENOENT):
- with open(self.pid_file) as runf_r:
- try:
- pid = int(runf_r.read())
- except ValueError:
- logging.debug("Invalid pid file, attempting to overwrite.")
- else:
- # pid is an int, determine if this corresponds to a running daemon.
- try:
- # Do nothing, but raise an error if no such process
- os.kill(pid, 0)
- raise ClipsterError(
- "Daemon already running: pid {}".format(pid)
- )
- except ProcessLookupError as exc:
- if exc.errno != errno.ESRCH:
- raise
- # No process found, delete the pid file.
- with suppress_if_errno(FileNotFoundError, errno.ENOENT):
- os.unlink(self.pid_file)
- def owner_change(self, board, event):
- """Handler for owner-change clipboard events."""
- logging.debug("owner-change event!")
- selection = str(event.selection)
- logging.debug("selection: %s", selection)
- active = self.config.get("calippo", "active_selections").split(",")
- if selection not in active:
- return
- logging.debug("Selection in 'active_selections'")
- event_id = selection == "PRIMARY" and self.p_id or self.c_id
- # Some apps update primary during mouse drag (chrome)
- # Block at start to prevent repeated triggering
- board.handler_block(event_id)
- display = self.window.get_display()
- if selection == "PRIMARY":
- while Gdk.ModifierType.BUTTON1_MASK & display.get_pointer().mask:
- # Do nothing while mouse button is held down (selection drag)
- pass
- # Try to get text from clipboard
- text = board.wait_for_text()
- if text:
- logging.debug("Selection is text.")
- subprocess.Popen(self.command + [selection, text])
- # self.update_history(selection, text)
- # If no text received, either the selection was an empty string,
- # or the board contains non-text content.
- else:
- # First item in tuple is bool, False if no targets
- if board.wait_for_targets()[0]:
- logging.debug("Selection is not text - ignoring.")
- else:
- logging.debug("Clipboard cleared or empty. Reinstating from history.")
- if self.boards[selection]:
- self.update_board(selection, self.boards[selection][-1])
- else:
- logging.debug("No history available, leaving clipboard empty.")
- # Unblock event handling
- board.handler_unblock(event_id)
- def parse_args():
- """Parse command-line arguments."""
- parser = argparse.ArgumentParser(description="Clipster clipboard manager.")
- parser.add_argument(
- "-f", "--config", action="store", help="Path to config directory."
- )
- parser.add_argument(
- "-l",
- "--log_level",
- action="store",
- default="INFO",
- help="Set log level: DEBUG, INFO (default), WARNING, ERROR, CRITICAL",
- )
- parser.add_argument("command", nargs="+")
- # Mutually exclusive client and daemon options.
- boardgrp = parser.add_mutually_exclusive_group()
- boardgrp.add_argument(
- "-p",
- "--primary",
- action="store_const",
- const="PRIMARY",
- help="Query, or write STDIN to, the PRIMARY clipboard.",
- )
- boardgrp.add_argument(
- "-c",
- "--clipboard",
- action="store_const",
- const="CLIPBOARD",
- help="Query, or write STDIN to, the CLIPBOARD clipboard.",
- )
- return parser.parse_args()
- def parse_config(args, data_dir, conf_dir):
- """Configuration derived from defaults & file."""
- # Set some config defaults
- config_defaults = {
- "data_dir": data_dir, # calippo 'root' dir (see history/socket config)
- "conf_dir": conf_dir, # calippo config dir (see pattern/ignore_pattern file config). Can be overridden using -f cmd-line arg.
- # change this to PRIMARY,CLIPBOARD to activate both
- "active_selections": "CLIPBOARD", # Comma-separated list of selections to monitor/save
- "pid_file": "/run/user/{}/calippo.pid".format(os.getuid()),
- }
- config = SafeConfigParser(config_defaults)
- config.add_section("calippo")
- # Try to read config file (either passed in, or default value)
- if args.config:
- config.set("calippo", "conf_dir", args.config)
- conf_file = os.path.join(config.get("calippo", "conf_dir"), "calippo.ini")
- logging.debug("Trying to read config file: %s", conf_file)
- result = config.read(conf_file)
- if not result:
- logging.debug("Unable to read config file: %s", conf_file)
- logging.debug("Merged config: %s", sorted(dict(config.items("calippo")).items()))
- return config
- def find_config():
- """Attempt to find config from xdg basedir-spec paths/environment variables."""
- # Set a default directory for calippo files
- # https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
- xdg_config_dirs = os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")
- xdg_config_dirs.insert(
- 0,
- os.environ.get(
- "XDG_CONFIG_HOME", os.path.join(os.environ.get("HOME"), ".config")
- ),
- )
- xdg_data_home = os.environ.get(
- "XDG_DATA_HOME", os.path.join(os.environ.get("HOME"), ".local/share")
- )
- data_dir = os.path.join(xdg_data_home, "calippo")
- # Keep trying to define conf_dir, moving from local -> global
- for path in xdg_config_dirs:
- conf_dir = os.path.join(path, "calippo")
- if os.path.exists(conf_dir):
- return conf_dir, data_dir
- return "", data_dir
- def main():
- """Start the application. Return an exit status (0 or 1)."""
- # Find default config and data dirs
- conf_dir, data_dir = find_config()
- # parse command-line arguments
- args = parse_args()
- # Enable logging
- logging.basicConfig(
- format="%(levelname)s:%(message)s",
- level=getattr(logging, args.log_level.upper()),
- )
- logging.debug("Debugging Enabled.")
- config = parse_config(args, data_dir, conf_dir)
- # Launch the daemon
- Daemon(config, command=args.command).run()
- def safe_decode(data):
- """Convenience method to ensure everything is utf-8."""
- try:
- data = data.decode("utf-8")
- except (UnicodeDecodeError, UnicodeEncodeError, AttributeError):
- pass
- return data
- if __name__ == "__main__":
- try:
- main()
- except ClipsterError as exc:
- if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
- raise
- else:
- # Only output the 'human-readable' part.
- logging.error(exc)
- sys.exit(1)
|