364 lines
12 KiB
Python
Executable file
364 lines
12 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# coding=utf-8
|
|
"""Calippo - much less than a clipboard manager"""
|
|
|
|
# A lot of this code has been stolen from clipster. Thanks!
|
|
|
|
|
|
# pylint: disable=line-too-long
|
|
|
|
import subprocess
|
|
import signal
|
|
import argparse
|
|
import json
|
|
import socket
|
|
import os
|
|
import errno
|
|
import sys
|
|
import logging
|
|
import tempfile
|
|
import re
|
|
import stat
|
|
from contextlib import closing
|
|
from gi import require_version
|
|
|
|
require_version("Gtk", "3.0")
|
|
from gi.repository import (
|
|
Gtk,
|
|
Gdk,
|
|
GLib,
|
|
GObject,
|
|
) # pylint:disable=wrong-import-position
|
|
|
|
try:
|
|
require_version("Wnck", "3.0")
|
|
from gi.repository import Wnck
|
|
except (ImportError, ValueError):
|
|
Wnck = None
|
|
|
|
if sys.version_info.major == 3:
|
|
# py 3.x
|
|
from configparser import ConfigParser as SafeConfigParser
|
|
else:
|
|
# py 2.x
|
|
from ConfigParser import SafeConfigParser # pylint:disable=import-error
|
|
|
|
# In python 2, ENOENT is sometimes IOError and sometimes OSError. Catch
|
|
# both by catching their immediate superclass exception EnvironmentError.
|
|
FileNotFoundError = EnvironmentError # pylint: disable=redefined-builtin
|
|
FileExistsError = ProcessLookupError = OSError # pylint: disable=redefined-builtin
|
|
|
|
|
|
class suppress_if_errno(object):
    """Context manager that swallows exceptions carrying a specific errno.

    An exception raised inside the ``with`` body is suppressed only when
    its type is a subclass of ``exceptions`` AND its ``errno`` attribute
    equals ``exc_val``. Everything else propagates unchanged.

    Allows things like:

        try:
            os.makedirs(dirs)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise

    to be expressed as:

        with suppress_if_errno(OSError, errno.EEXIST):
            os.makedirs(dirs)

    This is a fork of contextlib.suppress.
    """

    def __init__(self, exceptions, exc_val):
        """Remember which exception type(s) and which errno to suppress.

        Args:
            exceptions: exception class, or tuple of classes, as accepted
                by ``issubclass``.
            exc_val: the errno value an exception must carry to be
                suppressed.
        """
        self._exceptions = exceptions
        self._exc_val = exc_val

    def __enter__(self):
        """Nothing to set up; the work happens in ``__exit__``."""

    def __exit__(self, exctype, excinst, exctb):
        """Return True (suppress) only for a matching type and errno."""
        # Unlike isinstance and issubclass, CPython exception handling
        # currently only looks at the concrete type hierarchy (ignoring
        # the instance and subclass checking hooks). suppress provides
        # the simpler issubclass based semantics, rather than trying to
        # exactly reproduce the limitations of the CPython interpreter.
        #
        # See http://bugs.python.org/issue12029 for more details
        if exctype is None or not issubclass(exctype, self._exceptions):
            return False
        # getattr: an exception of a matching type is not guaranteed to
        # have an errno attribute. Without the default, the lookup itself
        # would raise AttributeError here and mask the original exception.
        return getattr(excinst, "errno", None) == self._exc_val
|
|
|
|
|
|
class ClipsterError(Exception):
    """Exception type for failures raised by this program itself.

    The single constructor argument is the human-readable message; it
    defaults to a generic text when omitted.
    """

    def __init__(self, args="Clipster Error."):
        super(ClipsterError, self).__init__(args)
