Source code for ssh2.client

import paramiko
import select
import typing as t

from contextlib import suppress
from socket import error as SocketError
from paramiko.config import SSH_PORT
from paramiko.ssh_exception import (
    SSHException,
    AuthenticationException,
    BadAuthenticationType,
    BadHostKeyException
)
from ssh2.config import SSHConfigData
from ssh2.errors import SSHConnectionError
from ssh2.errors import SSHConfigurationError
from ssh2.errors import SSHChannelError
from ssh2.errors import SFTPError


[docs]class SSH: """ A high-level representation of a session with an SSH server. This class wraps the :class:`paramiko.SSHClient` object to simplify most aspects of interacting with an SSH server. """ def __init__(self) -> t.NoReturn: self._client = None self._output = [] @classmethod def _connect(cls, configs: SSHConfigData) -> paramiko.SSHClient: """ Returns the :class:`paramiko.SSHClient` object. Connects to SSH server and authenticates following order of priority set by paramiko -- see :class:`paramiko.connect`. :param configs: :class:`ssh2.config.SSHConfigData` object. :return: :class:`paramiko.SSHClient` object. """ try: ssh = paramiko.SSHClient() ssh.load_system_host_keys(configs.host_key_file) ssh.set_missing_host_key_policy(configs.host_key_policy) ssh.connect(**dict(configs)) return ssh except (SocketError, SSHException, AuthenticationException, BadAuthenticationType, BadHostKeyException, IOError, SSHConfigurationError) as err: raise SSHConnectionError( f"Connection to '{configs.hostname}' failed with " f"error: {err}")
[docs] def connect(self, configs: SSHConfigData) -> None: """ Establishes SSH connection to server. Connects to SSH server and authenticates following order of priority set by paramiko -- see :class:`paramiko.connect`. :param configs: :class:`ssh2.config.SSHConfigData` object. :return: None. """ self._client = SSH._connect(configs)
[docs] def disconnect(self) -> t.NoReturn: """ Close SSH connection. Terminates SSH connection and its underlying :class:`paramiko.Transport`. """ if isinstance(self._client, paramiko.SSHClient): self._client.close()
def is_active(self) -> bool: with suppress(EOFError): transport = self._client.get_transport() transport.send_ignore() return True return False
[docs] def open_tunnel( self, configs: SSHConfigData, dest_hostname: str, dest_port: t.Optional[int] = SSH_PORT, ) -> paramiko.Channel: """ Requests a new channel through an intermediary host. Creates a socket-like object used for establish connects to unreachable hosts, similarily to how the :class:`paramiko.ProxyCommand` works. :param dest_hostname: The destination hostname of this port forwarding. :param dest_port: The destination port. Default 22. :param configs: :class:`ssh2.config.SSHConfigData` object. :return: :class:`paramiko.Channel` object. :raises: :class:`ssh2.errors.SSHChannelError` """ tunnel = SSH._connect(configs) tunnel_transport = tunnel.get_transport() try: return tunnel_transport.open_channel( "direct-tcpip", (dest_hostname, dest_port), (configs.hostname, configs.port) ) except SSHException as err: raise SSHChannelError( f"Transport channel for '{dest_hostname}' failed with: {err}")
[docs] def open_sftp(self): """ Open an SFTP session on the SSH server. Note: This exposes the :class:`paramiko.open_sftp` session object, please view the documentation at http://docs.paramiko.org/en/stable/api/sftp.html. :return: :class:`paramiko.SFTPClient` session object """ if not isinstance(self._client, paramiko.SSHClient): raise SFTPError( f"Unable to establish SFTP session on non-existent " "`SSHClient` object.") return self._client.open_sftp()
[docs] def execute_realtime(self, command: str) -> int: """ Execute a command on the SSH server. The command's output stream is immediately printed to the terminal. :param command: command to execute. :return: exit status code of the executing command """ stdin, stdout, stderr = self._client.exec_command( command, get_pty=True) channel = stdout.channel stdin.close() # close stdin pseudo file -- not needed stderr.close() # close stderr pseudo file -- not needed channel.shutdown_write() # shutdown writes on the stdout channel for line in iter(stdout.readline, ""): print(line, end="") channel.shutdown_read() # shutdown reads on the stdout channel channel.close() # close stdout the channel return channel.recv_exit_status()
[docs] def execute( self, command: str, file: t.Optional[t.Union[None, t.TextIO]] = None ) -> t.Tuple[str, int]: """ Execute a command on the SSH server. The command's output stream is returned along with the exit status code. :param command: command to execute. :param file: an optional file-pointer object to write the output to. :return: a 2-tuple with the STDOUT and exit status code of the executing command. """ stdin, stdout, stderr = self._client.exec_command(command) channel = stdout.channel stdin.close() # close stdin pseudo file -- not needed channel.shutdown_write() # shutdown writes on the stdout channel self._output.append( channel.recv(len(channel.in_buffer)).decode("utf-8")) while not channel.closed or channel.recv_ready() \ or channel.recv_stderr_ready(): readq, _, _ = select.select([channel], [], [], 0.0) for c in readq: if c.recv_ready(): self._output.append( channel.recv(len(c.in_buffer)).decode("utf-8")) if c.recv_stderr_ready(): self._output.append( stderr.channel.recv_stderr( len(c.in_stderr_buffer)).decode("utf-8")) if channel.exit_status_ready() \ and not channel.recv_stderr_ready() \ and not channel.recv_ready(): channel.shutdown_read() # shutdown reads on the stdout channel channel.close() # close stdout the channel break stdout.close() # close stdout pseudo file stderr.close() # close stderr pseudo file if file: file.write("".join(self._output)) return "".join(self._output), channel.recv_exit_status()