123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- #!/usr/bin/python3
- # coding=utf-8
- """Calippo - much less than a clipboard manager"""
- # lot of code has been stolen from clipster. thanks!
- # pylint: disable=line-too-long
- import subprocess
- import signal
- import argparse
- import json
- import socket
- import os
- import errno
- import sys
- import logging
- import tempfile
- import re
- import stat
- from contextlib import closing
- from gi import require_version
- require_version("Gtk", "3.0")
- from gi.repository import Gtk, Gdk, GLib, GObject # pylint:disable=wrong-import-position
- try:
- require_version("Wnck", "3.0")
- from gi.repository import Wnck
- except (ImportError, ValueError):
- Wnck = None
- if sys.version_info.major == 3:
- # py 3.x
- from configparser import ConfigParser as SafeConfigParser
- else:
- # py 2.x
- from ConfigParser import SafeConfigParser # pylint:disable=import-error
- # In python 2, ENOENT is sometimes IOError and sometimes OSError. Catch
- # both by catching their immediate superclass exception EnvironmentError.
- FileNotFoundError = EnvironmentError # pylint: disable=redefined-builtin
- FileExistsError = ProcessLookupError = OSError # pylint: disable=redefined-builtin
class suppress_if_errno(object):
    """Context manager that swallows an exception only when its errno matches.

    An exception raised inside the ``with`` body is suppressed when it is an
    instance of ``exceptions`` AND carries an ``errno`` attribute equal to
    ``exc_val``. Anything else propagates unchanged.

    Allows things like:

        try:
            os.makedirs(dirs)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise

    to be expressed as:

        with suppress_if_errno(OSError, errno.EEXIST):
            os.makedirs(dirs)

    This is a fork of contextlib.suppress.
    """

    def __init__(self, exceptions, exc_val):
        """Remember which exception class(es) and which errno to suppress.

        exceptions -- an exception class or tuple of classes (as accepted
                      by issubclass).
        exc_val    -- the errno value that must match for suppression.
        """
        self._exceptions = exceptions
        self._exc_val = exc_val

    def __enter__(self):
        pass

    def __exit__(self, exctype, excinst, exctb):
        # Unlike isinstance and issubclass, CPython exception handling
        # currently only looks at the concrete type hierarchy (ignoring
        # the instance and subclass checking hooks). suppress provides
        # the simpler issubclass based semantics, rather than trying to
        # exactly reproduce the limitations of the CPython interpreter.
        #
        # See http://bugs.python.org/issue12029 for more details
        if exctype is None or not issubclass(exctype, self._exceptions):
            return False
        # A matching exception type is not guaranteed to carry an errno
        # (e.g. when a broad class such as Exception was requested). Treat a
        # missing attribute as "no match" so the real exception propagates
        # instead of being replaced by an AttributeError raised from here.
        return hasattr(excinst, 'errno') and excinst.errno == self._exc_val
class ClipsterError(Exception):
    """Application-level error raised by this program (name kept from clipster)."""

    def __init__(self, args="Clipster Error."):
        # The single message argument falls back to a generic text when
        # the caller does not supply one.
        super(ClipsterError, self).__init__(args)