|
|
|
|
|
|
class Daemon(object):
    """Watch the X selections and run a command for every text change.

    The daemon connects to the ``owner-change`` signal of the PRIMARY and
    CLIPBOARD selections. Whenever an active selection receives text, the
    configured command is spawned with the selection name and the text
    appended as its last two arguments.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(self, config, command):
        """Set up clipboard objects and per-daemon state.

        Args:
            config: parser object exposing ``get(section, option)``. The
                ``calippo`` section is read for ``pid_file`` here, and for
                ``data_dir`` and ``active_selections`` by other methods.
            command: sequence of argv words for the program to launch on
                each clipboard change.
        """
        self.config = config
        self.patterns = []
        self.ignore_patterns = []
        self.window = self.p_id = self.c_id = self.sock = None
        self.primary = Gtk.Clipboard.get(Gdk.SELECTION_PRIMARY)
        self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
        self.boards = {"PRIMARY": [], "CLIPBOARD": []}
        self.pid_file = self.config.get("calippo", "pid_file")
        self.client_msgs = {}
        # Flag to indicate that the in-memory history should be flushed to disk
        self.update_history_file = False
        # Flag whether next clipboard change should be ignored
        self.ignore_next = {"PRIMARY": False, "CLIPBOARD": False}
        # Two separate lists: a chained assignment would bind both names to
        # the same list object, so appending to one would change the other.
        self.whitelist_classes = []
        self.blacklist_classes = []
        self.command = command

    def exit(self):
        """Remove the pid file (best effort) and stop the GTK main loop."""
        logging.debug("Daemon exiting...")
        try:
            os.unlink(self.pid_file)
        except OSError:
            # Catch every unlink failure, not only "file not found":
            # otherwise the exception would skip Gtk.main_quit() below and
            # the signal that triggered the exit would not stop the daemon.
            logging.warning("Failed to remove pid file: %s", self.pid_file)
        Gtk.main_quit()

    def run(self):
        """Launch the daemon and block in the GTK main loop.

        Prepares the data directory and pid file, subscribes to
        ``owner-change`` on both selections, and registers ``exit`` as the
        handler for SIGINT, SIGTERM and SIGHUP.
        """
        # Data dir and stale pid file handling
        self.prepare_files()

        # We need to get the display instance from the window
        # for use in obtaining mouse state.
        # POPUP windows can do this without having to first show the window
        self.window = Gtk.Window(type=Gtk.WindowType.POPUP)

        # Handle clipboard changes
        self.p_id = self.primary.connect("owner-change", self.owner_change)
        self.c_id = self.clipboard.connect("owner-change", self.owner_change)
        # Handle unix signals
        for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
            GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, self.exit)

        Gtk.main()

    def prepare_files(self):
        """Create the data directory and deal with an existing pid file.

        Raises:
            ClipsterError: the pid file names a process that is still alive.
        """
        # Create the calippo dir if necessary
        with suppress_if_errno(FileExistsError, errno.EEXIST):
            os.makedirs(self.config.get("calippo", "data_dir"))

        # NOTE(review): nothing in this class writes the pid file, so the
        # check below only fires if something else created it - confirm
        # whether the write was dropped on purpose.
        # check for existing pid_file, and tidy up if appropriate
        with suppress_if_errno(FileNotFoundError, errno.ENOENT):
            with open(self.pid_file) as runf_r:
                try:
                    pid = int(runf_r.read())
                except ValueError:
                    logging.debug("Invalid pid file, attempting to overwrite.")
                else:
                    # pid is an int, determine if this corresponds to a running daemon.
                    try:
                        # Signal 0 does nothing, but raises if no such process
                        os.kill(pid, 0)
                        raise ClipsterError(
                            "Daemon already running: pid {}".format(pid)
                        )
                    except ProcessLookupError as exc:
                        if exc.errno != errno.ESRCH:
                            raise
                        # No process found, delete the pid file.
                        with suppress_if_errno(FileNotFoundError, errno.ENOENT):
                            os.unlink(self.pid_file)

    def owner_change(self, board, event):
        """Handle an ``owner-change`` event on one of the selections.

        Args:
            board: the clipboard object that emitted the signal.
            event: the signal's event; ``str(event.selection)`` is compared
                against the configured ``active_selections``.

        When the selection holds text, ``self.command`` is spawned with the
        selection name and the text appended. Failure to launch the command
        is logged rather than raised, so one bad invocation does not stop
        the daemon from handling later changes.
        """
        logging.debug("owner-change event!")
        selection = str(event.selection)
        logging.debug("selection: %s", selection)
        # Strip each entry so that "PRIMARY, CLIPBOARD" works as expected.
        active = [
            name.strip()
            for name in self.config.get("calippo", "active_selections").split(",")
        ]

        if selection not in active:
            return

        logging.debug("Selection in 'active_selections'")
        event_id = self.p_id if selection == "PRIMARY" else self.c_id
        # Some apps update primary during mouse drag (chrome)
        # Block at start to prevent repeated triggering
        board.handler_block(event_id)
        try:
            display = self.window.get_display()
            if selection == "PRIMARY":
                while Gdk.ModifierType.BUTTON1_MASK & display.get_pointer().mask:
                    # Do nothing while mouse button is held down (selection drag)
                    pass
            # Try to get text from clipboard
            text = board.wait_for_text()
            if text:
                logging.debug("Selection is text.")
                try:
                    subprocess.Popen(list(self.command) + [selection, text])
                except (OSError, ValueError) as exc:
                    # OSError: command missing or not executable.
                    # ValueError: e.g. an embedded NUL byte in the text.
                    logging.error("Failed to run command %s: %s", self.command, exc)
            # If no text received, either the selection was an empty string,
            # or the board contains non-text content.
            # First item in tuple is bool, False if no targets
            elif board.wait_for_targets()[0]:
                logging.debug("Selection is not text - ignoring.")
            else:
                logging.debug("Clipboard cleared or empty. Reinstating from history.")
                if self.boards[selection]:
                    # NOTE(review): update_board is not defined in this class
                    # and self.boards is never appended to here, so this
                    # branch looks unreachable - confirm before relying on it.
                    self.update_board(selection, self.boards[selection][-1])
                else:
                    logging.debug("No history available, leaving clipboard empty.")
        finally:
            # Always unblock, even on error: otherwise the handler would
            # stay blocked and this selection would never be seen again.
            board.handler_unblock(event_id)
|
|
|
|
|
|
def parse_args():
    """Parse command-line arguments from ``sys.argv``.

    Returns:
        argparse.Namespace with ``config``, ``log_level`` (upper-cased and
        validated), ``command`` (non-empty list), ``primary`` and
        ``clipboard``.

    An unknown log level is rejected here with a usage error instead of
    surfacing later as an attribute lookup failure on the logging module.
    """
    log_levels = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    parser = argparse.ArgumentParser(description="Clipster clipboard manager.")
    parser.add_argument(
        "-f", "--config", action="store", help="Path to config directory."
    )
    parser.add_argument(
        "-l",
        "--log_level",
        action="store",
        default="INFO",
        # Normalise case before the choices check so "-l debug" still works.
        type=str.upper,
        choices=log_levels,
        help="Set log level: DEBUG, INFO (default), WARNING, ERROR, CRITICAL",
    )

    parser.add_argument(
        "command",
        nargs="+",
        help="Command to run on each change; the selection name and the "
        "text are appended as its last two arguments.",
    )
    # Mutually exclusive client and daemon options.
    boardgrp = parser.add_mutually_exclusive_group()
    boardgrp.add_argument(
        "-p",
        "--primary",
        action="store_const",
        const="PRIMARY",
        help="Query, or write STDIN to, the PRIMARY clipboard.",
    )
    boardgrp.add_argument(
        "-c",
        "--clipboard",
        action="store_const",
        const="CLIPBOARD",
        help="Query, or write STDIN to, the CLIPBOARD clipboard.",
    )

    return parser.parse_args()
|
|
|
|
|
|
def parse_config(args, data_dir, conf_dir):
    """Build the configuration from built-in defaults plus ``calippo.ini``.

    Args:
        args: parsed command-line namespace; only ``args.config`` is read
            (an optional override for the config directory).
        data_dir: default data directory.
        conf_dir: default config directory; ``calippo.ini`` is looked up
            inside it.

    Returns:
        A config parser with a ``calippo`` section whose options fall back
        to the defaults defined here.
    """

    def _escape(value):
        # The parser treats "%" as interpolation syntax. A literal percent
        # sign in a path must be doubled, otherwise reading the option back
        # raises an interpolation error.
        return value.replace("%", "%%")

    # Set some config defaults
    config_defaults = {
        "data_dir": _escape(data_dir),  # calippo 'root' dir
        # calippo config dir. Can be overridden using -f cmd-line arg.
        "conf_dir": _escape(conf_dir),
        # change this to PRIMARY,CLIPBOARD to activate both
        "active_selections": "CLIPBOARD",  # Comma-separated list of selections to monitor
        "pid_file": "/run/user/{}/calippo.pid".format(os.getuid()),
    }

    config = SafeConfigParser(config_defaults)
    config.add_section("calippo")

    # Try to read config file (either passed in, or default value)
    if args.config:
        config.set("calippo", "conf_dir", _escape(args.config))
    conf_file = os.path.join(config.get("calippo", "conf_dir"), "calippo.ini")
    logging.debug("Trying to read config file: %s", conf_file)
    result = config.read(conf_file)
    if not result:
        logging.debug("Unable to read config file: %s", conf_file)

    logging.debug("Merged config: %s", sorted(dict(config.items("calippo")).items()))

    return config
|
|
|
|
|
|
def find_config():
    """Locate the config and data directories via the XDG base-dir rules.

    Returns:
        ``(conf_dir, data_dir)``. ``conf_dir`` is the first existing
        ``<dir>/calippo`` found by searching ``$XDG_CONFIG_HOME`` and then
        each entry of ``$XDG_CONFIG_DIRS``, or ``""`` when none exists.
        ``data_dir`` is ``$XDG_DATA_HOME/calippo``; it is not checked for
        existence.
    """
    # https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
    # expanduser copes with an unset $HOME; joining onto a missing value
    # would raise TypeError before the XDG variables were even consulted.
    home = os.path.expanduser("~")
    # "or" rather than a .get() default: the spec says a variable that is
    # set but empty must be treated like an unset one.
    config_home = os.environ.get("XDG_CONFIG_HOME") or os.path.join(home, ".config")
    data_home = os.environ.get("XDG_DATA_HOME") or os.path.join(home, ".local/share")
    config_dirs = os.environ.get("XDG_CONFIG_DIRS") or "/etc/xdg"

    # Search order is local -> global. Empty entries (e.g. from "a::b") are
    # dropped so they do not resolve relative to the working directory.
    search_path = [config_home]
    search_path.extend(entry for entry in config_dirs.split(":") if entry)

    data_dir = os.path.join(data_home, "calippo")
    for base in search_path:
        candidate = os.path.join(base, "calippo")
        if os.path.exists(candidate):
            return candidate, data_dir
    return "", data_dir
|
|
|
|
|
|
def main():
    """Start the application: parse options, configure logging, run the daemon.

    Returns nothing; the call blocks inside ``Daemon.run()`` until the
    daemon stops.

    Raises:
        ClipsterError: the requested log level is not a level known to the
            logging module.
    """
    # Find default config and data dirs
    conf_dir, data_dir = find_config()

    # parse command-line arguments
    args = parse_args()

    # Resolve the level name defensively: an unknown name would otherwise
    # surface as a bare AttributeError from the logging module lookup.
    level = getattr(logging, args.log_level.upper(), None)
    if not isinstance(level, int):
        raise ClipsterError("Invalid log level: {}".format(args.log_level))

    # Enable logging
    logging.basicConfig(format="%(levelname)s:%(message)s", level=level)
    logging.debug("Debugging Enabled.")

    config = parse_config(args, data_dir, conf_dir)

    # Launch the daemon
    Daemon(config, command=args.command).run()
|
|
|
|
|
|
def safe_decode(data):
    """Return ``data`` decoded as UTF-8 when possible, else ``data`` as is.

    Values that have no ``decode`` method, or whose bytes are not valid
    UTF-8, are handed back untouched.
    """
    try:
        return data.decode("utf-8")
    except (UnicodeDecodeError, UnicodeEncodeError, AttributeError):
        return data
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the daemon and turn program-specific errors
    # into a short log line plus exit status 1.
    try:
        main()
    except ClipsterError as exc:
        debugging = logging.getLogger().getEffectiveLevel() == logging.DEBUG
        if debugging:
            # Keep the full traceback when debugging.
            raise
        # Only output the 'human-readable' part.
        logging.error(exc)
        sys.exit(1)
|