diff options
author | Tom Ryder <tom@sanctum.geek.nz> | 2023-07-12 09:43:25 +1200 |
---|---|---|
committer | Tom Ryder <tom@sanctum.geek.nz> | 2023-07-12 12:39:54 +1200 |
commit | 7725eb0fe50020e404499ebfd9cad6a0b4fd0c47 (patch) | |
tree | 1a7b169adedd952f936310a9f5c3a250e2aa5f30 | |
parent | Commit the first version of this I wrote (diff) | |
download | ssh_negotiate_term-7725eb0fe50020e404499ebfd9cad6a0b4fd0c47.tar.gz ssh_negotiate_term-7725eb0fe50020e404499ebfd9cad6a0b4fd0c47.zip |
Commit rewrite
-rwxr-xr-x | ssh_negotiate_term | 200 |
1 files changed, 87 insertions, 113 deletions
diff --git a/ssh_negotiate_term b/ssh_negotiate_term index cedbd6e..31cb869 100755 --- a/ssh_negotiate_term +++ b/ssh_negotiate_term @@ -1,10 +1,9 @@ #!/usr/bin/python3 """ Wrap ssh(1) commands, parse the options, and check whether the hostname -argument is either an IPv4/IPv6 address or matches the hostname pattern for -network equipment rather than servers. If so, and we're using a TERM type -that's not well-supported on network gear, send a different TERM setting with -a more broadly-compatible TERM type. +argument is either an IPv4/IPv6 address or matches any of a set of configured +hostname patterns. If so, and we're using a TERM type that's not +well-supported on network gear, set a more broadly-compatible TERM type. """ import argparse import configparser @@ -14,138 +13,113 @@ import re import sys -class SSHArgumentParserError(Exception): - """ - Exception for SSHArgumentParser to throw on error. - """ - - -class SSHArgumentParser(argparse.ArgumentParser): - """ - Subclass argparse.ArgumentParser just so that errors raise exceptions - rather than exiting. - """ - - def error(self, message): - raise SSHArgumentParserError(message) - - class SSHNegotiateTerm(): """ Using a class, to encapsulate a fair bit of system state that gets injected into this. """ - def __init__(self, _os=os, _sys=sys, config_path=None): - self._os = _os - self._sys = _sys - - config = configparser.ConfigParser() - config['patterns'] = {} - config['ssh'] = {} - config['ssh']['path'] = '/usr/bin/ssh' - config['translations'] = {} - if not config_path: - config_path = self._os.path.expanduser( - '~/.config/ssh_negotiate_term/config') - config.read(self._os.path.expanduser(config_path)) + # Configuration file path; this gets expanded with os.path.expanduser() + CONFIG_PATH = '~/.config/ssh_negotiate_term/config' + + # Defaults to set to which a config file replaces/appends + DEFAULT_PATTERNS = {} + DEFAULT_SSH = '/usr/bin/ssh' + DEFAULT_TRANSLATIONS = { + 'putty-256color': 'xterm', + 'screen-256color': 'screen', + 'tmux': 'screen', + 'rxvt-256color': 'rxvt', + 'tmux-256color': 'screen', + 'xterm-256color': 'xterm', + } + + def __init__(self, term, argv, config=None): + + # Read config file if we weren't passed an already-complete config + if not config: + config = configparser.ConfigParser() + config['patterns'] = self.DEFAULT_PATTERNS + config['ssh'] = {} + config['ssh']['path'] = self.DEFAULT_SSH + config['translations'] = self.DEFAULT_TRANSLATIONS + config.read(os.path.expanduser(self.CONFIG_PATH)) self.config = config - def run(self): - """ - Build argument vector for the real SSH command from what we were passed; if - there's a compatible TERM string we should use for this call, add it with an - ssh(1) option -o string. - """ - args = [self.config['ssh']['path']] - term = self.get_compatible_term() - if term: - args.extend(['-o', f'SetEnv=TERM={term}']) - args.extend(self._sys.argv[1:]) - self._os.execv(self.config['ssh']['path'], args) - - def get_hostname(self): + # Replace the first argument (this script) with the real SSH + ssh = self.config['ssh']['path'] + self.argv = [ssh] + argv[1:] + + # Translate the terminal if a downgrade is required, otherwise take it + # as provided + if self.downgrade_required(term): + self.term = self.config['translations'][term] + else: + self.term = term + + # Flags from `man ssh` so we can attempt to parse the command line + # OpenSSH_9.2p1 Debian-2, OpenSSL 3.0.9 30 May 2023 + SSH_OPTIONS_SWITCHES = list('46AaCfGgKkMNnqsTtVvXxYy') + SSH_OPTIONS_ARGUMENTS = list('BbcDEeFIiJLlmOopQRSWw') + + def downgrade_required(self, term): """ - Given an argument vector for an ssh(1) command---including the `ssh` - command itself---iterate through it until we find what should be the - hostname, taking account of argument options. Return 'None' on fail. + Given a TERM terminal name, having already set self.config and + self.argv in the constructor, decide whether it's appropriate to + downgrade the TERM string before calling SSH. """ - - # Set up a parser object, a subclass of the stock argparse one. - parser = SSHArgumentParser() - # Handle boolean flags. - for letter in list('46AaCfGgKkMNnqsTtVvXxYy'): + # TERM wasn't set at all; do nothing + if not term: + return False + # TERM was set, but there's no translation configured for it; do + # nothing + if term not in self.config['translations']: + return False + + # Parse the SSH command line to try and get the hostname + parser = argparse.ArgumentParser() + for letter in self.SSH_OPTIONS_SWITCHES: parser.add_argument('-' + letter, action='store_true') - # Handle options that require arguments. - for letter in list('BbcDEeFIiJLlmOopQRSWw'): + for letter in self.SSH_OPTIONS_ARGUMENTS: parser.add_argument('-' + letter) - # Look for the hostname, which should always be required. parser.add_argument('hostname') - # Note that we ignore the command. - - # Try to parse the command line arguments after the program name. If we - # can't make sense of it, the subclass should throw our custom exception, - # which we'll catch and then return 'None' instead, meaning we can't tell - # what the hostname is, if there is one at all. - # + # Don't exit if we can't get the hostname, as ArgumentParser does by + # default; instead, just give up on the idea of downgrading try: - args = parser.parse_args() - return args.hostname - except SSHArgumentParserError: - return None + args = parser.parse_args(self.argv[1:]) + hostname = args.hostname + except SystemExit: + return False - def get_compatible_term(self): - """ - If there's a better TERM string to use for this hostname, return it; otherwise, - return None, meaning that we shouldn't export a different TERM for the actual - ssh(1) command run. - """ - - # Require that there's a translation for this TERM string to use. - try: - term = self.config['translations'][self._os.environ['TERM']] - except KeyError: - return None - - # Require stdin, stdout, and stderr all point to a terminal. Yes, I know - # this might be more Pythonic as a list comprehension, but I tried that and - # found it harder to read. - # - for descriptor in [ - self._sys.stdin, - self._sys.stdout, - self._sys.stderr]: - if not descriptor.isatty(): - return None - - # Require that we have at least one argument to ssh(1); we'll assume the - # last argument is the hostname. This is just an heuristic, and will often - # not be true; it'll often be a command to be run on the remote system, but - # in such a case that's done for batch jobs rather than interactive - # processes, and it therefore often doesn't matter what the TERM is. - # - hostname = self.get_hostname() - if not hostname: - return None - - # If the hostname parses as an IPv4 or IPv6 address, downgrade the TERM for - # compatibility. + # If the hostname looks like an IPv4 or IPv6 address, we'll downgrade try: ipaddress.ip_address(hostname) - return term + return True except ValueError: pass - # If the hostname matches the network equipment hostname pattern, downgrade - # the TERM for compatibility. + # If the hostname looks like one of the configured patterns, we'll + # downgrade for pattern in self.config['patterns']: if re.search(self.config['patterns'][pattern], hostname): - return term + return True + + # We *could* downgrade, but the hostname doesn't look like something we + # need to downgrade for, so don't + return False - # Default to keeping the defined terminal (i.e., don't downgrade). - return None + def exec(self): + """ + Convenience function to exec the real ssh(1) program with what we + decided in the object's constructor. + """ + os.environ['TERM'] = self.term + os.execv(self.argv[0], self.argv) +# If called from the command line, create the object with the real terminal and +# arguments to this script, and execute them as it concludes appropriate; the +# separation here is just to make the object testable if __name__ == '__main__': - snt = SSHNegotiateTerm() - snt.run() + snt = SSHNegotiateTerm(os.environ['TERM'], sys.argv) + snt.exec() |