class Daemon(object):
    """Watch the X selections and hand every new text selection to a command.

    On each ``owner-change`` event for a monitored selection the daemon
    reads the selection text and spawns ``command + [selection, text]``.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config, command):
        """Set up clipboard objects and per-selection state.

        config  -- config parser object; the 'calippo' section is read.
        command -- list of argv items; the selection name and its text are
                   appended before the command is spawned.
        """
        self.config = config
        self.patterns = []
        self.ignore_patterns = []
        self.window = self.p_id = self.c_id = self.sock = None
        self.primary = Gtk.Clipboard.get(Gdk.SELECTION_PRIMARY)
        self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
        self.boards = {"PRIMARY": [], "CLIPBOARD": []}
        self.pid_file = self.config.get('calippo', 'pid_file')
        self.client_msgs = {}
        # Flag to indicate that the in-memory history should be flushed to disk
        self.update_history_file = False
        # Flag whether next clipboard change should be ignored
        self.ignore_next = {'PRIMARY': False, 'CLIPBOARD': False}
        # Two distinct lists: a chained assignment would make both names
        # refer to the same list object.
        self.whitelist_classes = []
        self.blacklist_classes = []
        self.command = command

    def exit(self):
        """Clean up things before exiting: remove the pid file, stop Gtk."""
        logging.debug("Daemon exiting...")
        try:
            os.unlink(self.pid_file)
        except FileNotFoundError:
            logging.warning("Failed to remove pid file: %s", self.pid_file)
        Gtk.main_quit()

    def run(self):
        """Launch the daemon and block in the Gtk main loop.

        Prepares the data dir and pid file, connects the ``owner-change``
        handlers for both selections and installs SIGINT/SIGTERM/SIGHUP
        handlers that call :meth:`exit`.
        """
        # Set up data dir, pid file etc
        self.prepare_files()

        # We need to get the display instance from the window
        # for use in obtaining mouse state.
        # POPUP windows can do this without having to first show the window
        self.window = Gtk.Window(type=Gtk.WindowType.POPUP)

        # Handle clipboard changes
        self.p_id = self.primary.connect('owner-change',
                                         self.owner_change)
        self.c_id = self.clipboard.connect('owner-change',
                                           self.owner_change)

        # Handle unix signals
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self.exit)
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self.exit)
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, self.exit)

        Gtk.main()

    def prepare_files(self):
        """Ensure that the files used by the daemon are available.

        Creates the data dir, refuses to start when the pid file names a
        live process, removes a stale pid file, then records our own pid.

        Raises ClipsterError when another daemon appears to be running.
        """
        # Create the calippo dir if necessary
        with suppress_if_errno(FileExistsError, errno.EEXIST):
            os.makedirs(self.config.get('calippo', 'data_dir'))

        # check for existing pid_file, and tidy up if appropriate
        with suppress_if_errno(FileNotFoundError, errno.ENOENT):
            with open(self.pid_file) as runf_r:
                try:
                    pid = int(runf_r.read())
                except ValueError:
                    logging.debug("Invalid pid file, attempting to overwrite.")
                else:
                    # pid is an int, determine if this corresponds to a running daemon.
                    try:
                        # Do nothing, but raise an error if no such process
                        os.kill(pid, 0)
                        raise ClipsterError("Daemon already running: pid {}".format(pid))
                    except ProcessLookupError as exc:
                        if exc.errno != errno.ESRCH:
                            raise
                        # No process found, delete the pid file.
                        with suppress_if_errno(FileNotFoundError, errno.ENOENT):
                            os.unlink(self.pid_file)

        # Record our own pid. Without this the check above can never detect
        # a running daemon and exit() always warns about a missing pid file.
        # Best-effort: an unwritable location must not prevent startup.
        try:
            with open(self.pid_file, 'w') as runf_w:
                runf_w.write(str(os.getpid()))
        except EnvironmentError as exc:
            logging.warning("Failed to write pid file %s: %s", self.pid_file, exc)

    def owner_change(self, board, event):
        """Handler for owner-change clipboard events.

        board -- the Gtk.Clipboard that emitted the signal.
        event -- the event; ``str(event.selection)`` names the selection.

        Selections not listed in the 'active_selections' option are
        ignored. Text selections are passed to the configured command.
        """
        logging.debug("owner-change event!")
        selection = str(event.selection)
        logging.debug("selection: %s", selection)
        active = [name.strip() for name in
                  self.config.get('calippo', 'active_selections').split(',')]
        if selection not in active:
            return
        logging.debug("Selection in 'active_selections'")

        event_id = self.p_id if selection == "PRIMARY" else self.c_id

        # Some apps update primary during mouse drag (chrome)
        # Block at start to prevent repeated triggering
        board.handler_block(event_id)
        try:
            display = self.window.get_display()
            # NOTE(review): this spins without yielding while the mouse
            # button is held down (selection drag).
            while Gdk.ModifierType.BUTTON1_MASK & display.get_pointer().mask:
                pass

            # Try to get text from clipboard
            text = board.wait_for_text()
            if text:
                logging.debug("Selection is text.")
                subprocess.Popen(self.command + [selection, text])
            # If no text received, either the selection was an empty string,
            # or the board contains non-text content.
            # First item in tuple is bool, False if no targets
            elif board.wait_for_targets()[0]:
                logging.debug("Selection is not text - ignoring.")
            else:
                logging.debug("Clipboard cleared or empty. Reinstating from history.")
                if self.boards[selection]:
                    # NOTE(review): update_board is not defined on this
                    # class in the visible source, and nothing visible
                    # appends to self.boards - confirm this branch is live.
                    self.update_board(selection, self.boards[selection][-1])
                else:
                    logging.debug("No history available, leaving clipboard empty.")
        finally:
            # Always unblock event handling, even if something above raised;
            # otherwise this selection would stay muted for the rest of the
            # daemon's life.
            board.handler_unblock(event_id)
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    The positional ``command`` takes one or more items. ``-p`` and ``-c``
    are mutually exclusive and store the selection name they refer to.
    """
    cli = argparse.ArgumentParser(description='Clipster clipboard manager.')
    cli.add_argument('-f', '--config',
                     help="Path to config directory.")
    cli.add_argument('-l', '--log_level', default="INFO",
                     help="Set log level: DEBUG, INFO (default), WARNING, ERROR, CRITICAL")
    cli.add_argument("command", nargs='+')

    # Mutually exclusive selection switches, declared as a small table.
    board_switches = (
        ('-p', '--primary', 'PRIMARY',
         "Query, or write STDIN to, the PRIMARY clipboard."),
        ('-c', '--clipboard', 'CLIPBOARD',
         "Query, or write STDIN to, the CLIPBOARD clipboard."),
    )
    exclusive = cli.add_mutually_exclusive_group()
    for short_flag, long_flag, board_name, help_text in board_switches:
        exclusive.add_argument(short_flag, long_flag, action="store_const",
                               const=board_name, help=help_text)

    return cli.parse_args()
