calippo/calippo.py

364 lines
12 KiB
Python
Executable file

#!/usr/bin/python3
# coding=utf-8
"""Calippo - much less than a clipboard manager"""
# lot of code has been stolen from clipster. thanks!
# pylint: disable=line-too-long
import subprocess
import signal
import argparse
import json
import socket
import os
import errno
import sys
import logging
import tempfile
import re
import stat
from contextlib import closing
from gi import require_version
require_version("Gtk", "3.0")
from gi.repository import (
Gtk,
Gdk,
GLib,
GObject,
) # pylint:disable=wrong-import-position
try:
require_version("Wnck", "3.0")
from gi.repository import Wnck
except (ImportError, ValueError):
Wnck = None
if sys.version_info.major == 3:
# py 3.x
from configparser import ConfigParser as SafeConfigParser
else:
# py 2.x
from ConfigParser import SafeConfigParser # pylint:disable=import-error
# In python 2, ENOENT is sometimes IOError and sometimes OSError. Catch
# both by catching their immediate superclass exception EnvironmentError.
FileNotFoundError = EnvironmentError # pylint: disable=redefined-builtin
FileExistsError = ProcessLookupError = OSError # pylint: disable=redefined-builtin
class suppress_if_errno(object):
"""A context manager which suppresses exceptions with an errno attribute which matches the given value.
Allows things like:
try:
os.makedirs(dirs)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
to be expressed as:
with suppress_if_errno(OSError, errno.EEXIST):
os.makedirs(dir)
This is a fork of contextlib.suppress.
"""
def __init__(self, exceptions, exc_val):
self._exceptions = exceptions
self._exc_val = exc_val
def __enter__(self):
pass
def __exit__(self, exctype, excinst, exctb):
# Unlike isinstance and issubclass, CPython exception handling
# currently only looks at the concrete type hierarchy (ignoring
# the instance and subclass checking hooks). While Guido considers
# that a bug rather than a feature, it's a fairly hard one to fix
# due to various internal implementation details. suppress provides
# the simpler issubclass based semantics, rather than trying to
# exactly reproduce the limitations of the CPython interpreter.
#
# See http://bugs.python.org/issue12029 for more details
return (
exctype is not None
and issubclass(exctype, self._exceptions)
and excinst.errno == self._exc_val
)
class ClipsterError(Exception):
"""Errors specific to Clipster."""
def __init__(self, args="Clipster Error."):
Exception.__init__(self, args)
class Daemon(object):
"""Handles clipboard events, client requests, stores history."""
# pylint: disable=too-many-instance-attributes
def __init__(self, config, command):
"""Set up clipboard objects and history dict."""
self.config = config
self.patterns = []
self.ignore_patterns = []
self.window = self.p_id = self.c_id = self.sock = None
self.primary = Gtk.Clipboard.get(Gdk.SELECTION_PRIMARY)
self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
self.boards = {"PRIMARY": [], "CLIPBOARD": []}
self.pid_file = self.config.get("calippo", "pid_file")
self.client_msgs = {}
# Flag to indicate that the in-memory history should be flushed to disk
self.update_history_file = False
# Flag whether next clipboard change should be ignored
self.ignore_next = {"PRIMARY": False, "CLIPBOARD": False}
self.whitelist_classes = self.blacklist_classes = []
self.command = command
def exit(self):
"""Clean up things before exiting."""
logging.debug("Daemon exiting...")
try:
os.unlink(self.pid_file)
except FileNotFoundError:
logging.warning("Failed to remove pid file: %s", self.pid_file)
Gtk.main_quit()
def run(self):
"""Launch the clipboard manager daemon.
Listen for clipboard events & client socket connections."""
# Set up socket, pid file etc
self.prepare_files()
# We need to get the display instance from the window
# for use in obtaining mouse state.
# POPUP windows can do this without having to first show the window
self.window = Gtk.Window(type=Gtk.WindowType.POPUP)
# Handle clipboard changes
self.p_id = self.primary.connect("owner-change", self.owner_change)
self.c_id = self.clipboard.connect("owner-change", self.owner_change)
# Handle unix signals
GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGINT, self.exit)
GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGTERM, self.exit)
GLib.unix_signal_add(GLib.PRIORITY_HIGH, signal.SIGHUP, self.exit)
Gtk.main()
def prepare_files(self):
"""Ensure that all files and sockets used
by the daemon are available."""
# Create the calippo dir if necessary
with suppress_if_errno(FileExistsError, errno.EEXIST):
os.makedirs(self.config.get("calippo", "data_dir"))
# check for existing pid_file, and tidy up if appropriate
with suppress_if_errno(FileNotFoundError, errno.ENOENT):
with open(self.pid_file) as runf_r:
try:
pid = int(runf_r.read())
except ValueError:
logging.debug("Invalid pid file, attempting to overwrite.")
else:
# pid is an int, determine if this corresponds to a running daemon.
try:
# Do nothing, but raise an error if no such process
os.kill(pid, 0)
raise ClipsterError(
"Daemon already running: pid {}".format(pid)
)
except ProcessLookupError as exc:
if exc.errno != errno.ESRCH:
raise
# No process found, delete the pid file.
with suppress_if_errno(FileNotFoundError, errno.ENOENT):
os.unlink(self.pid_file)
def owner_change(self, board, event):
"""Handler for owner-change clipboard events."""
logging.debug("owner-change event!")
selection = str(event.selection)
logging.debug("selection: %s", selection)
active = self.config.get("calippo", "active_selections").split(",")
if selection not in active:
return
logging.debug("Selection in 'active_selections'")
event_id = selection == "PRIMARY" and self.p_id or self.c_id
# Some apps update primary during mouse drag (chrome)
# Block at start to prevent repeated triggering
board.handler_block(event_id)
display = self.window.get_display()
if selection == "PRIMARY":
while Gdk.ModifierType.BUTTON1_MASK & display.get_pointer().mask:
# Do nothing while mouse button is held down (selection drag)
pass
# Try to get text from clipboard
text = board.wait_for_text()
if text:
logging.debug("Selection is text.")
subprocess.Popen(self.command + [selection, text])
# self.update_history(selection, text)
# If no text received, either the selection was an empty string,
# or the board contains non-text content.
else:
# First item in tuple is bool, False if no targets
if board.wait_for_targets()[0]:
logging.debug("Selection is not text - ignoring.")
else:
logging.debug("Clipboard cleared or empty. Reinstating from history.")
if self.boards[selection]:
self.update_board(selection, self.boards[selection][-1])
else:
logging.debug("No history available, leaving clipboard empty.")
# Unblock event handling
board.handler_unblock(event_id)
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Clipster clipboard manager.")
parser.add_argument(
"-f", "--config", action="store", help="Path to config directory."
)
parser.add_argument(
"-l",
"--log_level",
action="store",
default="INFO",
help="Set log level: DEBUG, INFO (default), WARNING, ERROR, CRITICAL",
)
parser.add_argument("command", nargs="+")
# Mutually exclusive client and daemon options.
boardgrp = parser.add_mutually_exclusive_group()
boardgrp.add_argument(
"-p",
"--primary",
action="store_const",
const="PRIMARY",
help="Query, or write STDIN to, the PRIMARY clipboard.",
)
boardgrp.add_argument(
"-c",
"--clipboard",
action="store_const",
const="CLIPBOARD",
help="Query, or write STDIN to, the CLIPBOARD clipboard.",
)
return parser.parse_args()
def parse_config(args, data_dir, conf_dir):
"""Configuration derived from defaults & file."""
# Set some config defaults
config_defaults = {
"data_dir": data_dir, # calippo 'root' dir (see history/socket config)
"conf_dir": conf_dir, # calippo config dir (see pattern/ignore_pattern file config). Can be overridden using -f cmd-line arg.
# change this to PRIMARY,CLIPBOARD to activate both
"active_selections": "CLIPBOARD", # Comma-separated list of selections to monitor/save
"pid_file": "/run/user/{}/calippo.pid".format(os.getuid()),
}
config = SafeConfigParser(config_defaults)
config.add_section("calippo")
# Try to read config file (either passed in, or default value)
if args.config:
config.set("calippo", "conf_dir", args.config)
conf_file = os.path.join(config.get("calippo", "conf_dir"), "calippo.ini")
logging.debug("Trying to read config file: %s", conf_file)
result = config.read(conf_file)
if not result:
logging.debug("Unable to read config file: %s", conf_file)
logging.debug("Merged config: %s", sorted(dict(config.items("calippo")).items()))
return config
def find_config():
"""Attempt to find config from xdg basedir-spec paths/environment variables."""
# Set a default directory for calippo files
# https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
xdg_config_dirs = os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")
xdg_config_dirs.insert(
0,
os.environ.get(
"XDG_CONFIG_HOME", os.path.join(os.environ.get("HOME"), ".config")
),
)
xdg_data_home = os.environ.get(
"XDG_DATA_HOME", os.path.join(os.environ.get("HOME"), ".local/share")
)
data_dir = os.path.join(xdg_data_home, "calippo")
# Keep trying to define conf_dir, moving from local -> global
for path in xdg_config_dirs:
conf_dir = os.path.join(path, "calippo")
if os.path.exists(conf_dir):
return conf_dir, data_dir
return "", data_dir
def main():
"""Start the application. Return an exit status (0 or 1)."""
# Find default config and data dirs
conf_dir, data_dir = find_config()
# parse command-line arguments
args = parse_args()
# Enable logging
logging.basicConfig(
format="%(levelname)s:%(message)s",
level=getattr(logging, args.log_level.upper()),
)
logging.debug("Debugging Enabled.")
config = parse_config(args, data_dir, conf_dir)
# Launch the daemon
Daemon(config, command=args.command).run()
def safe_decode(data):
"""Convenience method to ensure everything is utf-8."""
try:
data = data.decode("utf-8")
except (UnicodeDecodeError, UnicodeEncodeError, AttributeError):
pass
return data
if __name__ == "__main__":
try:
main()
except ClipsterError as exc:
if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
raise
else:
# Only output the 'human-readable' part.
logging.error(exc)
sys.exit(1)