def parse_config(args, data_dir, conf_dir):
    """Return a config parser holding the defaults merged with calippo.ini.

    args     -- parsed command-line arguments; ``args.config`` (when set)
                replaces the config directory.
    data_dir -- default value for the 'data_dir' option.
    conf_dir -- default value for the 'conf_dir' option.

    A missing or unreadable config file is only logged at debug level; the
    defaults are used in that case.
    """
    section = 'calippo'

    config = SafeConfigParser({
        # calippo 'root' dir
        "data_dir": data_dir,
        # calippo config dir. Can be overridden using -f cmd-line arg.
        "conf_dir": conf_dir,
        # PRIMARY or CLIPBOARD
        "default_selection": "PRIMARY",
        # Comma-separated list of selections to monitor
        "active_selections": "PRIMARY,CLIPBOARD",
        "pid_file": "/run/user/{}/calippo.pid".format(os.getuid()),
        # max length of selection input
        "max_input": "50000",
        # Extracted pattern should replace current selection.
        "pattern_as_selection": "no",
    })
    config.add_section(section)

    # A directory given on the command line wins over the discovered one.
    if args.config:
        config.set(section, 'conf_dir', args.config)

    conf_file = os.path.join(config.get(section, 'conf_dir'), 'calippo.ini')
    logging.debug("Trying to read config file: %s", conf_file)
    if not config.read(conf_file):
        logging.debug("Unable to read config file: %s", conf_file)

    merged = sorted(dict(config.items(section)).items())
    logging.debug("Merged config: %s", merged)
    return config
def find_config():
    """Locate the calippo config and data directories per the XDG basedir spec.

    Returns a ``(conf_dir, data_dir)`` tuple. ``conf_dir`` is the first
    existing ``<dir>/calippo`` found by searching ``$XDG_CONFIG_HOME`` and
    then each entry of ``$XDG_CONFIG_DIRS``; it is ``""`` when none exists.
    ``data_dir`` is ``$XDG_DATA_HOME/calippo`` and is not checked for
    existence.

    https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
    """
    # expanduser never returns None, so an unset $HOME cannot make
    # os.path.join raise TypeError the way os.environ.get('HOME') would.
    home = os.path.expanduser('~')

    # The spec treats an empty variable the same as an unset one.
    config_home = os.environ.get('XDG_CONFIG_HOME') or os.path.join(home, ".config")
    data_home = os.environ.get('XDG_DATA_HOME') or os.path.join(home, ".local/share")
    system_dirs = os.environ.get('XDG_CONFIG_DIRS') or '/etc/xdg'

    # Search order: local -> global. Empty entries (e.g. from a trailing
    # ':') are dropped so they do not turn into the relative path 'calippo'.
    search_path = [config_home]
    search_path.extend(path for path in system_dirs.split(':') if path)

    data_dir = os.path.join(data_home, "calippo")

    for path in search_path:
        conf_dir = os.path.join(path, 'calippo')
        if os.path.exists(conf_dir):
            return conf_dir, data_dir
    return "", data_dir
def main():
    """Start the application: locate config, set up logging, run the daemon."""
    # Default config and data dirs are discovered before the arguments
    # are parsed.
    conf_dir, data_dir = find_config()
    args = parse_args()

    # Logging must be configured before parse_config so that its debug
    # messages are emitted.
    level = getattr(logging, args.log_level.upper())
    logging.basicConfig(format='%(levelname)s:%(message)s', level=level)
    logging.debug("Debugging Enabled.")

    daemon = Daemon(parse_config(args, data_dir, conf_dir), command=args.command)
    daemon.run()
def safe_decode(data):
    """Return ``data`` decoded as utf-8, or ``data`` itself if that fails.

    Objects without a ``decode`` method and byte strings that are not valid
    utf-8 are handed back unchanged.
    """
    try:
        return data.decode('utf-8')
    except (UnicodeDecodeError, UnicodeEncodeError, AttributeError):
        return data
if __name__ == "__main__":
    # Script entry point: application errors become a one-line log message
    # and exit status 1, unless debugging is on, in which case the full
    # traceback is shown.
    try:
        main()
    except ClipsterError as err:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            raise
        # Only output the 'human-readable' part.
        logging.error(err)
        sys.exit(1)
|