Skip to content

back to Reference (Gold) summary

Reference (Gold): fabric

Pytest summary for the `tests` test suite

status count
passed 353
failed 41
skipped 7
total 401
collected 401

Failed pytests:

config.py::Config_::from_v1::must_be_given_explicit_env_arg

config.py::Config_::from_v1::must_be_given_explicit_env_arg
self = 

    def must_be_given_explicit_env_arg(self):
        config = Config.from_v1(
>           env=Lexicon(self.env, sudo_password="sikrit")
        )
E       AttributeError: 'from_v1' object has no attribute 'env'

tests/config.py:77: AttributeError

config.py::Config_::from_v1::additional_kwargs::forwards_arbitrary_kwargs_to_init

config.py::Config_::from_v1::additional_kwargs::forwards_arbitrary_kwargs_to_init
self = 

    def forwards_arbitrary_kwargs_to_init(self):
        config = Config.from_v1(
>           self.env,
            # Vanilla Invoke
            overrides={"some": "value"},
            # Fabric
            system_ssh_path="/what/ever",
        )
E       AttributeError: 'additional_kwargs' object has no attribute 'env'

tests/config.py:84: AttributeError

config.py::Config_::from_v1::additional_kwargs::subservient_to_runtime_overrides

config.py::Config_::from_v1::additional_kwargs::subservient_to_runtime_overrides
self = 

    def subservient_to_runtime_overrides(self):
>       env = self.env
E       AttributeError: 'additional_kwargs' object has no attribute 'env'

tests/config.py:94: AttributeError

config.py::Config_::from_v1::additional_kwargs::connect_kwargs_also_merged_with_imported_values

config.py::Config_::from_v1::additional_kwargs::connect_kwargs_also_merged_with_imported_values
self = 

    def connect_kwargs_also_merged_with_imported_values(self):
>       self.env["key_filename"] = "whatever"
E       AttributeError: 'additional_kwargs' object has no attribute 'env'

tests/config.py:102: AttributeError

config.py::Config_::from_v1::var_mappings::always_use_pty

config.py::Config_::from_v1::var_mappings::always_use_pty
self = 

    def always_use_pty(self):
        # Testing both due to v1-didn't-use-None-default issues
>       config = self._conf(always_use_pty=True)

tests/config.py:112: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'always_use_pty': True}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::forward_agent

config.py::Config_::from_v1::var_mappings::forward_agent
self = 

    def forward_agent(self):
>       config = self._conf(forward_agent=True)

tests/config.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'forward_agent': True}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::gateway

config.py::Config_::from_v1::var_mappings::gateway
self = 

    def gateway(self):
>       config = self._conf(gateway="bastion.host")

tests/config.py:122: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'gateway': 'bastion.host'}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::key_filename::base

config.py::Config_::from_v1::var_mappings::key_filename::base
self = 

    def base(self):
>       config = self._conf(key_filename="/some/path")

tests/config.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'key_filename': '/some/path'}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'key_filename' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::key_filename::is_not_set_if_None

config.py::Config_::from_v1::var_mappings::key_filename::is_not_set_if_None
self = 

    def is_not_set_if_None(self):
>       config = self._conf(key_filename=None)

tests/config.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'key_filename': None}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'key_filename' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::no_agent

config.py::Config_::from_v1::var_mappings::no_agent
self = 

    def no_agent(self):
>       config = self._conf()

tests/config.py:137: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::password::set_just_to_connect_kwargs_if_sudo_password_set

config.py::Config_::from_v1::var_mappings::password::set_just_to_connect_kwargs_if_sudo_password_set
self = 

    def set_just_to_connect_kwargs_if_sudo_password_set(self):
        # NOTE: default faux env has sudo_password set already...
>       config = self._conf(password="screaming-firehawks")

tests/config.py:145: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'password': 'screaming-firehawks'}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'password' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::password::set_to_both_password_fields_if_necessary

config.py::Config_::from_v1::var_mappings::password::set_to_both_password_fields_if_necessary
self = 

    def set_to_both_password_fields_if_necessary(self):
>       config = self._conf(password="sikrit", sudo_password=None)

tests/config.py:150: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'password': 'sikrit', 'sudo_password': None}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'password' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::ssh_config_path

config.py::Config_::from_v1::var_mappings::ssh_config_path
self = 

    def ssh_config_path(self):
>       self.env.ssh_config_path = "/where/ever"
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:155: AttributeError

config.py::Config_::from_v1::var_mappings::sudo_password

config.py::Config_::from_v1::var_mappings::sudo_password
self = 

    def sudo_password(self):
>       config = self._conf(sudo_password="sikrit")

tests/config.py:160: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'sudo_password': 'sikrit'}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::sudo_prompt

config.py::Config_::from_v1::var_mappings::sudo_prompt
self = 

    def sudo_prompt(self):
>       config = self._conf(sudo_prompt="password???")

tests/config.py:164: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'sudo_prompt': 'password???'}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::timeout

config.py::Config_::from_v1::var_mappings::timeout
self = 

    def timeout(self):
>       config = self._conf(timeout=15)

tests/config.py:168: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'timeout': 15}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::use_ssh_config

config.py::Config_::from_v1::var_mappings::use_ssh_config
self = 

    def use_ssh_config(self):
        # Testing both due to v1-didn't-use-None-default issues
>       config = self._conf(use_ssh_config=True)

tests/config.py:173: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'use_ssh_config': True}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

config.py::Config_::from_v1::var_mappings::warn_only

config.py::Config_::from_v1::var_mappings::warn_only
self = 

    def warn_only(self):
        # Testing both due to v1-didn't-use-None-default issues
>       config = self._conf(warn_only=True)

tests/config.py:180: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'warn_only': True}

    def _conf(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/config.py:72: AttributeError

connection.py::Connection_::init::ssh_config::proxy_jump::wins_over_default

connection.py::Connection_::init::ssh_config::proxy_jump::wins_over_default
self = 

    def wins_over_default(self):
        cxn = self._runtime_cxn(basename="proxyjump")
>       assert cxn.gateway == self._expected_gw
E       AttributeError: 'proxy_jump' object has no attribute '_expected_gw'

tests/connection.py:388: AttributeError

connection.py::Connection_::init::ssh_config::proxy_jump::wins_over_configuration

connection.py::Connection_::init::ssh_config::proxy_jump::wins_over_configuration
self = 

    def wins_over_configuration(self):
        cxn = self._runtime_cxn(
            basename="proxyjump", overrides={"gateway": "meh gw"}
        )
>       assert cxn.gateway == self._expected_gw
E       AttributeError: 'proxy_jump' object has no attribute '_expected_gw'

tests/connection.py:394: AttributeError

connection.py::Connection_::init::ssh_config::proxy_jump::gateway_Connections_get_parent_connection_configs

connection.py::Connection_::init::ssh_config::proxy_jump::gateway_Connections_get_parent_connection_configs
self = 

    def gateway_Connections_get_parent_connection_configs(self):
        conf = self._runtime_config(
            basename="proxyjump",
            overrides={"some_random_option": "a-value"},
        )
        cxn = Connection("runtime", config=conf)
        # Safety
        assert cxn.config is conf
>       assert cxn.gateway == self._expected_gw
E       AttributeError: 'proxy_jump' object has no attribute '_expected_gw'

tests/connection.py:449: AttributeError

connection.py::Connection_::from_v1::must_be_given_explicit_env_arg

connection.py::Connection_::from_v1::must_be_given_explicit_env_arg
self = 

    def must_be_given_explicit_env_arg(self):
>       cxn = Connection.from_v1(self.env)
E       AttributeError: 'from_v1' object has no attribute 'env'

tests/connection.py:526: AttributeError

connection.py::Connection_::from_v1::obtaining_config::defaults_to_calling_Config_from_v1

connection.py::Connection_::from_v1::obtaining_config::defaults_to_calling_Config_from_v1
self = 
Config_from_v1 = 

    @patch("fabric.connection.Config.from_v1")
    def defaults_to_calling_Config_from_v1(self, Config_from_v1):
>       Connection.from_v1(self.env)
E       AttributeError: 'obtaining_config' object has no attribute 'env'

tests/connection.py:532: AttributeError

connection.py::Connection_::from_v1::obtaining_config::may_be_given_config_explicitly

connection.py::Connection_::from_v1::obtaining_config::may_be_given_config_explicitly
self = 
Config_from_v1 = 

    @patch("fabric.connection.Config.from_v1")
    def may_be_given_config_explicitly(self, Config_from_v1):
        # Arguably a dupe of regular Connection constructor behavior,
        # but whatever.
>       Connection.from_v1(env=self.env, config=Config())
E       AttributeError: 'obtaining_config' object has no attribute 'env'

tests/connection.py:539: AttributeError

connection.py::Connection_::from_v1::additional_kwargs::forwards_arbitrary_kwargs_to_init

connection.py::Connection_::from_v1::additional_kwargs::forwards_arbitrary_kwargs_to_init
self = 

    def forwards_arbitrary_kwargs_to_init(self):
        cxn = Connection.from_v1(
>           self.env,
            connect_kwargs={"foo": "bar"},
            inline_ssh_env=False,
            connect_timeout=15,
        )
E       AttributeError: 'additional_kwargs' object has no attribute 'env'

tests/connection.py:546: AttributeError

connection.py::Connection_::from_v1::additional_kwargs::conflicting_kwargs_win_over_v1_env_values

connection.py::Connection_::from_v1::additional_kwargs::conflicting_kwargs_win_over_v1_env_values
self = 

    def conflicting_kwargs_win_over_v1_env_values(self):
>       env = Lexicon(self.env)
E       AttributeError: 'additional_kwargs' object has no attribute 'env'

tests/connection.py:556: AttributeError

connection.py::Connection_::from_v1::var_mappings::host_string

connection.py::Connection_::from_v1::var_mappings::host_string
self = 

    def host_string(self):
>       cxn = self._cxn()  # default is 'localghost'

tests/connection.py:566: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::from_v1::var_mappings::None_host_string_errors_usefully

connection.py::Connection_::from_v1::var_mappings::None_host_string_errors_usefully
self = 

    @raises(InvalidV1Env)
    def None_host_string_errors_usefully(self):
>       self._cxn(host_string=None)

tests/connection.py:571: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'host_string': None}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::from_v1::var_mappings::user

connection.py::Connection_::from_v1::var_mappings::user
self = 

    def user(self):
>       cxn = self._cxn(user="space")

tests/connection.py:574: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'user': 'space'}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'var_mappings' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::from_v1::var_mappings::port::basic

connection.py::Connection_::from_v1::var_mappings::port::basic
self = 

    def basic(self):
>       cxn = self._cxn(port=2222)

tests/connection.py:579: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'port': 2222}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'port' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::from_v1::var_mappings::port::casted_to_int

connection.py::Connection_::from_v1::var_mappings::port::casted_to_int
self = 

    def casted_to_int(self):
>       cxn = self._cxn(port="2222")

tests/connection.py:583: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'port': '2222'}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'port' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::from_v1::var_mappings::port::not_supplied_if_given_in_host_string

connection.py::Connection_::from_v1::var_mappings::port::not_supplied_if_given_in_host_string
self = 

    def not_supplied_if_given_in_host_string(self):
>       cxn = self._cxn(host_string="localghost:3737", port=2222)

tests/connection.py:587: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = 
kwargs = {'host_string': 'localghost:3737', 'port': 2222}

    def _cxn(self, **kwargs):
>       self.env.update(kwargs)
E       AttributeError: 'port' object has no attribute 'env'

tests/connection.py:522: AttributeError

connection.py::Connection_::shell::calls_RemoteShell_run_with_all_kwargs_and_returns_its_result

connection.py::Connection_::shell::calls_RemoteShell_run_with_all_kwargs_and_returns_its_result
self = 
RemoteShell = 
client = 

    @patch(remote_shell_path)
    def calls_RemoteShell_run_with_all_kwargs_and_returns_its_result(
        self, RemoteShell, client
    ):
        remote = RemoteShell.return_value
        cxn = Connection("host")
        kwargs = dict(
            env={"foo": "bar"},
            replace_env=True,
            encoding="utf-16",
            in_stream=StringIO("meh"),
            watchers=["meh"],
        )
        result = cxn.shell(**kwargs)
        RemoteShell.assert_any_call(context=cxn)
        assert remote.run.call_count == 1
        # Expect explicit use of default values for all kwarg-settings
        # besides what shell() itself tweaks
>       expected = dict(self.defaults, pty=True, command=None, **kwargs)
E       AttributeError: 'shell' object has no attribute 'defaults'

tests/connection.py:1079: AttributeError

connection.py::Connection_::shell::raises_TypeError_for_disallowed_kwargs

connection.py::Connection_::shell::raises_TypeError_for_disallowed_kwargs
self = 
client = 

    def raises_TypeError_for_disallowed_kwargs(self, client):
>       for key in self.defaults.keys():
E       AttributeError: 'shell' object has no attribute 'defaults'

tests/connection.py:1084: AttributeError

connection.py::Connection_::shell::honors_config_system_for_allowed_kwargs

connection.py::Connection_::shell::honors_config_system_for_allowed_kwargs
self = 
RemoteShell = 
client = 

    @patch(remote_shell_path)
    def honors_config_system_for_allowed_kwargs(self, RemoteShell, client):
        remote = RemoteShell.return_value
        allowed = dict(
            env={"foo": "bar"},
            replace_env=True,
            encoding="utf-16",
            in_stream="sentinel",
            watchers=["sentinel"],
        )
        ignored = dict(echo=True, hide="foo")  # Spot check
        config = Config({"run": dict(allowed, **ignored)})
        cxn = Connection("host", config=config)
        cxn.shell()
        kwargs = remote.run.call_args[1]
        for key, value in allowed.items():
            assert kwargs[key] == value
        for key, value in ignored.items():
>           assert kwargs[key] == self.defaults[key]
E           AttributeError: 'shell' object has no attribute 'defaults'

tests/connection.py:1119: AttributeError

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[run]

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[run]
self = 
Thread = 
Queue = , method = 'run'

    @mark.parametrize("method", ALL_METHODS)
    @patch("fabric.group.Queue")
    @patch("fabric.group.ExceptionHandlingThread")
    def executes_arguments_on_contents_run_via_threading(
        self, Thread, Queue, method
    ):
        queue = Queue.return_value
>       g = ThreadingGroup.from_connections(self.cxns)
E       AttributeError: 'ThreadingGroup_' object has no attribute 'cxns'

tests/group.py:193: AttributeError

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[sudo]

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[sudo]
self = 
Thread = 
Queue = , method = 'sudo'

    @mark.parametrize("method", ALL_METHODS)
    @patch("fabric.group.Queue")
    @patch("fabric.group.ExceptionHandlingThread")
    def executes_arguments_on_contents_run_via_threading(
        self, Thread, Queue, method
    ):
        queue = Queue.return_value
>       g = ThreadingGroup.from_connections(self.cxns)
E       AttributeError: 'ThreadingGroup_' object has no attribute 'cxns'

tests/group.py:193: AttributeError

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[put]

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[put]
self = 
Thread = 
Queue = , method = 'put'

    @mark.parametrize("method", ALL_METHODS)
    @patch("fabric.group.Queue")
    @patch("fabric.group.ExceptionHandlingThread")
    def executes_arguments_on_contents_run_via_threading(
        self, Thread, Queue, method
    ):
        queue = Queue.return_value
>       g = ThreadingGroup.from_connections(self.cxns)
E       AttributeError: 'ThreadingGroup_' object has no attribute 'cxns'

tests/group.py:193: AttributeError

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[get]

group.py::ThreadingGroup_::executes_arguments_on_contents_run_via_threading[get]
self = 
Thread = 
Queue = , method = 'get'

    @mark.parametrize("method", ALL_METHODS)
    @patch("fabric.group.Queue")
    @patch("fabric.group.ExceptionHandlingThread")
    def executes_arguments_on_contents_run_via_threading(
        self, Thread, Queue, method
    ):
        queue = Queue.return_value
>       g = ThreadingGroup.from_connections(self.cxns)
E       AttributeError: 'ThreadingGroup_' object has no attribute 'cxns'

tests/group.py:193: AttributeError

transfer.py::Transfer_::get::mode_concerns::preserves_remote_mode_by_default

transfer.py::Transfer_::get::mode_concerns::preserves_remote_mode_by_default
self = 
sftp = (, , )

    def preserves_remote_mode_by_default(self, sftp):
        transfer, client, mock_os = sftp
        # Attributes obj reflecting a realistic 'extended' octal mode
>       client.stat.return_value = self.attrs
E       AttributeError: 'mode_concerns' object has no attribute 'attrs'

tests/transfer.py:139: AttributeError

transfer.py::Transfer_::get::mode_concerns::allows_disabling_remote_mode_preservation

transfer.py::Transfer_::get::mode_concerns::allows_disabling_remote_mode_preservation
self = 
sftp = (, , )

    def allows_disabling_remote_mode_preservation(self, sftp):
        transfer, client, mock_os = sftp
>       client.stat.return_value = self.attrs
E       AttributeError: 'mode_concerns' object has no attribute 'attrs'

tests/transfer.py:147: AttributeError

Patch diff

diff --git a/fabric/_version.py b/fabric/_version.py
index 16e1eb75..f9a16f88 100644
--- a/fabric/_version.py
+++ b/fabric/_version.py
@@ -1,2 +1,2 @@
-__version_info__ = 3, 2, 2
-__version__ = '.'.join(map(str, __version_info__))
+__version_info__ = (3, 2, 2)
+__version__ = ".".join(map(str, __version_info__))
diff --git a/fabric/auth.py b/fabric/auth.py
index b3460d6b..b09529ad 100644
--- a/fabric/auth.py
+++ b/fabric/auth.py
@@ -1,8 +1,15 @@
 from functools import partial
 from getpass import getpass
 from pathlib import Path
+
 from paramiko import Agent, PKey
-from paramiko.auth_strategy import AuthStrategy, Password, InMemoryPrivateKey, OnDiskPrivateKey
+from paramiko.auth_strategy import (
+    AuthStrategy,
+    Password,
+    InMemoryPrivateKey,
+    OnDiskPrivateKey,
+)
+
 from .util import win32


@@ -30,6 +37,35 @@ class OpenSSHAuthStrategy(AuthStrategy):
     .. versionadded:: 3.1
     """

+    # Skimming openssh code (ssh.c and sshconnect2.c) gives us the following
+    # behavior to crib from:
+    # - parse cli (initializing identity_files if any given)
+    # - parse user config, then system config _unless_ user gave cli config
+    # path; this will also add to identity_files if any IdentityFile found
+    # (after the CLI ones)
+    # - lots of value init, string interpolation, etc
+    # - if no other identity_files exist at this point, fill in the ~/.ssh/
+    # defaults:
+    #   - in order: rsa, dsa, ecdsa, ecdsa_sk, ed25519, xmss (???)
+    # - initial connection (ssh_connect() - presumably handshake/hostkey/kex)
+    # - load all identity_files (and any implicit certs of those)
+    # - eventually runs pubkey_prepare() which, per its own comment,
+    # loads/assembles key material in this order:
+    #   - certs - config file, then cli, skipping any non-user (?) certs
+    #   - agent keys that are also listed in the config file
+    #   - agent keys _not_ listed in config files
+    #   - non-agent config file keys (this seems like it includes cli and
+    #   implicit defaults)
+    #   - once list is assembled, drop anything not listed in configured pubkey
+    #   algorithms list
+    # - auth_none to get list of acceptable authmethods
+    # - while-loops along that, or next returned, list of acceptable
+    # authmethods, using a handler table, so eg a 'standard' openssh on both
+    # ends might start with 'publickey,password,keyboard-interactive'; so it'll
+    # try all pubkeys found above before eventually trying a password prompt,
+    # and then if THAT fails, it will try kbdint call-and-response (similar to
+    # password but where server sends you the prompt(s) it wants displayed)
+
     def __init__(self, ssh_config, fabric_config, username):
         """
         Extends superclass with additional inputs.
@@ -48,10 +84,124 @@ class OpenSSHAuthStrategy(AuthStrategy):
         super().__init__(ssh_config=ssh_config)
         self.username = username
         self.config = fabric_config
+        # NOTE: Agent seems designed to always 'work' even w/o a live agent, in
+        # which case it just yields an empty key list.
         self.agent = Agent()

+    def get_pubkeys(self):
+        # Similar to OpenSSH, we first obtain sources in arbitrary order,
+        # tracking where they came from and whether they were a cert.
+        config_certs, config_keys, cli_certs, cli_keys = [], [], [], []
+        # Our own configuration is treated like `ssh -i`, partly because that's
+        # where our -i flag ends up, partly because our config data has no
+        # direct OpenSSH analogue (it's _not_ in your ssh_config! it's
+        # elsewhere!)
+        for path in self.config.authentication.identities:
+            try:
+                key = PKey.from_path(path)
+            except FileNotFoundError:
+                continue
+            source = OnDiskPrivateKey(
+                username=self.username,
+                source="python-config",
+                path=path,
+                pkey=key,
+            )
+            (cli_certs if key.public_blob else cli_keys).append(source)
+        # Now load ssh_config IdentityFile directives, sorting again into
+        # cert/key split.
+        # NOTE: Config's ssh_config loader already behaves OpenSSH-compatibly:
+        # if the user supplied a custom ssh_config file path, that is the only
+        # one loaded; otherwise, it loads and merges the user and system paths.
+        # TODO: CertificateFile support? Most people seem to rely on the
+        # implicit cert loading of IdentityFile...
+        for path in self.ssh_config.get("identityfile", []):
+            try:
+                key = PKey.from_path(path)
+            except FileNotFoundError:
+                continue
+            source = OnDiskPrivateKey(
+                username=self.username,
+                source="ssh-config",
+                path=path,
+                pkey=key,
+            )
+            (config_certs if key.public_blob else config_keys).append(source)
+        # At this point, if we've still got no keys or certs, look in the
+        # default user locations.
+        if not any((config_certs, config_keys, cli_certs, cli_keys)):
+            user_ssh = Path.home() / f"{'' if win32 else '.'}ssh"
+            # This is the same order OpenSSH documents as using.
+            for type_ in ("rsa", "ecdsa", "ed25519", "dsa"):
+                path = user_ssh / f"id_{type_}"
+                try:
+                    key = PKey.from_path(path)
+                except FileNotFoundError:
+                    continue
+                source = OnDiskPrivateKey(
+                    username=self.username,
+                    source="implicit-home",
+                    path=path,
+                    pkey=key,
+                )
+                dest = config_certs if key.public_blob else config_keys
+                dest.append(source)
+        # TODO: set agent_keys to empty list if IdentitiesOnly is true
+        agent_keys = self.agent.get_keys()
+
+        # We've finally loaded everything; now it's time to throw them upwards
+        # in the intended order...
+        # TODO: define subroutine that dedupes (& honors
+        # PubkeyAcceptedAlgorithms) then rub that on all of the below.
+        # First, all local _certs_ (config wins over cli, for w/e reason)
+        for source in config_certs:
+            yield source
+        for source in cli_certs:
+            yield source
+        # Then all agent keys, first ones that were also mentioned in configs,
+        # then 'new' ones not found in configs.
+        deferred_agent_keys = []
+        for key in agent_keys:
+            config_index = None
+            for i, config_key in enumerate(config_keys):
+                if config_key.pkey == key:
+                    config_index = i
+                    break
+            if config_index:
+                yield InMemoryPrivateKey(username=self.username, pkey=key)
+                # Nuke so it doesn't get re-yielded by regular conf keys bit
+                del config_keys[config_index]
+            else:
+                deferred_agent_keys.append(key)
+        for key in deferred_agent_keys:
+            yield InMemoryPrivateKey(username=self.username, pkey=key)
+        for source in cli_keys:
+            yield source
+        # This will now be just the config-borne keys that were NOT in agent
+        for source in config_keys:
+            yield source
+
+    def get_sources(self):
+        # TODO: initial none-auth + tracking the response's allowed types.
+        # however, SSHClient never did this deeply, and there's no guarantee a
+        # server _will_ send anything but "any" anyways...
+        # Public keys of all kinds typically first.
+        yield from self.get_pubkeys()
+        user = self.username
+        prompter = partial(getpass, f"{user}'s password: ")
+        # Then password.
+        yield Password(username=self.username, password_getter=prompter)
+
+    def authenticate(self, *args, **kwargs):
+        # Just do what our parent would, except make sure we close() after.
+        try:
+            return super().authenticate(*args, **kwargs)
+        finally:
+            self.close()
+
     def close(self):
         """
         Shut down any resources we ourselves opened up.
         """
-        pass
+        # TODO: bare try/except here as "best effort"? ugh
+        self.agent.close()
diff --git a/fabric/config.py b/fabric/config.py
index e501ed72..c56f3434 100644
--- a/fabric/config.py
+++ b/fabric/config.py
@@ -1,8 +1,10 @@
 import copy
 import errno
 import os
+
 from invoke.config import Config as InvokeConfig, merge_dicts
 from paramiko.config import SSHConfig
+
 from .runners import Remote, RemoteShell
 from .util import get_local_user, debug

@@ -33,7 +35,8 @@ class Config(InvokeConfig):

     .. versionadded:: 2.0
     """
-    prefix = 'fabric'
+
+    prefix = "fabric"

     @classmethod
     def from_v1(cls, env, **kwargs):
@@ -60,7 +63,43 @@ class Config(InvokeConfig):

         .. versionadded:: 2.4
         """
-        pass
+        # TODO: automagic import, if we can find a way to test that
+        # Use overrides level (and preserve whatever the user may have given)
+        # TODO: we really do want arbitrary number of config levels, don't we?
+        # TODO: most of these need more care re: only filling in when they
+        # differ from the v1 default. As-is these won't overwrite runtime
+        # overrides (due to .setdefault) but they may still be filling in empty
+        # values to stomp on lower level config levels...
+        data = kwargs.pop("overrides", {})
+        # TODO: just use a dataproxy or defaultdict??
+        for subdict in ("connect_kwargs", "run", "sudo", "timeouts"):
+            data.setdefault(subdict, {})
+        # PTY use
+        data["run"].setdefault("pty", env.always_use_pty)
+        # Gateway
+        data.setdefault("gateway", env.gateway)
+        # Agent forwarding
+        data.setdefault("forward_agent", env.forward_agent)
+        # Key filename(s)
+        if env.key_filename is not None:
+            data["connect_kwargs"].setdefault("key_filename", env.key_filename)
+        # Load keys from agent?
+        data["connect_kwargs"].setdefault("allow_agent", not env.no_agent)
+        data.setdefault("ssh_config_path", env.ssh_config_path)
+        # Sudo password
+        data["sudo"].setdefault("password", env.sudo_password)
+        # Vanilla password (may be used for regular and/or sudo, depending)
+        passwd = env.password
+        data["connect_kwargs"].setdefault("password", passwd)
+        if not data["sudo"]["password"]:
+            data["sudo"]["password"] = passwd
+        data["sudo"].setdefault("prompt", env.sudo_prompt)
+        data["timeouts"].setdefault("connect", env.timeout)
+        data.setdefault("load_ssh_configs", env.use_ssh_config)
+        data["run"].setdefault("warn", env.warn_only)
+        # Put overrides back for real constructor and go
+        kwargs["overrides"] = data
+        return cls(**kwargs)

     def __init__(self, *args, **kwargs):
         """
@@ -93,18 +132,33 @@ class Config(InvokeConfig):
             `set_runtime_ssh_path`, which will inform exactly what
             `load_ssh_config` does.
         """
-        ssh_config = kwargs.pop('ssh_config', None)
-        lazy = kwargs.get('lazy', False)
-        self.set_runtime_ssh_path(kwargs.pop('runtime_ssh_path', None))
-        system_path = kwargs.pop('system_ssh_path', '/etc/ssh/ssh_config')
+        # Tease out our own kwargs.
+        # TODO: consider moving more stuff out of __init__ and into methods so
+        # there's less of this sort of splat-args + pop thing? Eh.
+        ssh_config = kwargs.pop("ssh_config", None)
+        lazy = kwargs.get("lazy", False)
+        self.set_runtime_ssh_path(kwargs.pop("runtime_ssh_path", None))
+        system_path = kwargs.pop("system_ssh_path", "/etc/ssh/ssh_config")
         self._set(_system_ssh_path=system_path)
-        self._set(_user_ssh_path=kwargs.pop('user_ssh_path', '~/.ssh/config'))
+        self._set(_user_ssh_path=kwargs.pop("user_ssh_path", "~/.ssh/config"))
+
+        # Record whether we were given an explicit object (so other steps know
+        # whether to bother loading from disk or not)
+        # This needs doing before super __init__ as that calls our post_init
         explicit = ssh_config is not None
         self._set(_given_explicit_object=explicit)
+
+        # Arrive at some non-None SSHConfig object (upon which to run .parse()
+        # later, in _load_ssh_file())
         if ssh_config is None:
             ssh_config = SSHConfig()
         self._set(base_ssh_config=ssh_config)
+
+        # Now that our own attributes have been prepared & kwargs yanked, we
+        # can fall up into parent __init__()
         super().__init__(*args, **kwargs)
+
+        # And finally perform convenience non-lazy bits if needed
         if not lazy:
             self.load_ssh_config()

@@ -117,7 +171,7 @@ class Config(InvokeConfig):

         .. versionadded:: 2.0
         """
-        pass
+        self._set(_runtime_ssh_path=path)

     def load_ssh_config(self):
         """
@@ -128,7 +182,54 @@ class Config(InvokeConfig):

         .. versionadded:: 2.0
         """
-        pass
+        # Update the runtime SSH config path (assumes enough regular config
+        # levels have been loaded that anyone wanting to transmit this info
+        # from a 'vanilla' Invoke config, has gotten it set.)
+        if self.ssh_config_path:
+            self._runtime_ssh_path = self.ssh_config_path
+        # Load files from disk if we weren't given an explicit SSHConfig in
+        # __init__
+        if not self._given_explicit_object:
+            self._load_ssh_files()
+
+    def clone(self, *args, **kwargs):
+        # TODO: clone() at this point kinda-sorta feels like it's retreading
+        # __reduce__ and the related (un)pickling stuff...
+        # Get cloned obj.
+        # NOTE: Because we also extend .init_kwargs, the actual core SSHConfig
+        # data is passed in at init time (ensuring no files get loaded a 2nd,
+        # etc time) and will already be present, so we don't need to set
+        # .base_ssh_config ourselves. Similarly, there's no need to worry about
+        # how the SSH config paths may be inaccurate until below; nothing will
+        # be referencing them.
+        new = super().clone(*args, **kwargs)
+        # Copy over our custom attributes, so that the clone still resembles us
+        # re: recording where the data originally came from (in case anything
+        # re-runs ._load_ssh_files(), for example).
+        for attr in (
+            "_runtime_ssh_path",
+            "_system_ssh_path",
+            "_user_ssh_path",
+        ):
+            setattr(new, attr, getattr(self, attr))
+        # Load SSH configs, in case they weren't prior to now (e.g. a vanilla
+        # Invoke clone(into), instead of a us-to-us clone.)
+        self.load_ssh_config()
+        # All done
+        return new
+
+    def _clone_init_kwargs(self, *args, **kw):
+        # Parent kwargs
+        kwargs = super()._clone_init_kwargs(*args, **kw)
+        # Transmit our internal SSHConfig via explicit-obj kwarg, thus
+        # bypassing any file loading. (Our extension of clone() above copies
+        # over other attributes as well so that the end result looks consistent
+        # with reality.)
+        new_config = SSHConfig()
+        # TODO: as with other spots, this implies SSHConfig needs a cleaner
+        # public API re: creating and updating its core data.
+        new_config._config = copy.deepcopy(self.base_ssh_config._config)
+        return dict(kwargs, ssh_config=new_config)

     def _load_ssh_files(self):
         """
@@ -139,7 +240,20 @@ class Config(InvokeConfig):

         :returns: ``None``.
         """
-        pass
+        # TODO: does this want to more closely ape the behavior of
+        # InvokeConfig.load_files? re: having a _found attribute for each that
+        # determines whether to load or skip
+        if self._runtime_ssh_path is not None:
+            path = self._runtime_ssh_path
+            # Manually blow up like open() (_load_ssh_file normally doesn't)
+            if not os.path.exists(path):
+                raise FileNotFoundError(
+                    errno.ENOENT, "No such file or directory", path
+                )
+            self._load_ssh_file(os.path.expanduser(path))
+        elif self.load_ssh_configs:
+            for path in (self._user_ssh_path, self._system_ssh_path):
+                self._load_ssh_file(os.path.expanduser(path))

     def _load_ssh_file(self, path):
         """
@@ -149,7 +263,15 @@ class Config(InvokeConfig):

         :returns: ``None``.
         """
-        pass
+        if os.path.isfile(path):
+            old_rules = len(self.base_ssh_config._config)
+            with open(path) as fd:
+                self.base_ssh_config.parse(fd)
+            new_rules = len(self.base_ssh_config._config)
+            msg = "Loaded {} new ssh_config rules from {!r}"
+            debug(msg.format(new_rules - old_rules, path))
+        else:
+            debug("File not found, skipping")

     @staticmethod
     def global_defaults():
@@ -169,4 +291,41 @@ class Config(InvokeConfig):
             Added the ``authentication`` settings section, plus sub-attributes
             such as ``authentication.strategy_class``.
         """
-        pass
+        # TODO: hrm should the run-related things actually be derived from the
+        # runner_class? E.g. Local defines local stuff, Remote defines remote
+        # stuff? Doesn't help with the final config tree tho...
+        # TODO: as to that, this is a core problem, Fabric wants split
+        # local/remote stuff, eg replace_env wants to be False for local and
+        # True remotely; shell wants to differ depending on target (and either
+        # way, does not want to use local interrogation for remote)
+        # TODO: is it worth moving all of our 'new' settings to a discrete
+        # namespace for cleanliness' sake? e.g. ssh.port, ssh.user etc.
+        # It wouldn't actually simplify this code any, but it would make it
+        # easier for users to determine what came from which library/repo.
+        defaults = InvokeConfig.global_defaults()
+        # TODO 4.0: this is already a mess, strongly consider a new 'ssh'
+        # subtree because otherwise it's guessing where, or whether, 'ssh' is
+        # in the setting name! i.e. 'inline_ssh_env' -> ssh.use_inline_env,
+        # 'load_ssh_configs' -> ssh.load_configs, 'ssh_config_path' ->
+        # ssh.config_path, etc
+        ours = {
+            "authentication": {
+                "identities": [],
+                "strategy_class": None,
+            },
+            "connect_kwargs": {},
+            "forward_agent": False,
+            "gateway": None,
+            "inline_ssh_env": True,
+            "load_ssh_configs": True,
+            "port": 22,
+            "runners": {"remote": Remote, "remote_shell": RemoteShell},
+            "ssh_config_path": None,
+            "tasks": {"collection_name": "fabfile"},
+            # TODO: this becomes an override/extend once Invoke grows execution
+            # timeouts (which should be timeouts.execute)
+            "timeouts": {"connect": None},
+            "user": get_local_user(),
+        }
+        merge_dicts(defaults, ours)
+        return defaults
diff --git a/fabric/connection.py b/fabric/connection.py
index be567e3f..4d4c49b7 100644
--- a/fabric/connection.py
+++ b/fabric/connection.py
@@ -2,6 +2,7 @@ from contextlib import contextmanager
 from io import StringIO
 from threading import Event
 import socket
+
 from decorator import decorator
 from invoke import Context
 from invoke.exceptions import ThreadException
@@ -9,12 +10,42 @@ from paramiko.agent import AgentRequestHandler
 from paramiko.client import SSHClient, AutoAddPolicy
 from paramiko.config import SSHConfig
 from paramiko.proxy import ProxyCommand
+
 from .config import Config
 from .exceptions import InvalidV1Env
 from .transfer import Transfer
 from .tunnels import TunnelManager, Tunnel


+@decorator
+def opens(method, self, *args, **kwargs):
+    self.open()
+    return method(self, *args, **kwargs)
+
+
+def derive_shorthand(host_string):
+    user_hostport = host_string.rsplit("@", 1)
+    hostport = user_hostport.pop()
+    user = user_hostport[0] if user_hostport and user_hostport[0] else None
+
+    # IPv6: can't reliably tell where addr ends and port begins, so don't
+    # try (and don't bother adding special syntax either, user should avoid
+    # this situation by using port=).
+    if hostport.count(":") > 1:
+        host = hostport
+        port = None
+    # IPv4: can split on ':' reliably.
+    else:
+        host_port = hostport.rsplit(":", 1)
+        host = host_port.pop(0) or None
+        port = host_port[0] if host_port and host_port[0] else None
+
+    if port is not None:
+        port = int(port)
+
+    return {"user": user, "host": host, "port": port}
+
+
 class Connection(Context):
     """
     A connection to an SSH daemon, with methods for commands and file transfer.
@@ -89,6 +120,13 @@ class Connection(Context):

     .. versionadded:: 2.0
     """
+
+    # NOTE: these are initialized here to hint to invoke.Config.__setattr__
+    # that they should be treated as real attributes instead of config proxies.
+    # (Additionally, we're doing this instead of using invoke.Config._set() so
+    # we can take advantage of Sphinx's attribute-doc-comment static analysis.)
+    # Once an instance is created, these values will usually be non-None
+    # because they default to the default config values.
     host = None
     original_host = None
     user = None
@@ -126,11 +164,60 @@ class Connection(Context):

         .. versionadded:: 2.4
         """
-        pass
-
-    def __init__(self, host, user=None, port=None, config=None, gateway=
-        None, forward_agent=None, connect_timeout=None, connect_kwargs=None,
-        inline_ssh_env=None):
+        # TODO: import fabric.state.env (need good way to test it first...)
+        # TODO: how to handle somebody accidentally calling this in a process
+        # where 'fabric' is fabric 2, and there's no fabric 1? Probably just a
+        # re-raise of ImportError??
+        # Our only requirement is a non-empty host_string
+        if not env.host_string:
+            raise InvalidV1Env(
+                "Supplied v1 env has an empty `host_string` value! Please make sure you're calling Connection.from_v1 within a connected Fabric 1 session."  # noqa
+            )
+        # TODO: detect collisions with kwargs & except instead of overwriting?
+        # (More Zen of Python compliant, but also, effort, and also, makes it
+        # harder for users to intentionally overwrite!)
+        connect_kwargs = kwargs.setdefault("connect_kwargs", {})
+        kwargs.setdefault("host", env.host_string)
+        shorthand = derive_shorthand(env.host_string)
+        # TODO: don't we need to do the below skipping for user too?
+        kwargs.setdefault("user", env.user)
+        # Skip port if host string seemed to have it; otherwise we hit our own
+        # ambiguity clause in __init__. v1 would also have been doing this
+        # anyways (host string wins over other settings).
+        if not shorthand["port"]:
+            # Run port through int(); v1 inexplicably has a string default...
+            kwargs.setdefault("port", int(env.port))
+        # key_filename defaults to None in v1, but in v2, we expect it to be
+        # either unset, or set to a list. Thus, we only pull it over if it is
+        # not None.
+        if env.key_filename is not None:
+            connect_kwargs.setdefault("key_filename", env.key_filename)
+        # Obtain config values, if not given, from its own from_v1
+        # NOTE: not using setdefault as we truly only want to call
+        # Config.from_v1 when necessary.
+        if "config" not in kwargs:
+            kwargs["config"] = Config.from_v1(env)
+        return cls(**kwargs)
+
+    # TODO: should "reopening" an existing Connection object that has been
+    # closed, be allowed? (See e.g. how v1 detects closed/semi-closed
+    # connections & nukes them before creating a new client to the same host.)
+    # TODO: push some of this into paramiko.client.Client? e.g. expand what
+    # Client.exec_command does, it already allows configuring a subset of what
+    # we do / will eventually do / did in 1.x. It's silly to have to do
+    # .get_transport().open_session().
+    def __init__(
+        self,
+        host,
+        user=None,
+        port=None,
+        config=None,
+        gateway=None,
+        forward_agent=None,
+        connect_timeout=None,
+        connect_kwargs=None,
+        inline_ssh_env=None,
+    ):
         """
         Set up a new object representing a server connection.

@@ -282,67 +369,194 @@ class Connection(Context):
             config value has now changed and defaults to ``True``, not
             ``False``.
         """
+        # NOTE: parent __init__ sets self._config; for now we simply overwrite
+        # that below. If it's somehow problematic we would want to break parent
+        # __init__ up in a manner that is more cleanly overrideable.
         super().__init__(config=config)
+
+        #: The .Config object referenced when handling default values (for e.g.
+        #: user or port, when not explicitly given) or deciding how to behave.
         if config is None:
             config = Config()
+        # Handle 'vanilla' Invoke config objects, which need cloning 'into' one
+        # of our own Configs (which grants the new defaults, etc, while not
+        # squashing them if the Invoke-level config already accounted for them)
         elif not isinstance(config, Config):
             config = config.clone(into=Config)
         self._set(_config=config)
+        # TODO: when/how to run load_files, merge, load_shell_env, etc?
+        # TODO: i.e. what is the lib use case here (and honestly in invoke too)
+
         shorthand = self.derive_shorthand(host)
-        host = shorthand['host']
-        err = (
-            'You supplied the {} via both shorthand and kwarg! Please pick one.'
-            )
-        if shorthand['user'] is not None:
+        host = shorthand["host"]
+        err = "You supplied the {} via both shorthand and kwarg! Please pick one."  # noqa
+        if shorthand["user"] is not None:
             if user is not None:
-                raise ValueError(err.format('user'))
-            user = shorthand['user']
-        if shorthand['port'] is not None:
+                raise ValueError(err.format("user"))
+            user = shorthand["user"]
+        if shorthand["port"] is not None:
             if port is not None:
-                raise ValueError(err.format('port'))
-            port = shorthand['port']
+                raise ValueError(err.format("port"))
+            port = shorthand["port"]
+
+        # NOTE: we load SSH config data as early as possible as it has
+        # potential to affect nearly every other attribute.
+        #: The per-host SSH config data, if any. (See :ref:`ssh-config`.)
         self.ssh_config = self.config.base_ssh_config.lookup(host)
+
         self.original_host = host
+        #: The hostname of the target server.
         self.host = host
-        if 'hostname' in self.ssh_config:
-            self.host = self.ssh_config['hostname']
-        self.user = user or self.ssh_config.get('user', self.config.user)
-        self.port = port or int(self.ssh_config.get('port', self.config.port))
+        if "hostname" in self.ssh_config:
+            # TODO: log that this occurred?
+            self.host = self.ssh_config["hostname"]
+
+        #: The username this connection will use to connect to the remote end.
+        self.user = user or self.ssh_config.get("user", self.config.user)
+        # TODO: is it _ever_ possible to give an empty user value (e.g.
+        # user='')? E.g. do some SSH server specs allow for that?
+
+        #: The network port to connect on.
+        self.port = port or int(self.ssh_config.get("port", self.config.port))
+
+        # Gateway/proxy/bastion/jump setting: non-None values - string,
+        # Connection, even eg False - get set directly; None triggers seek in
+        # config/ssh_config
+        #: The gateway `.Connection` or ``ProxyCommand`` string to be used,
+        #: if any.
         self.gateway = gateway if gateway is not None else self.get_gateway()
+        # NOTE: we use string above, vs ProxyCommand obj, to avoid spinning up
+        # the ProxyCommand subprocess at init time, vs open() time.
+        # TODO: make paramiko.proxy.ProxyCommand lazy instead?
+
         if forward_agent is None:
+            # Default to config...
             forward_agent = self.config.forward_agent
-            if 'forwardagent' in self.ssh_config:
-                map_ = {'yes': True, 'no': False}
-                forward_agent = map_[self.ssh_config['forwardagent']]
+            # But if ssh_config is present, it wins
+            if "forwardagent" in self.ssh_config:
+                # TODO: SSHConfig really, seriously needs some love here, god
+                map_ = {"yes": True, "no": False}
+                forward_agent = map_[self.ssh_config["forwardagent"]]
+        #: Whether agent forwarding is enabled.
         self.forward_agent = forward_agent
+
         if connect_timeout is None:
-            connect_timeout = self.ssh_config.get('connecttimeout', self.
-                config.timeouts.connect)
+            connect_timeout = self.ssh_config.get(
+                "connecttimeout", self.config.timeouts.connect
+            )
         if connect_timeout is not None:
             connect_timeout = int(connect_timeout)
+        #: Connection timeout
         self.connect_timeout = connect_timeout
+
+        #: Keyword arguments given to `paramiko.client.SSHClient.connect` when
+        #: `open` is called.
         self.connect_kwargs = self.resolve_connect_kwargs(connect_kwargs)
+
+        #: The `paramiko.client.SSHClient` instance this connection wraps.
         client = SSHClient()
         client.set_missing_host_key_policy(AutoAddPolicy())
         self.client = client
+
+        #: A convenience handle onto the return value of
+        #: ``self.client.get_transport()`` (after connection time).
         self.transport = None
+
         if inline_ssh_env is None:
             inline_ssh_env = self.config.inline_ssh_env
+        #: Whether to construct remote command lines with env vars prefixed
+        #: inline.
         self.inline_ssh_env = inline_ssh_env

+    def resolve_connect_kwargs(self, connect_kwargs):
+        # TODO: is it better to pre-empt conflicts w/ manually-handled
+        # connect() kwargs (hostname, username, etc) here or in open()? We're
+        # doing open() for now in case e.g. someone manually modifies
+        # .connect_kwargs attributewise, but otherwise it feels better to do it
+        # early instead of late.
+        constructor_kwargs = connect_kwargs or {}
+        config_kwargs = self.config.connect_kwargs
+        constructor_keys = constructor_kwargs.get("key_filename", [])
+        config_keys = config_kwargs.get("key_filename", [])
+        ssh_config_keys = self.ssh_config.get("identityfile", [])
+
+        # Default data: constructor if given, config otherwise
+        final_kwargs = constructor_kwargs or config_kwargs
+
+        # Key filename: merge, in order, config (which includes CLI flags),
+        # then constructor kwargs, and finally SSH config file data.
+        # Make sure all are normalized to list as well!
+        final_keys = []
+        for value in (config_keys, constructor_keys, ssh_config_keys):
+            if isinstance(value, str):
+                value = [value]
+            final_keys.extend(value)
+        # Only populate if non-empty.
+        if final_keys:
+            final_kwargs["key_filename"] = final_keys
+
+        return final_kwargs
+
+    def get_gateway(self):
+        # SSH config wins over Invoke-style config
+        if "proxyjump" in self.ssh_config:
+            # Reverse hop1,hop2,hop3 style ProxyJump directive so we start
+            # with the final (itself non-gatewayed) hop and work up to
+            # the front (actual, supplied as our own gateway) hop
+            hops = reversed(self.ssh_config["proxyjump"].split(","))
+            prev_gw = None
+            for hop in hops:
+                # Short-circuit if we appear to be our own proxy, which would
+                # be a RecursionError. Implies SSH config wildcards.
+                # TODO: in an ideal world we'd check user/port too in case they
+                # differ, but...seriously? They can file a PR with those extra
+                # half dozen test cases in play, E_NOTIME
+                if self.derive_shorthand(hop)["host"] == self.host:
+                    return None
+                # Happily, ProxyJump uses identical format to our host
+                # shorthand...
+                kwargs = dict(config=self.config.clone())
+                if prev_gw is not None:
+                    kwargs["gateway"] = prev_gw
+                cxn = Connection(hop, **kwargs)
+                prev_gw = cxn
+            return prev_gw
+        elif "proxycommand" in self.ssh_config:
+            # Just a string, which we interpret as a proxy command.
+            return self.ssh_config["proxycommand"]
+        # Fallback: config value (may be None).
+        return self.config.gateway
+
     def __repr__(self):
-        bits = [('host', self.host)]
+        # Host comes first as it's the most common differentiator by far
+        bits = [("host", self.host)]
+        # TODO: maybe always show user regardless? Explicit is good...
         if self.user != self.config.user:
-            bits.append(('user', self.user))
+            bits.append(("user", self.user))
+        # TODO: harder to make case for 'always show port'; maybe if it's
+        # non-22 (even if config has overridden the local default)?
         if self.port != self.config.port:
-            bits.append(('port', self.port))
+            bits.append(("port", self.port))
+        # NOTE: sometimes self.gateway may be eg False if someone wants to
+        # explicitly override a configured non-None value (as otherwise it's
+        # impossible for __init__ to tell if a None means "nothing given" or
+        # "seriously please no gatewaying"). So, this must always be a vanilla
+        # truth test and not eg "is not None".
         if self.gateway:
-            val = 'proxyjump'
+            # Displaying type because gw params would probs be too verbose
+            val = "proxyjump"
             if isinstance(self.gateway, str):
-                val = 'proxycommand'
-            bits.append(('gw', val))
-        return '<Connection {}>'.format(' '.join('{}={}'.format(*x) for x in
-            bits))
+                val = "proxycommand"
+            bits.append(("gw", val))
+        return "<Connection {}>".format(
+            " ".join("{}={}".format(*x) for x in bits)
+        )
+
+    def _identity(self):
+        # TODO: consider including gateway and maybe even other init kwargs?
+        # Whether two cxns w/ same user/host/port but different
+        # gateway/keys/etc, should be considered "the same", is unclear.
+        return (self.host, self.user, self.port)

     def __eq__(self, other):
         if not isinstance(other, Connection):
@@ -353,8 +567,16 @@ class Connection(Context):
         return self._identity() < other._identity()

     def __hash__(self):
+        # NOTE: this departs from Context/DataProxy, which is not usefully
+        # hashable.
         return hash(self._identity())

+    def derive_shorthand(self, host_string):
+        # NOTE: used to be defined inline; preserving API call for both
+        # backwards compatibility and because it seems plausible we may want to
+        # modify behavior later, using eg config or other attributes.
+        return derive_shorthand(host_string)
+
     @property
     def is_connected(self):
         """
@@ -362,7 +584,7 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        return self.transport.active if self.transport else False

     def open(self):
         """
@@ -387,7 +609,62 @@ class Connection(Context):
             Now returns the inner Paramiko connect call's return value instead
             of always returning the implicit ``None``.
         """
-        pass
+        # Short-circuit
+        if self.is_connected:
+            return
+        err = "Refusing to be ambiguous: connect() kwarg '{}' was given both via regular arg and via connect_kwargs!"  # noqa
+        # These may not be given, period
+        for key in """
+            hostname
+            port
+            username
+        """.split():
+            if key in self.connect_kwargs:
+                raise ValueError(err.format(key))
+        # These may be given one way or the other, but not both
+        if (
+            "timeout" in self.connect_kwargs
+            and self.connect_timeout is not None
+        ):
+            raise ValueError(err.format("timeout"))
+        # No conflicts -> merge 'em together
+        kwargs = dict(
+            self.connect_kwargs,
+            username=self.user,
+            hostname=self.host,
+            port=self.port,
+        )
+        if self.gateway:
+            kwargs["sock"] = self.open_gateway()
+        if self.connect_timeout:
+            kwargs["timeout"] = self.connect_timeout
+        # Strip out empty defaults for less noisy debugging
+        if "key_filename" in kwargs and not kwargs["key_filename"]:
+            del kwargs["key_filename"]
+        auth_strategy_class = self.authentication.strategy_class
+        if auth_strategy_class is not None:
+            # Pop connect_kwargs related to auth to avoid giving Paramiko
+            # conflicting signals.
+            for key in (
+                "allow_agent",
+                "key_filename",
+                "look_for_keys",
+                "passphrase",
+                "password",
+                "pkey",
+                "username",
+            ):
+                kwargs.pop(key, None)
+
+            kwargs["auth_strategy"] = auth_strategy_class(
+                ssh_config=self.ssh_config,
+                fabric_config=self.config,
+                username=self.user,
+            )
+        # Actually connect!
+        result = self.client.connect(**kwargs)
+        self.transport = self.client.get_transport()
+        return result

     def open_gateway(self):
         """
@@ -400,7 +677,34 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        # ProxyCommand is faster to set up, so do it first.
+        if isinstance(self.gateway, str):
+            # Leverage a dummy SSHConfig to ensure %h/%p/etc are parsed.
+            # TODO: use real SSH config once loading one properly is
+            # implemented.
+            ssh_conf = SSHConfig()
+            dummy = "Host {}\n    ProxyCommand {}"
+            ssh_conf.parse(StringIO(dummy.format(self.host, self.gateway)))
+            return ProxyCommand(ssh_conf.lookup(self.host)["proxycommand"])
+        # Handle inner-Connection gateway type here.
+        # TODO: logging
+        self.gateway.open()
+        # TODO: expose the opened channel itself as an attribute? (another
+        # possible argument for separating the two gateway types...) e.g. if
+        # someone wanted to piggyback on it for other same-interpreter socket
+        # needs...
+        # TODO: and the inverse? allow users to supply their own socket/like
+        # object they got via $WHEREVER?
+        # TODO: how best to expose timeout param? reuse general connection
+        # timeout from config?
+        return self.gateway.transport.open_channel(
+            kind="direct-tcpip",
+            dest_addr=(self.host, int(self.port)),
+            # NOTE: src_addr needs to be 'empty but not None' values to
+            # correctly encode into a network message. Theoretically Paramiko
+            # could auto-interpret None sometime & save us the trouble.
+            src_addr=("", 0),
+        )

     def close(self):
         """
@@ -414,7 +718,14 @@ class Connection(Context):
         .. versionchanged:: 3.0
             Now closes SFTP sessions too (2.x required manually doing so).
         """
-        pass
+        if self._sftp is not None:
+            self._sftp.close()
+            self._sftp = None
+
+        if self.is_connected:
+            self.client.close()
+            if self.forward_agent and self._agent_handler is not None:
+                self._agent_handler.close()

     def __enter__(self):
         return self
@@ -422,6 +733,18 @@ class Connection(Context):
     def __exit__(self, *exc):
         self.close()

+    @opens
+    def create_session(self):
+        channel = self.transport.open_session()
+        if self.forward_agent:
+            self._agent_handler = AgentRequestHandler(channel)
+        return channel
+
+    def _remote_runner(self):
+        return self.config.runners.remote(
+            context=self, inline_env=self.inline_ssh_env
+        )
+
     @opens
     def run(self, command, **kwargs):
         """
@@ -437,7 +760,7 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        return self._run(self._remote_runner(), command, **kwargs)

     @opens
     def sudo(self, command, **kwargs):
@@ -451,7 +774,7 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        return self._sudo(self._remote_runner(), command, **kwargs)

     @opens
     def shell(self, **kwargs):
@@ -513,7 +836,25 @@ class Connection(Context):

         .. versionadded:: 2.7
         """
-        pass
+        runner = self.config.runners.remote_shell(context=self)
+        # Reinstate most defaults as explicit kwargs to ensure user's config
+        # doesn't make this mode break horribly. Then override a few that need
+        # to change, like pty.
+        allowed = ("encoding", "env", "in_stream", "replace_env", "watchers")
+        new_kwargs = {}
+        for key, value in self.config.global_defaults()["run"].items():
+            if key in allowed:
+                # Use allowed kwargs if given, otherwise also fill them from
+                # defaults
+                new_kwargs[key] = kwargs.pop(key, self.config.run[key])
+            else:
+                new_kwargs[key] = value
+        new_kwargs.update(pty=True)
+        # At this point, any leftover kwargs would be ignored, so yell instead
+        if kwargs:
+            err = "shell() got unexpected keyword arguments: {!r}"
+            raise TypeError(err.format(list(kwargs.keys())))
+        return runner.run(command=None, **new_kwargs)

     def local(self, *args, **kwargs):
         """
@@ -524,7 +865,9 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        # Superclass run() uses runners.local, so we can literally just call it
+        # straight.
+        return super().run(*args, **kwargs)

     @opens
     def sftp(self):
@@ -538,7 +881,9 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        if self._sftp is None:
+            self._sftp = self.client.open_sftp()
+        return self._sftp

     def get(self, *args, **kwargs):
         """
@@ -549,7 +894,7 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        return Transfer(self).get(*args, **kwargs)

     def put(self, *args, **kwargs):
         """
@@ -560,12 +905,20 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        return Transfer(self).put(*args, **kwargs)

+    # TODO: yield the socket for advanced users? Other advanced use cases
+    # (perhaps factor out socket creation itself)?
+    # TODO: probably push some of this down into Paramiko
     @contextmanager
     @opens
-    def forward_local(self, local_port, remote_port=None, remote_host=
-        'localhost', local_host='localhost'):
+    def forward_local(
+        self,
+        local_port,
+        remote_port=None,
+        remote_host="localhost",
+        local_host="localhost",
+    ):
         """
         Open a tunnel connecting ``local_port`` to the server's environment.

@@ -606,12 +959,65 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
-
+        if not remote_port:
+            remote_port = local_port
+
+        # TunnelManager does all of the work, sitting in the background (so we
+        # can yield) and spawning threads every time somebody connects to our
+        # local port.
+        finished = Event()
+        manager = TunnelManager(
+            local_port=local_port,
+            local_host=local_host,
+            remote_port=remote_port,
+            remote_host=remote_host,
+            # TODO: not a huge fan of handing in our transport, but...?
+            transport=self.transport,
+            finished=finished,
+        )
+        manager.start()
+
+        # Return control to caller now that things ought to be operational
+        try:
+            yield
+        # Teardown once user exits block
+        finally:
+            # Signal to manager that it should close all open tunnels
+            finished.set()
+            # Then wait for it to do so
+            manager.join()
+            # Raise threading errors from within the manager, which would be
+            # one of:
+            # - an inner ThreadException, which was created by the manager on
+            # behalf of its Tunnels; this gets directly raised.
+            # - some other exception, which would thus have occurred in the
+            # manager itself; we wrap this in a new ThreadException.
+            # NOTE: in these cases, some of the metadata tracking in
+            # ExceptionHandlingThread/ExceptionWrapper/ThreadException (which
+            # is useful when dealing with multiple nearly-identical sibling IO
+            # threads) is superfluous, but it doesn't feel worth breaking
+            # things up further; we just ignore it for now.
+            wrapper = manager.exception()
+            if wrapper is not None:
+                if wrapper.type is ThreadException:
+                    raise wrapper.value
+                else:
+                    raise ThreadException([wrapper])
+
+            # TODO: cancel port forward on transport? Does that even make sense
+            # here (where we used direct-tcpip) vs the opposite method (which
+            # is what uses forward-tcpip)?
+
+    # TODO: probably push some of this down into Paramiko
     @contextmanager
     @opens
-    def forward_remote(self, remote_port, local_port=None, remote_host=
-        '127.0.0.1', local_host='localhost'):
+    def forward_remote(
+        self,
+        remote_port,
+        local_port=None,
+        remote_host="127.0.0.1",
+        local_host="localhost",
+    ):
         """
         Open a tunnel connecting ``remote_port`` to the local environment.

@@ -656,4 +1062,54 @@ class Connection(Context):

         .. versionadded:: 2.0
         """
-        pass
+        if not local_port:
+            local_port = remote_port
+        # Callback executes on each connection to the remote port and is given
+        # a Channel hooked up to said port. (We don't actually care about the
+        # source/dest host/port pairs at all; only whether the channel has data
+        # to read and suchlike.)
+        # We then pair that channel with a new 'outbound' socket connection to
+        # the local host/port being forwarded, in a new Tunnel.
+        # That Tunnel is then added to a shared data structure so we can track
+        # & close them during shutdown.
+        #
+        # TODO: this approach is less than ideal because we have to share state
+        # between ourselves & the callback handed into the transport's own
+        # thread handling (which is roughly analogous to our self-controlled
+        # TunnelManager for local forwarding). See if we can use more of
+        # Paramiko's API (or improve it and then do so) so that isn't
+        # necessary.
+        tunnels = []
+
+        def callback(channel, src_addr_tup, dst_addr_tup):
+            sock = socket.socket()
+            # TODO: handle connection failure such that channel, etc get closed
+            sock.connect((local_host, local_port))
+            # TODO: we don't actually need to generate the Events at our level,
+            # do we? Just let Tunnel.__init__ do it; all we do is "press its
+            # button" on shutdown...
+            tunnel = Tunnel(channel=channel, sock=sock, finished=Event())
+            tunnel.start()
+            # Communication between ourselves & the Paramiko handling subthread
+            tunnels.append(tunnel)
+
+        # Ask Paramiko (really, the remote sshd) to call our callback whenever
+        # connections are established on the remote iface/port.
+        # transport.request_port_forward(remote_host, remote_port, callback)
+        try:
+            self.transport.request_port_forward(
+                address=remote_host, port=remote_port, handler=callback
+            )
+            yield
+        finally:
+            # TODO: see above re: lack of a TunnelManager
+            # TODO: and/or also refactor with TunnelManager re: shutdown logic.
+            # E.g. maybe have a non-thread TunnelManager-alike with a method
+            # that acts as the callback? At least then there's a tiny bit more
+            # encapsulation...meh.
+            for tunnel in tunnels:
+                tunnel.finished.set()
+                tunnel.join()
+            self.transport.cancel_port_forward(
+                address=remote_host, port=remote_port
+            )
diff --git a/fabric/exceptions.py b/fabric/exceptions.py
index 0343cf15..f965219e 100644
--- a/fabric/exceptions.py
+++ b/fabric/exceptions.py
@@ -1,3 +1,5 @@
+# TODO: this may want to move to Invoke if we can find a use for it there too?
+# Or make it _more_ narrowly focused and stay here?
 class NothingToDo(Exception):
     pass

@@ -10,6 +12,9 @@ class GroupException(Exception):
     """

     def __init__(self, result):
+        #: The `.GroupResult` object which would have been returned, had there
+        #: been no errors. See its docstring (and that of `.Group`) for
+        #: details.
         self.result = result


@@ -17,4 +22,5 @@ class InvalidV1Env(Exception):
     """
     Raised when attempting to import a Fabric 1 ``env`` which is missing data.
     """
+
     pass
diff --git a/fabric/executor.py b/fabric/executor.py
index 2bb16019..4ef06f68 100644
--- a/fabric/executor.py
+++ b/fabric/executor.py
@@ -1,5 +1,6 @@
 import invoke
 from invoke import Call, Task
+
 from .tasks import ConnectionCall
 from .exceptions import NothingToDo
 from .util import debug
@@ -38,7 +39,64 @@ class Executor(invoke.Executor):

         :returns: Homogenous list of Connection init kwarg dicts.
         """
-        pass
+        dicts = []
+        for value in hosts or []:
+            # Assume first posarg to Connection() if not already a dict.
+            if not isinstance(value, dict):
+                value = dict(host=value)
+            dicts.append(value)
+        return dicts
+
+    def expand_calls(self, calls, apply_hosts=True):
+        # Generate new call list with per-host variants & Connections inserted
+        ret = []
+        cli_hosts = []
+        host_str = self.core[0].args.hosts.value
+        if apply_hosts and host_str:
+            cli_hosts = host_str.split(",")
+        for call in calls:
+            if isinstance(call, Task):
+                call = Call(task=call)
+            # TODO: expand this to allow multiple types of execution plans,
+            # pending outcome of invoke#461 (which, if flexible enough to
+            # handle intersect of dependencies+parameterization, just becomes
+            # 'honor that new feature of Invoke')
+            # TODO: roles, other non-runtime host parameterizations, etc
+            # Pre-tasks get added only once, not once per host.
+            ret.extend(self.expand_calls(call.pre, apply_hosts=False))
+            # Determine final desired host list based on CLI and task values
+            # (with CLI, being closer to runtime, winning) and normalize to
+            # Connection-init kwargs.
+            call_hosts = getattr(call, "hosts", None)
+            cxn_params = self.normalize_hosts(cli_hosts or call_hosts)
+            # Main task, per host/connection
+            for init_kwargs in cxn_params:
+                ret.append(self.parameterize(call, init_kwargs))
+            # Deal with lack of hosts list (acts same as `inv` in that case)
+            # TODO: no tests for this branch?
+            if not cxn_params:
+                ret.append(call)
+            # Post-tasks added once, not once per host.
+            ret.extend(self.expand_calls(call.post, apply_hosts=False))
+        # Add remainder as anonymous task
+        if self.core.remainder:
+            # TODO: this will need to change once there are more options for
+            # setting host lists besides "-H or 100% within-task"
+            if not cli_hosts:
+                raise NothingToDo(
+                    "Was told to run a command, but not given any hosts to run it on!"  # noqa
+                )
+
+            def anonymous(c):
+                c.run(self.core.remainder)
+
+            anon = Call(Task(body=anonymous))
+            # TODO: see above TODOs about non-parameterized setups, roles etc
+            # TODO: will likely need to refactor that logic some more so it can
+            # be used both there and here.
+            for init_kwargs in self.normalize_hosts(cli_hosts):
+                ret.append(self.parameterize(anon, init_kwargs))
+        return ret

     def parameterize(self, call, connection_init_kwargs):
         """
@@ -53,4 +111,17 @@ class Executor(invoke.Executor):
         :returns:
             `.ConnectionCall`.
         """
-        pass
+        msg = "Parameterizing {!r} with Connection kwargs {!r}"
+        debug(msg.format(call, connection_init_kwargs))
+        # Generate a custom ConnectionCall that has init_kwargs (used for
+        # creating the Connection at runtime) set to the requested params.
+        new_call_kwargs = dict(init_kwargs=connection_init_kwargs)
+        clone = call.clone(into=ConnectionCall, with_=new_call_kwargs)
+        return clone
+
+    def dedupe(self, tasks):
+        # Don't perform deduping, we will often have "duplicate" tasks w/
+        # distinct host values/etc.
+        # TODO: might want some deduplication later on though - falls under
+        # "how to mesh parameterization with pre/post/etc deduping".
+        return tasks
diff --git a/fabric/group.py b/fabric/group.py
index f30a24cd..506f2353 100644
--- a/fabric/group.py
+++ b/fabric/group.py
@@ -1,5 +1,7 @@
 from queue import Queue
+
 from invoke.util import ExceptionHandlingThread
+
 from .connection import Connection
 from .exceptions import GroupException

@@ -78,6 +80,7 @@ class Group(list):
         .. versionchanged:: 2.3
             Added ``**kwargs`` (was previously only ``*hosts``).
         """
+        # TODO: #563, #388 (could be here or higher up in Program area)
         self.extend([Connection(host, **kwargs) for host in hosts])

     @classmethod
@@ -87,7 +90,16 @@ class Group(list):

         .. versionadded:: 2.0
         """
-        pass
+        # TODO: *args here too; or maybe just fold into __init__ and type
+        # check?
+        group = cls()
+        group.extend(connections)
+        return group
+
+    def _do(self, method, *args, **kwargs):
+        # TODO: rename this something public & commit to an API for user
+        # subclasses
+        raise NotImplementedError

     def run(self, *args, **kwargs):
         """
@@ -97,7 +109,11 @@ class Group(list):

         .. versionadded:: 2.0
         """
-        pass
+        # TODO: how to change method of execution across contents? subclass,
+        # kwargs, additional methods, inject an executor? Doing subclass for
+        # now, but not 100% sure it's the best route.
+        # TODO: also need way to deal with duplicate connections (see THOUGHTS)
+        return self._do("run", *args, **kwargs)

     def sudo(self, *args, **kwargs):
         """
@@ -107,7 +123,18 @@ class Group(list):

         .. versionadded:: 2.6
         """
-        pass
+        # TODO: see run() TODOs
+        return self._do("sudo", *args, **kwargs)
+
+    # TODO: this all needs to mesh well with similar strategies applied to
+    # entire tasks - so that may still end up factored out into Executors or
+    # something lower level than both those and these?
+
+    # TODO: local? Invoke wants ability to do that on its own though, which
+    # would be distinct from Group. (May want to switch Group to use that,
+    # though, whatever it ends up being? Eg many cases where you do want to do
+    # some local thing either N times identically, or parameterized by remote
+    # cxn values)

     def put(self, *args, **kwargs):
         """
@@ -123,7 +150,7 @@ class Group(list):

         .. versionadded:: 2.6
         """
-        pass
+        return self._do("put", *args, **kwargs)

     def get(self, *args, **kwargs):
         """
@@ -150,7 +177,13 @@ class Group(list):

         .. versionadded:: 2.6
         """
-        pass
+        # TODO 4.0: consider making many of these into kwarg-only methods? then
+        # below could become kwargs.setdefault() if desired.
+        # TODO: do we care enough to handle explicitly given, yet falsey,
+        # values? it's a lot more complexity for a corner case.
+        if len(args) < 2 and "local" not in kwargs:
+            kwargs["local"] = "{host}/"
+        return self._do("get", *args, **kwargs)

     def close(self):
         """
@@ -158,7 +191,8 @@ class Group(list):

         .. versionadded:: 2.4
         """
-        pass
+        for cxn in self:
+            cxn.close()

     def __enter__(self):
         return self
@@ -174,6 +208,25 @@ class SerialGroup(Group):
     .. versionadded:: 2.0
     """

+    def _do(self, method, *args, **kwargs):
+        results = GroupResult()
+        excepted = False
+        for cxn in self:
+            try:
+                results[cxn] = getattr(cxn, method)(*args, **kwargs)
+            except Exception as e:
+                results[cxn] = e
+                excepted = True
+        if excepted:
+            raise GroupException(results)
+        return results
+
+
+def thread_worker(cxn, queue, method, args, kwargs):
+    result = getattr(cxn, method)(*args, **kwargs)
+    # TODO: namedtuple or attrs object?
+    queue.put((cxn, result))
+

 class ThreadingGroup(Group):
     """
@@ -182,6 +235,53 @@ class ThreadingGroup(Group):
     .. versionadded:: 2.0
     """

+    def _do(self, method, *args, **kwargs):
+        results = GroupResult()
+        queue = Queue()
+        threads = []
+        for cxn in self:
+            thread = ExceptionHandlingThread(
+                target=thread_worker,
+                kwargs=dict(
+                    cxn=cxn,
+                    queue=queue,
+                    method=method,
+                    args=args,
+                    kwargs=kwargs,
+                ),
+            )
+            threads.append(thread)
+        for thread in threads:
+            thread.start()
+        for thread in threads:
+            # TODO: configurable join timeout
+            thread.join()
+        # Get non-exception results from queue
+        while not queue.empty():
+            # TODO: io-sleep? shouldn't matter if all threads are now joined
+            cxn, result = queue.get(block=False)
+            # TODO: outstanding musings about how exactly aggregate results
+            # ought to ideally operate...heterogenous obj like this, multiple
+            # objs, ??
+            results[cxn] = result
+        # Get exceptions from the threads themselves.
+        # TODO: in a non-thread setup, this would differ, e.g.:
+        # - a queue if using multiprocessing
+        # - some other state-passing mechanism if using e.g. coroutines
+        # - ???
+        excepted = False
+        for thread in threads:
+            wrapper = thread.exception()
+            if wrapper is not None:
+                # Outer kwargs is Thread instantiation kwargs, inner is kwargs
+                # passed to thread target/body.
+                cxn = wrapper.kwargs["kwargs"]["cxn"]
+                results[cxn] = wrapper.value
+                excepted = True
+        if excepted:
+            raise GroupException(results)
+        return results
+

 class GroupResult(dict):
     """
@@ -209,6 +309,18 @@ class GroupResult(dict):
         self._successes = {}
         self._failures = {}

+    def _bifurcate(self):
+        # Short-circuit to avoid reprocessing every access.
+        if self._successes or self._failures:
+            return
+        # TODO: if we ever expect .succeeded/.failed to be useful before a
+        # GroupResult is fully initialized, this needs to become smarter.
+        for key, value in self.items():
+            if isinstance(value, BaseException):
+                self._failures[key] = value
+            else:
+                self._successes[key] = value
+
     @property
     def succeeded(self):
         """
@@ -216,7 +328,8 @@ class GroupResult(dict):

         .. versionadded:: 2.0
         """
-        pass
+        self._bifurcate()
+        return self._successes

     @property
     def failed(self):
@@ -225,4 +338,5 @@ class GroupResult(dict):

         .. versionadded:: 2.0
         """
-        pass
+        self._bifurcate()
+        return self._failures
diff --git a/fabric/main.py b/fabric/main.py
index 18c45ce4..ce0f66e4 100644
--- a/fabric/main.py
+++ b/fabric/main.py
@@ -3,17 +3,191 @@ CLI entrypoint & parser configuration.

 Builds on top of Invoke's core functionality for same.
 """
+
 import getpass
 from pathlib import Path
+
 from invoke import Argument, Collection, Exit, Program
 from invoke import __version__ as invoke
 from paramiko import __version__ as paramiko, Agent
+
 from . import __version__ as fabric
 from . import Config, Executor


 class Fab(Program):
-    pass
+    def print_version(self):
+        super().print_version()
+        print("Paramiko {}".format(paramiko))
+        print("Invoke {}".format(invoke))
+
+    def core_args(self):
+        core_args = super().core_args()
+        my_args = [
+            Argument(
+                names=("H", "hosts"),
+                help="Comma-separated host name(s) to execute tasks against.",
+            ),
+            Argument(
+                names=("i", "identity"),
+                kind=list,  # Same as OpenSSH, can give >1 key
+                # TODO: automatically add hint about iterable-ness to Invoke
+                # help display machinery?
+                help="Path to runtime SSH identity (key) file. May be given multiple times.",  # noqa
+            ),
+            Argument(
+                names=("list-agent-keys",),
+                kind=bool,
+                help="Display ssh-agent key list, and exit.",
+            ),
+            # TODO: worth having short flags for these prompt args?
+            Argument(
+                names=("prompt-for-login-password",),
+                kind=bool,
+                help="Request an upfront SSH-auth password prompt.",
+            ),
+            Argument(
+                names=("prompt-for-passphrase",),
+                kind=bool,
+                help="Request an upfront SSH key passphrase prompt.",
+            ),
+            Argument(
+                names=("S", "ssh-config"),
+                help="Path to runtime SSH config file.",
+            ),
+            Argument(
+                names=("t", "connect-timeout"),
+                kind=int,
+                help="Specifies default connection timeout, in seconds.",
+            ),
+        ]
+        return core_args + my_args
+
+    @property
+    def _remainder_only(self):
+        # No 'unparsed' (i.e. tokens intended for task contexts), and remainder
+        # (text after a double-dash) implies a contextless/taskless remainder
+        # execution of the style 'fab -H host -- command'.
+        # NOTE: must ALSO check to ensure the double dash isn't being used for
+        # tab completion machinery...
+        return (
+            not self.core.unparsed
+            and self.core.remainder
+            and not self.args.complete.value
+        )
+
+    def load_collection(self):
+        # Stick in a dummy Collection if it looks like we were invoked w/o any
+        # tasks, and with a remainder.
+        # This isn't super ideal, but Invoke proper has no obvious "just run my
+        # remainder" use case, so having it be capable of running w/o any task
+        # module, makes no sense. But we want that capability for testing &
+        # things like 'fab -H x,y,z -- mycommand'.
+        if self._remainder_only:
+            # TODO: hm we're probably not honoring project-specific configs in
+            # this branch; is it worth having it assume CWD==project, since
+            # that's often what users expect? Even tho no task collection to
+            # honor the real "lives by task coll"?
+            self.collection = Collection()
+        else:
+            super().load_collection()
+
+    def no_tasks_given(self):
+        # As above, neuter the usual "hey you didn't give me any tasks, let me
+        # print help for you" behavior, if necessary.
+        if not self._remainder_only:
+            super().no_tasks_given()
+
+    def create_config(self):
+        # Create config, as parent does, but with lazy=True to avoid our own
+        # SSH config autoload. (Otherwise, we can't correctly load _just_ the
+        # runtime file if one's being given later.)
+        self.config = self.config_class(lazy=True)
+        # However, we don't really want the parent class' lazy behavior (which
+        # skips loading system/global invoke-type conf files) so we manually do
+        # that here to match upstream behavior.
+        self.config.load_base_conf_files()
+        # And merge again so that data is available.
+        # TODO: really need to either A) stop worrying about calling
+        # merge() "too many times", or B) make merge() itself determine whether
+        # it needs to run and/or just merge stuff that's changed, so log spam
+        # isn't as bad.
+        self.config.merge()
+
+    def update_config(self):
+        # Note runtime SSH path, if given, and load SSH configurations.
+        # NOTE: must do parent before our work, in case users want to disable
+        # SSH config loading within a runtime-level conf file/flag.
+        super().update_config(merge=False)
+        self.config.set_runtime_ssh_path(self.args["ssh-config"].value)
+        self.config.load_ssh_config()
+        # Load -i identity file, if given, into connect_kwargs, at overrides
+        # level.
+        connect_kwargs = {}
+        paths = self.args["identity"].value
+        if paths:
+            connect_kwargs["key_filename"] = paths
+            # New, non-sshclient based config location
+            # Also new: Path! (which we couldn't use above until paramiko knew
+            # about it)
+            self.config._overrides["authentication"] = dict(
+                identities=[Path(x) for x in paths]
+            )
+        # Ditto for connect timeout
+        timeout = self.args["connect-timeout"].value
+        if timeout:
+            connect_kwargs["timeout"] = timeout
+        # Secrets prompts that want to happen at handoff time instead of
+        # later/at user-time.
+        # TODO: should this become part of Invoke proper in case other
+        # downstreams have need of it? E.g. a prompt Argument 'type'? We're
+        # already doing a similar thing there for sudo password...
+        if self.args["prompt-for-login-password"].value:
+            prompt = "Enter login password for use with SSH auth: "
+            connect_kwargs["password"] = getpass.getpass(prompt)
+        if self.args["prompt-for-passphrase"].value:
+            prompt = "Enter passphrase for use unlocking SSH keys: "
+            connect_kwargs["passphrase"] = getpass.getpass(prompt)
+        # TODO: this (directly manipulating _overrides) feels a little gross,
+        # but since the parent has already called load_overrides, this is best
+        # we can do for now w/o losing data. Still feels correct; just might be
+        # cleaner to have even more Config API members around this sort of
+        # thing. Shrug.
+        self.config._overrides["connect_kwargs"] = connect_kwargs
+        # Since we gave merge=False above, we must do it ourselves here. (Also
+        # allows us to 'compile' our overrides manipulation.)
+        self.config.merge()
+
+    # TODO: make this an explicit hookpoint in Invoke, i.e. some default-noop
+    # method called at the end of parse_core() that we can override here
+    # instead of doing this.
+    def parse_core(self, *args, **kwargs):
+        super().parse_core(*args, **kwargs)
+        if self.args["list-agent-keys"].value:
+            keys = Agent().get_keys()
+            for key in keys:
+                tpl = "{} {} {} ({})"
+                # TODO: _could_ use new PKey.__repr__ but I like the mimicry of
+                # OpenSSH ssh-add -l for now...
+                print(
+                    tpl.format(
+                        key.get_bits(),
+                        key.fingerprint,
+                        key.comment,
+                        key.algorithm_name,
+                    )
+                )
+            raise Exit
+
+
+# Mostly a concession to testing.
+def make_program():
+    return Fab(
+        name="Fabric",
+        version=fabric,
+        executor_class=Executor,
+        config_class=Config,
+    )


 program = make_program()
diff --git a/fabric/runners.py b/fabric/runners.py
index 53763d27..587e5f79 100644
--- a/fabric/runners.py
+++ b/fabric/runners.py
@@ -1,8 +1,16 @@
 import signal
 import threading
+
 from invoke import Runner, pty_size, Result as InvokeResult


+def cares_about_SIGWINCH():
+    return (
+        hasattr(signal, "SIGWINCH")
+        and threading.current_thread() is threading.main_thread()
+    )
+
+
 class Remote(Runner):
     """
     Run a shell command over an SSH connection.
@@ -34,20 +42,131 @@ class Remote(Runner):
             Changed the default value of ``inline_env`` from ``False`` to
             ``True``.
         """
-        self.inline_env = kwargs.pop('inline_env', None)
+        self.inline_env = kwargs.pop("inline_env", None)
         super().__init__(*args, **kwargs)

+    def start(self, command, shell, env, timeout=None):
+        self.channel = self.context.create_session()
+        if self.using_pty:
+            # Set initial size to match local size
+            cols, rows = pty_size()
+            self.channel.get_pty(width=cols, height=rows)
+            # If platform supports, also respond to SIGWINCH (window change) by
+            # sending the sshd a window-change message to update the PTY size
+            if cares_about_SIGWINCH():
+                signal.signal(signal.SIGWINCH, self.handle_window_change)
+        if env:
+            # TODO: honor SendEnv from ssh_config (but if we do, _should_ we
+            # honor it even when prefixing? That would depart from OpenSSH
+            # somewhat (albeit as a "what we can do that it cannot" feature...)
+            if self.inline_env:
+                # TODO: escaping, if we can find a FOOLPROOF THIRD PARTY METHOD
+                # for doing so!
+                # TODO: switch to using a higher-level generic command
+                # prefixing functionality, when implemented.
+                parameters = " ".join(
+                    ["{}={}".format(k, v) for k, v in sorted(env.items())]
+                )
+                # NOTE: we can assume 'export' and '&&' relatively safely, as
+                # sshd always brings some shell into play, even if it's just
+                # /bin/sh.
+                command = "export {} && {}".format(parameters, command)
+            else:
+                self.channel.update_environment(env)
+        self.send_start_message(command)
+
+    def send_start_message(self, command):
+        self.channel.exec_command(command)
+
+    def run(self, command, **kwargs):
+        kwargs.setdefault("replace_env", True)
+        return super().run(command, **kwargs)
+
+    def read_proc_stdout(self, num_bytes):
+        return self.channel.recv(num_bytes)
+
+    def read_proc_stderr(self, num_bytes):
+        return self.channel.recv_stderr(num_bytes)
+
+    def _write_proc_stdin(self, data):
+        return self.channel.sendall(data)
+
+    def close_proc_stdin(self):
+        return self.channel.shutdown_write()
+
+    @property
+    def process_is_finished(self):
+        return self.channel.exit_status_ready()
+
+    def send_interrupt(self, interrupt):
+        # NOTE: in v1, we just reraised the KeyboardInterrupt unless a PTY was
+        # present; this seems to have been because without a PTY, the
+        # below escape sequence is ignored, so all we can do is immediately
+        # terminate on our end.
+        # NOTE: also in v1, the raising of the KeyboardInterrupt completely
+        # skipped all thread joining & cleanup; presumably regular interpreter
+        # shutdown suffices to tie everything off well enough.
+        if self.using_pty:
+            # Submit hex ASCII character 3, aka ETX, which most Unix PTYs
+            # interpret as a foreground SIGINT.
+            # TODO: is there anything else we can do here to be more portable?
+            self.channel.send("\x03")
+        else:
+            raise interrupt
+
+    def returncode(self):
+        return self.channel.recv_exit_status()
+
+    def generate_result(self, **kwargs):
+        kwargs["connection"] = self.context
+        return Result(**kwargs)
+
+    def stop(self):
+        super().stop()
+        if hasattr(self, "channel"):
+            self.channel.close()
+        if cares_about_SIGWINCH():
+            signal.signal(signal.SIGWINCH, signal.SIG_DFL)
+
+    def kill(self):
+        # Just close the channel immediately, which is about as close as we can
+        # get to a local SIGKILL unfortunately.
+        # TODO: consider _also_ calling .send_interrupt() and only doing this
+        # after another few seconds; but A) kinda fragile/complex and B) would
+        # belong in invoke.Runner anyways?
+        self.channel.close()
+
     def handle_window_change(self, signum, frame):
         """
         Respond to a `signal.SIGWINCH` (as a standard signal handler).

         Sends a window resize command via Paramiko channel method.
         """
-        pass
+        self.channel.resize_pty(*pty_size())
+
+    # TODO: shit that is in fab 1 run() but could apply to invoke.Local too:
+    # * see rest of stuff in _run_command/_execute in operations.py...there is
+    # a bunch that applies generally like optional exit codes, etc
+
+    # TODO: general shit not done yet
+    # * stdin; Local relies on local process management to ensure stdin is
+    # hooked up; we cannot do that.
+    # * output prefixing
+    # * agent forwarding
+    # * reading at 4096 bytes/time instead of whatever inv defaults to (also,
+    # document why we are doing that, iirc it changed recentlyish via ticket)
+    # * TODO: oh god so much more, go look it up
+
+    # TODO: shit that has no Local equivalent that we probs need to backfill
+    # into Runner, probably just as a "finish()" or "stop()" (to mirror
+    # start()):
+    # * channel close()
+    # * agent-forward close()


 class RemoteShell(Remote):
-    pass
+    def send_start_message(self, command):
+        self.channel.invoke_shell()


 class Result(InvokeResult):
@@ -62,6 +181,9 @@ class Result(InvokeResult):
     """

     def __init__(self, **kwargs):
-        connection = kwargs.pop('connection')
+        connection = kwargs.pop("connection")
         super().__init__(**kwargs)
         self.connection = connection
+
+    # TODO: have useful str/repr differentiation from invoke.Result,
+    # transfer.Result etc.
diff --git a/fabric/tasks.py b/fabric/tasks.py
index 093c8e74..acf89d0c 100644
--- a/fabric/tasks.py
+++ b/fabric/tasks.py
@@ -1,4 +1,5 @@
 import invoke
+
 from .connection import Connection


@@ -14,7 +15,9 @@ class Task(invoke.Task):
     """

     def __init__(self, *args, **kwargs):
-        self.hosts = kwargs.pop('hosts', None)
+        # Pull out our own kwargs before hitting super, which will TypeError on
+        # anything it doesn't know about.
+        self.hosts = kwargs.pop("hosts", None)
         super().__init__(*args, **kwargs)


@@ -62,7 +65,10 @@ def task(*args, **kwargs):

     .. versionadded:: 2.1
     """
-    pass
+    # Override klass to be our own Task, not Invoke's, unless somebody gave it
+    # explicitly.
+    kwargs.setdefault("klass", Task)
+    return invoke.task(*args, **kwargs)


 class ConnectionCall(invoke.Call):
@@ -81,12 +87,30 @@ class ConnectionCall(invoke.Call):
             Keyword arguments used to create a new `.Connection` when the
             wrapped task is executed. Default: ``None``.
         """
-        init_kwargs = kwargs.pop('init_kwargs')
+        init_kwargs = kwargs.pop("init_kwargs")  # , None)
         super().__init__(*args, **kwargs)
         self.init_kwargs = init_kwargs

+    def clone_kwargs(self):
+        # Extend superclass clone_kwargs to work in init_kwargs.
+        # TODO: this pattern comes up a lot; is there a better way to handle it
+        # without getting too crazy on the metaprogramming/over-engineering?
+        # Maybe something attrs library can help with (re: declaring "These are
+        # my bag-of-attributes attributes I want common stuff done to/with")
+        kwargs = super().clone_kwargs()
+        kwargs["init_kwargs"] = self.init_kwargs
+        return kwargs
+
+    def make_context(self, config):
+        kwargs = self.init_kwargs
+        # TODO: what about corner case of a decorator giving config in a hosts
+        # kwarg member?! For now let's stomp on it, and then if somebody runs
+        # into it, we can identify the use case & decide how best to deal.
+        kwargs["config"] = config
+        return Connection(**kwargs)
+
     def __repr__(self):
         ret = super().__repr__()
         if self.init_kwargs:
-            ret = ret[:-1] + ", host='{}'>".format(self.init_kwargs['host'])
+            ret = ret[:-1] + ", host='{}'>".format(self.init_kwargs["host"])
         return ret
diff --git a/fabric/testing/base.py b/fabric/testing/base.py
index c232e492..97a4b441 100644
--- a/fabric/testing/base.py
+++ b/fabric/testing/base.py
@@ -14,14 +14,20 @@ purposes: ``pip install fabric[testing]``.

 .. versionadded:: 2.1
 """
+
 import os
 from itertools import chain, repeat
 from io import BytesIO
 from unittest.mock import Mock, PropertyMock, call, patch, ANY
+
 from deprecated.sphinx import deprecated
 from deprecated.classic import deprecated as deprecated_no_docstring


+# TODO 4.0: reorganize harder (eg building blocks in one module, central
+# classes in another?)
+
+
 class Command:
     """
     Data record specifying params of a command execution to mock/expect.
@@ -44,7 +50,7 @@ class Command:
     .. versionadded:: 2.1
     """

-    def __init__(self, cmd=None, out=b'', err=b'', in_=None, exit=0, waits=0):
+    def __init__(self, cmd=None, out=b"", err=b"", in_=None, exit=0, waits=0):
         self.cmd = cmd
         self.out = out
         self.err = err
@@ -53,7 +59,9 @@ class Command:
         self.waits = waits

     def __repr__(self):
-        return '<{} cmd={!r}>'.format(self.__class__.__name__, self.cmd)
+        # TODO: just leverage attrs, maybe vendored into Invoke so we don't
+        # grow more dependencies? Ehhh
+        return "<{} cmd={!r}>".format(self.__class__.__name__, self.cmd)

     def expect_execution(self, channel):
         """
@@ -61,7 +69,7 @@ class Command:

         .. versionadded:: 2.7
         """
-        pass
+        channel.exec_command.assert_called_with(self.cmd or ANY)


 class ShellCommand(Command):
@@ -71,6 +79,9 @@ class ShellCommand(Command):
     .. versionadded:: 2.7
     """

+    def expect_execution(self, channel):
+        channel.invoke_shell.assert_called_once_with()
+

 class MockChannel(Mock):
     """
@@ -83,11 +94,27 @@ class MockChannel(Mock):
     """

     def __init__(self, *args, **kwargs):
-        object.__setattr__(self, '__stdout', kwargs.pop('stdout'))
-        object.__setattr__(self, '__stderr', kwargs.pop('stderr'))
-        object.__setattr__(self, '_stdin', BytesIO())
+        # TODO: worth accepting strings and doing the BytesIO setup ourselves?
+        # Stored privately to avoid any possible collisions ever. shrug.
+        object.__setattr__(self, "__stdout", kwargs.pop("stdout"))
+        object.__setattr__(self, "__stderr", kwargs.pop("stderr"))
+        # Stdin less private so it can be asserted about
+        object.__setattr__(self, "_stdin", BytesIO())
         super().__init__(*args, **kwargs)

+    def _get_child_mock(self, **kwargs):
+        # Don't return our own class on sub-mocks.
+        return Mock(**kwargs)
+
+    def recv(self, count):
+        return object.__getattribute__(self, "__stdout").read(count)
+
+    def recv_stderr(self, count):
+        return object.__getattribute__(self, "__stderr").read(count)
+
+    def sendall(self, data):
+        return object.__getattribute__(self, "_stdin").write(data)
+

 class Session:
     """
@@ -131,33 +158,51 @@ class Session:
         Added the ``enable_sftp`` and ``transfers`` parameters.
     """

-    def __init__(self, host=None, user=None, port=None, commands=None, cmd=
-        None, out=None, in_=None, err=None, exit=None, waits=None,
-        enable_sftp=False, transfers=None):
+    def __init__(
+        self,
+        host=None,
+        user=None,
+        port=None,
+        commands=None,
+        cmd=None,
+        out=None,
+        in_=None,
+        err=None,
+        exit=None,
+        waits=None,
+        enable_sftp=False,
+        transfers=None,
+    ):
+        # Safety check
         params = cmd or out or err or exit or waits
         if commands and params:
             raise ValueError(
-                "You can't give both 'commands' and individual Command parameters!"
-                )
+                "You can't give both 'commands' and individual "
+                "Command parameters!"
+            )  # noqa
+        # Early test for "did user actually request expectations?"
         self.guard_only = not (commands or cmd or transfers)
+        # Fill in values
         self.host = host
         self.user = user
         self.port = port
         self.commands = commands
         if params:
+            # Honestly dunno which is dumber, this or duplicating Command's
+            # default kwarg values in this method's signature...sigh
             kwargs = {}
             if cmd is not None:
-                kwargs['cmd'] = cmd
+                kwargs["cmd"] = cmd
             if out is not None:
-                kwargs['out'] = out
+                kwargs["out"] = out
             if err is not None:
-                kwargs['err'] = err
+                kwargs["err"] = err
             if in_ is not None:
-                kwargs['in_'] = in_
+                kwargs["in_"] = in_
             if exit is not None:
-                kwargs['exit'] = exit
+                kwargs["exit"] = exit
             if waits is not None:
-                kwargs['waits'] = waits
+                kwargs["waits"] = waits
             self.commands = [Command(**kwargs)]
         if not self.commands:
             self.commands = [Command()]
@@ -184,7 +229,121 @@ class Session:

         .. versionadded:: 2.1
         """
-        pass
+        client = Mock()
+        transport = client.get_transport.return_value  # another Mock
+
+        # NOTE: this originally did chain([False], repeat(True)) so that
+        # get_transport().active was False initially, then True. However,
+        # because we also have to consider when get_transport() comes back None
+        # (which it does initially), the case where we get back a non-None
+        # transport _and_ it's not active yet, isn't useful to test, and
+        # complicates test expectations. So we don't, for now.
+        actives = repeat(True)
+        # NOTE: setting PropertyMocks on a mock's type() is apparently
+        # How It Must Be Done, otherwise it sets the real attr value.
+        type(transport).active = PropertyMock(side_effect=actives)
+
+        channels = []
+        for command in self.commands:
+            # Mock of a Channel instance, not e.g. Channel-the-class.
+            # Specifically, one that can track individual state for recv*().
+            channel = MockChannel(
+                stdout=BytesIO(command.out), stderr=BytesIO(command.err)
+            )
+            channel.recv_exit_status.return_value = command.exit
+
+            # If requested, make exit_status_ready return False the first N
+            # times it is called in the wait() loop.
+            readies = chain(repeat(False, command.waits), repeat(True))
+            channel.exit_status_ready.side_effect = readies
+
+            channels.append(channel)
+
+        # Have our transport yield those channel mocks in order when
+        # open_session() is called.
+        transport.open_session.side_effect = channels
+
+        # SFTP, if enabled
+        if self._enable_sftp:
+            self._start_sftp(client)
+
+        self.client = client
+        self.channels = channels
+
+    def _start_sftp(self, client):
+        # Patch os module for local stat and similar
+        self.os_patcher = patch("fabric.transfer.os")
+        mock_os = self.os_patcher.start()
+        # Patch Path class inside transfer.py to prevent real fs touchery
+        self.path_patcher = patch("fabric.transfer.Path")
+        self.path_patcher.start()
+        self.sftp = sftp = client.open_sftp.return_value
+
+        # Handle common filepath massage actions; tests will assume these.
+        def fake_abspath(path):
+            # Run normpath to avoid tests not seeing abspath wrinkles (like
+            # trailing slash chomping)
+            return "/local/{}".format(os.path.normpath(path))
+
+        mock_os.path.abspath.side_effect = fake_abspath
+        sftp.getcwd.return_value = "/remote"
+        # Ensure stat st_mode is a real number; Python 3's stat.S_IMODE doesn't
+        # like just being handed a MagicMock?
+        fake_mode = 0o644  # arbitrary real-ish mode
+        sftp.stat.return_value.st_mode = fake_mode
+        mock_os.stat.return_value.st_mode = fake_mode
+        # Not super clear to me why the 'wraps' functionality in mock isn't
+        # working for this :( reinstate a bunch of os(.path) so it still works
+        mock_os.sep = os.sep
+        for name in ("basename", "split", "join", "normpath"):
+            getattr(mock_os.path, name).side_effect = getattr(os.path, name)
+
+    @deprecated_no_docstring(
+        version="3.2",
+        reason="This method has been renamed to `safety_check` & will be removed in 4.0",  # noqa
+    )
+    def sanity_check(self):
+        return self.safety_check()
+
+    def safety_check(self):
+        # Short-circuit if user didn't give any expectations; otherwise our
+        # assumptions below will be inaccurately violated and explode.
+        if self.guard_only:
+            return
+
+        # Per-session we expect a single transport get
+        transport = self.client.get_transport
+        transport.assert_called_once_with()
+        # And a single connect to our target host.
+        self.client.connect.assert_called_once_with(
+            username=self.user or ANY,
+            hostname=self.host or ANY,
+            port=self.port or ANY,
+        )
+
+        # Calls to open_session will be 1-per-command but are on transport, not
+        # channel, so we can only really inspect how many happened in
+        # aggregate. Save a list for later comparison to call_args.
+        session_opens = []
+
+        for channel, command in zip(self.channels, self.commands):
+            # Expect an open_session for each command exec
+            session_opens.append(call())
+            # Expect that the channel gets an exec_command or etc
+            command.expect_execution(channel=channel)
+            # Expect written stdin, if given
+            if command.in_:
+                assert channel._stdin.getvalue() == command.in_
+
+        # Make sure open_session was called expected number of times.
+        calls = transport.return_value.open_session.call_args_list
+        assert calls == session_opens
+
+        # SFTP transfers
+        for transfer in self.transfers or []:
+            method_name = transfer.pop("method")
+            method = getattr(self.sftp, method_name)
+            method.assert_any_call(**transfer)

     def stop(self):
         """
@@ -192,7 +351,10 @@ class Session:

         .. versionadded:: 3.2
         """
-        pass
+        if hasattr(self, "os_patcher"):
+            self.os_patcher.stop()
+        if hasattr(self, "path_patcher"):
+            self.path_patcher.stop()


 class MockRemote:
@@ -222,6 +384,7 @@ class MockRemote:
         remember to call `safety`/`stop`.
     """

+    # TODO 4.0: delete enable_sftp and make its behavior default
     def __init__(self, enable_sftp=False):
         self._enable_sftp = enable_sftp
         self.expect_sessions(Session(enable_sftp=enable_sftp))
@@ -234,7 +397,8 @@ class MockRemote:

         .. versionadded:: 2.1
         """
-        pass
+        kwargs.setdefault("enable_sftp", self._enable_sftp)
+        return self.expect_sessions(Session(*args, **kwargs))[0]

     def expect_sessions(self, *sessions):
         """
@@ -244,15 +408,39 @@ class MockRemote:

         .. versionadded:: 2.1
         """
-        pass
-
+        # First, stop the default session to clean up its state, if it seems to
+        # be running.
+        self.stop()
+        # Update sessions list with new session(s)
+        self.sessions = sessions
+        # And start patching again, returning mocked channels
+        return self.start()
+
+    # TODO 4.0: definitely clean this up once the SFTP bit isn't opt-in, doing
+    # that backwards compatibly was real gross
     def start(self):
         """
         Start patching SSHClient with the stored sessions, returning channels.

         .. versionadded:: 2.1
         """
-        pass
+        # Patch SSHClient so the sessions' generated mocks can be set as its
+        # return values
+        self.patcher = patcher = patch("fabric.connection.SSHClient")
+        SSHClient = patcher.start()
+        # Mock clients, to be inspected afterwards during safety-checks
+        clients = []
+        for session in self.sessions:
+            session.generate_mocks()
+            clients.append(session.client)
+        # Each time the mocked SSHClient class is instantiated, it will
+        # yield one of our mocked clients (w/ mocked transport & channel, and
+        # optionally SFTP subclient) generated above.
+        SSHClient.side_effect = clients
+        sessions = list(chain.from_iterable(x.channels for x in self.sessions))
+        # TODO: in future we _may_ want to change this so it returns SFTP file
+        # data as well?
+        return sessions

     def stop(self):
         """
@@ -260,17 +448,26 @@ class MockRemote:

         .. versionadded:: 2.1
         """
-        pass
-
-    @deprecated(version='3.2', reason=
-        'This method has been renamed to `safety` & will be removed in 4.0')
+        # Short circuit if we don't seem to have start()ed yet.
+        if not hasattr(self, "patcher"):
+            return
+        # Stop patching SSHClient
+        self.patcher.stop()
+        # Also ask all sessions to stop any of their self-owned mocks
+        for session in self.sessions:
+            session.stop()
+
+    @deprecated(
+        version="3.2",
+        reason="This method has been renamed to `safety` & will be removed in 4.0",  # noqa
+    )
     def sanity(self):
         """
         Run post-execution sanity checks (usually 'was X called' tests.)

         .. versionadded:: 2.1
         """
-        pass
+        return self.safety()

     def safety(self):
         """
@@ -278,7 +475,8 @@ class MockRemote:

         .. versionadded:: 3.2
         """
-        pass
+        for session in self.sessions:
+            session.safety_check()

     def __enter__(self):
         return self
@@ -290,9 +488,10 @@ class MockRemote:
             self.stop()


-@deprecated(version='3.2', reason=
-    'This class has been merged with `MockRemote` which can now handle SFTP mocking too. Please switch to it!'
-    )
+@deprecated(
+    version="3.2",
+    reason="This class has been merged with `MockRemote` which can now handle SFTP mocking too. Please switch to it!",  # noqa
+)
 class MockSFTP:
     """
     Class managing mocked SFTP remote state.
@@ -306,3 +505,39 @@ class MockSFTP:
     def __init__(self, autostart=True):
         if autostart:
             self.start()
+
+    def start(self):
+        # Set up mocks
+        self.os_patcher = patch("fabric.transfer.os")
+        self.client_patcher = patch("fabric.connection.SSHClient")
+        self.path_patcher = patch("fabric.transfer.Path")
+        mock_os = self.os_patcher.start()
+        Client = self.client_patcher.start()
+        self.path_patcher.start()
+        sftp = Client.return_value.open_sftp.return_value
+
+        # Handle common filepath massage actions; tests will assume these.
+        def fake_abspath(path):
+            # Run normpath to avoid tests not seeing abspath wrinkles (like
+            # trailing slash chomping)
+            return "/local/{}".format(os.path.normpath(path))
+
+        mock_os.path.abspath.side_effect = fake_abspath
+        sftp.getcwd.return_value = "/remote"
+        # Ensure stat st_mode is a real number; Python 3's stat.S_IMODE doesn't
+        # like just being handed a MagicMock?
+        fake_mode = 0o644  # arbitrary real-ish mode
+        sftp.stat.return_value.st_mode = fake_mode
+        mock_os.stat.return_value.st_mode = fake_mode
+        # Not super clear to me why the 'wraps' functionality in mock isn't
+        # working for this :( reinstate a bunch of os(.path) so it still works
+        mock_os.sep = os.sep
+        for name in ("basename", "split", "join", "normpath"):
+            getattr(mock_os.path, name).side_effect = getattr(os.path, name)
+        # Return the sftp and OS mocks for use by decorator use case.
+        return sftp, mock_os
+
+    def stop(self):
+        self.os_patcher.stop()
+        self.client_patcher.stop()
+        self.path_patcher.stop()
diff --git a/fabric/testing/fixtures.py b/fabric/testing/fixtures.py
index 15717185..29c1b243 100644
--- a/fabric/testing/fixtures.py
+++ b/fabric/testing/fixtures.py
@@ -14,18 +14,24 @@ For example, if you intend to use the `remote` and `client` fixtures::

 .. versionadded:: 2.1
 """
+
 from unittest.mock import patch, Mock
+
 try:
     from pytest import fixture
 except ImportError:
     import warnings
+
     warning = (
-        "You appear to be missing some optional test-related dependencies;please 'pip install fabric[pytest]'."
-        )
+        "You appear to be missing some optional test-related dependencies;"
+        "please 'pip install fabric[pytest]'."
+    )
     warnings.warn(warning, ImportWarning)
     raise
+
 from .. import Connection
 from ..transfer import Transfer
+
 from .base import MockRemote, MockSFTP


@@ -45,12 +51,24 @@ def connection():

     .. versionadded:: 2.1
     """
-    pass
-
-
+    c = Connection(host="host", user="user")
+    c.config.run.in_stream = False
+    c.run = Mock()
+    c.local = Mock()
+    # TODO: rest of API should get mocked too
+    # TODO: is there a nice way to mesh with MockRemote et al? Is that ever
+    # really that useful for code that just wants to assert about how run() and
+    # friends were called?
+    yield c
+
+
+#: A convenience rebinding of `connection`.
+#:
+#: .. versionadded:: 2.1
 cxn = connection


+# TODO 4.0: remove old remote() and make this the new remote()
 @fixture
 def remote_with_sftp():
     """
@@ -60,7 +78,11 @@ def remote_with_sftp():
     functionality was called), note that the returned `MockRemote` object has a
     ``.sftp`` attribute when created in this mode.
     """
-    pass
+    # NOTE: recall that by default an instantiated MockRemote has a single
+    # internal anonymous session; so these fixtures are useful for autouse
+    # guardrails.
+    with MockRemote(enable_sftp=True) as remote:
+        yield remote


 @fixture
@@ -75,7 +97,10 @@ def remote():

     .. versionadded:: 2.1
     """
-    pass
+    remote = MockRemote()
+    yield remote
+    remote.safety()
+    remote.stop()


 @fixture
@@ -91,7 +116,12 @@ def sftp():

     .. versionadded:: 2.1
     """
-    pass
+    mock = MockSFTP(autostart=False)
+    client, mock_os = mock.start()
+    # Regular ol transfer to save some time
+    transfer = Transfer(Connection("host"))
+    yield transfer, client, mock_os
+    # TODO: old mock_sftp() lacked any 'stop'...why? feels bad man


 @fixture
@@ -101,7 +131,7 @@ def sftp_objs(sftp):

     .. versionadded:: 2.1
     """
-    pass
+    yield sftp[:2]


 @fixture
@@ -111,7 +141,7 @@ def transfer(sftp):

     .. versionadded:: 2.1
     """
-    pass
+    yield sftp[0]


 @fixture
@@ -154,4 +184,7 @@ def client():

     .. versionadded:: 2.1
     """
-    pass
+    with patch("fabric.connection.SSHClient") as SSHClient:
+        client = SSHClient.return_value
+        client.get_transport.return_value = Mock(active=True)
+        yield client
diff --git a/fabric/transfer.py b/fabric/transfer.py
index a241aa09..337513e9 100644
--- a/fabric/transfer.py
+++ b/fabric/transfer.py
@@ -1,11 +1,21 @@
 """
 File transfer via SFTP and/or SCP.
 """
+
 import os
 import posixpath
 import stat
+
 from pathlib import Path
-from .util import debug
+
+from .util import debug  # TODO: actual logging! LOL
+
+# TODO: figure out best way to direct folks seeking rsync, to patchwork's rsync
+# call (which needs updating to use invoke.run() & fab 2 connection methods,
+# but is otherwise suitable).
+# UNLESS we want to try and shoehorn it into this module after all? Delegate
+# any recursive get/put to it? Requires users to have rsync available of
+# course.


 class Transfer:
@@ -15,9 +25,21 @@ class Transfer:
     .. versionadded:: 2.0
     """

+    # TODO: SFTP clear default, but how to do SCP? subclass? init kwarg?
+
     def __init__(self, connection):
         self.connection = connection

+    @property
+    def sftp(self):
+        return self.connection.sftp()
+
+    def is_remote_dir(self, path):
+        try:
+            return stat.S_ISDIR(self.sftp.stat(path).st_mode)
+        except IOError:
+            return False
+
     def get(self, remote, local=None, preserve_mode=True):
         """
         Copy a file from wrapped connection's host to the local filesystem.
@@ -87,7 +109,80 @@ class Transfer:
         .. versionchanged:: 2.6
             Create missing ``local`` directories automatically.
         """
-        pass
+        # TODO: how does this API change if we want to implement
+        # remote-to-remote file transfer? (Is that even realistic?)
+        # TODO: callback support
+        # TODO: how best to allow changing the behavior/semantics of
+        # remote/local (e.g. users might want 'safer' behavior that complains
+        # instead of overwriting existing files) - this likely ties into the
+        # "how to handle recursive/rsync" and "how to handle scp" questions
+
+        # Massage remote path
+        if not remote:
+            raise ValueError("Remote path must not be empty!")
+        orig_remote = remote
+        remote = posixpath.join(
+            self.sftp.getcwd() or self.sftp.normalize("."), remote
+        )
+
+        # Massage local path
+        orig_local = local
+        is_file_like = hasattr(local, "write") and callable(local.write)
+        remote_filename = posixpath.basename(remote)
+        if not local:
+            local = remote_filename
+        # Path-driven local downloads need interpolation, abspath'ing &
+        # directory creation
+        if not is_file_like:
+            local = local.format(
+                host=self.connection.host,
+                user=self.connection.user,
+                port=self.connection.port,
+                dirname=posixpath.dirname(remote),
+                basename=remote_filename,
+            )
+            # Must treat dir vs file paths differently, lest we erroneously
+            # mkdir what was intended as a filename, and so that non-empty
+            # dir-like paths still get remote filename tacked on.
+            if local.endswith(os.sep):
+                dir_path = local
+                local = os.path.join(local, remote_filename)
+            else:
+                dir_path, _ = os.path.split(local)
+            local = os.path.abspath(local)
+            Path(dir_path).mkdir(parents=True, exist_ok=True)
+            # TODO: reimplement mkdir (or otherwise write a testing function)
+            # allowing us to track what was created so we can revert if
+            # transfer fails.
+            # TODO: Alternately, transfer to temp location and then move, but
+            # that's basically inverse of v1's sudo-put which gets messy
+
+        # Run Paramiko-level .get() (side-effects only. womp.)
+        # TODO: push some of the path handling into Paramiko; it should be
+        # responsible for dealing with path cleaning etc.
+        # TODO: probably preserve warning message from v1 when overwriting
+        # existing files. Use logging for that obviously.
+        #
+        # If local appears to be a file-like object, use sftp.getfo, not get
+        if is_file_like:
+            self.sftp.getfo(remotepath=remote, fl=local)
+        else:
+            self.sftp.get(remotepath=remote, localpath=local)
+            # Set mode to same as remote end
+            # TODO: Push this down into SFTPClient sometime (requires backwards
+            # incompat release.)
+            if preserve_mode:
+                remote_mode = self.sftp.stat(remote).st_mode
+                mode = stat.S_IMODE(remote_mode)
+                os.chmod(local, mode)
+        # Return something useful
+        return Result(
+            orig_remote=orig_remote,
+            remote=remote,
+            orig_local=orig_local,
+            local=local,
+            connection=self.connection,
+        )

     def put(self, local, remote=None, preserve_mode=True):
         """
@@ -135,7 +230,97 @@ class Transfer:

         .. versionadded:: 2.0
         """
-        pass
+        if not local:
+            raise ValueError("Local path must not be empty!")
+
+        is_file_like = hasattr(local, "write") and callable(local.write)
+
+        # Massage remote path
+        orig_remote = remote
+        if is_file_like:
+            local_base = getattr(local, "name", None)
+        else:
+            local_base = os.path.basename(local)
+        if not remote:
+            if is_file_like:
+                raise ValueError(
+                    "Must give non-empty remote path when local is a file-like object!"  # noqa
+                )
+            else:
+                remote = local_base
+                debug("Massaged empty remote path into {!r}".format(remote))
+        elif self.is_remote_dir(remote):
+            # non-empty local_base implies a) text file path or b) FLO which
+            # had a non-empty .name attribute. huzzah!
+            if local_base:
+                remote = posixpath.join(remote, local_base)
+            else:
+                if is_file_like:
+                    raise ValueError(
+                        "Can't put a file-like-object into a directory unless it has a non-empty .name attribute!"  # noqa
+                    )
+                else:
+                    # TODO: can we ever really end up here? implies we want to
+                    # reorganize all this logic so it has fewer potential holes
+                    raise ValueError(
+                        "Somehow got an empty local file basename ({!r}) when uploading to a directory ({!r})!".format(  # noqa
+                            local_base, remote
+                        )
+                    )
+
+        prejoined_remote = remote
+        remote = posixpath.join(
+            self.sftp.getcwd() or self.sftp.normalize("."), remote
+        )
+        if remote != prejoined_remote:
+            msg = "Massaged relative remote path {!r} into {!r}"
+            debug(msg.format(prejoined_remote, remote))
+
+        # Massage local path
+        orig_local = local
+        if not is_file_like:
+            local = os.path.abspath(local)
+            if local != orig_local:
+                debug(
+                    "Massaged relative local path {!r} into {!r}".format(
+                        orig_local, local
+                    )
+                )  # noqa
+
+        # Run Paramiko-level .put() (side-effects only. womp.)
+        # TODO: push some of the path handling into Paramiko; it should be
+        # responsible for dealing with path cleaning etc.
+        # TODO: probably preserve warning message from v1 when overwriting
+        # existing files. Use logging for that obviously.
+        #
+        # If local appears to be a file-like object, use sftp.putfo, not put
+        if is_file_like:
+            msg = "Uploading file-like object {!r} to {!r}"
+            debug(msg.format(local, remote))
+            pointer = local.tell()
+            try:
+                local.seek(0)
+                self.sftp.putfo(fl=local, remotepath=remote)
+            finally:
+                local.seek(pointer)
+        else:
+            debug("Uploading {!r} to {!r}".format(local, remote))
+            self.sftp.put(localpath=local, remotepath=remote)
+            # Set mode to same as local end
+            # TODO: Push this down into SFTPClient sometime (requires backwards
+            # incompat release.)
+            if preserve_mode:
+                local_mode = os.stat(local).st_mode
+                mode = stat.S_IMODE(local_mode)
+                self.sftp.chmod(remote, mode)
+        # Return something useful
+        return Result(
+            orig_remote=orig_remote,
+            remote=remote,
+            orig_local=orig_local,
+            local=local,
+            connection=self.connection,
+        )


 class Result:
@@ -154,9 +339,26 @@ class Result:
     .. versionadded:: 2.0
     """

+    # TODO: how does this differ from put vs get? field stating which? (feels
+    # meh) distinct classes differing, for now, solely by name? (also meh)
     def __init__(self, local, orig_local, remote, orig_remote, connection):
+        #: The local path the file was saved as (downloads) or read from
+        #: (uploads), or the file-like object used if one was given instead.
+        #:
+        #: If a string path, this value is massaged to be absolute; see
+        #: `.orig_local` for the original argument value.
         self.local = local
+        #: The original value given as the returning method's ``local``
+        #: argument.
         self.orig_local = orig_local
+        #: The remote path downloaded from or uploaded to. Massaged to be
+        #: absolute; see `.orig_remote` for the original argument value.
         self.remote = remote
+        #: The original argument value given as the returning method's
+        #: ``remote`` argument.
         self.orig_remote = orig_remote
+        #: The `.Connection` object this result was obtained from.
         self.connection = connection
+
+    # TODO: ensure str/repr makes it easily differentiable from run() or
+    # local() result objects (and vice versa).
diff --git a/fabric/tunnels.py b/fabric/tunnels.py
index c0a117ba..bec69b51 100644
--- a/fabric/tunnels.py
+++ b/fabric/tunnels.py
@@ -4,10 +4,12 @@ Tunnel and connection forwarding internals.
 If you're looking for simple, end-user-focused connection forwarding, please
 see `.Connection`, e.g. `.Connection.forward_local`.
 """
+
 import select
 import socket
 import time
 from threading import Event
+
 from invoke.exceptions import ThreadException
 from invoke.util import ExceptionHandlingThread

@@ -27,14 +29,82 @@ class TunnelManager(ExceptionHandlingThread):
     .. versionadded:: 2.0
     """

-    def __init__(self, local_host, local_port, remote_host, remote_port,
-        transport, finished):
+    def __init__(
+        self,
+        local_host,
+        local_port,
+        remote_host,
+        remote_port,
+        transport,
+        finished,
+    ):
         super().__init__()
-        self.local_address = local_host, local_port
-        self.remote_address = remote_host, remote_port
+        self.local_address = (local_host, local_port)
+        self.remote_address = (remote_host, remote_port)
         self.transport = transport
         self.finished = finished

+    def _run(self):
+        """
+        Accept local connections, spawning a `.Tunnel` thread for each one.
+
+        Polls a nonblocking listener socket bound to ``self.local_address``
+        until ``self.finished`` is set. Each accepted connection gets a
+        ``direct-tcpip`` channel to ``self.remote_address`` (opened via
+        ``self.transport``) and a dedicated `.Tunnel` worker thread.
+
+        Once ``self.finished`` is set, every tunnel is signalled to stop and
+        joined. The listener socket is closed on all exit paths, including
+        errors during setup, channel creation or tunnel teardown.
+
+        :raises ThreadException:
+            if any tunnel reports an exception via ``tunnel.exception()``.
+        """
+        # Track each tunnel that gets opened during our lifetime
+        tunnels = []
+
+        # Set up OS-level listener socket on forwarded port
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        # Everything from here on runs under try/finally so the listener is
+        # closed on every exit path, not just the clean-shutdown one.
+        try:
+            # TODO: why do we want REUSEADDR exactly? and is it portable?
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+            # NOTE: choosing to deal with nonblocking semantics and a fast
+            # loop, versus an older approach which blocks & expects outer
+            # scope to cause a socket exception by close()ing the socket.
+            sock.setblocking(0)
+            sock.bind(self.local_address)
+            sock.listen(1)
+
+            while not self.finished.is_set():
+                # Main loop-wait: accept connections on the local listener
+                # NOTE: EAGAIN means "you're nonblocking and nobody happened
+                # to connect at this point in time"
+                try:
+                    tun_sock, local_addr = sock.accept()
+                    # Set TCP_NODELAY to match OpenSSH's forwarding socket
+                    # behavior
+                    tun_sock.setsockopt(
+                        socket.IPPROTO_TCP, socket.TCP_NODELAY, 1
+                    )
+                except BlockingIOError:  # ie errno.EAGAIN
+                    # TODO: make configurable
+                    time.sleep(0.01)
+                    continue
+
+                # Set up direct-tcpip channel on server end
+                # TODO: refactor w/ what's used for gateways
+                try:
+                    channel = self.transport.open_channel(
+                        "direct-tcpip", self.remote_address, local_addr
+                    )
+                except Exception:
+                    # No Tunnel will ever own this socket, so close it here
+                    # rather than leaking it, then let the error propagate.
+                    tun_sock.close()
+                    raise
+
+                # Set up 'worker' thread for this specific connection to our
+                # tunnel, plus its dedicated signal event (which will appear
+                # as a public attr, no need to track both independently).
+                finished = Event()
+                tunnel = Tunnel(
+                    channel=channel, sock=tun_sock, finished=finished
+                )
+                tunnel.start()
+                tunnels.append(tunnel)
+
+            exceptions = []
+            # Propagate shutdown signal to all tunnels & wait for closure
+            # TODO: would be nice to have some output or at least logging
+            # here, especially for "sets up a handful of tunnels" use cases
+            # like forwarding nontrivial HTTP traffic.
+            for tunnel in tunnels:
+                tunnel.finished.set()
+                tunnel.join()
+                wrapper = tunnel.exception()
+                if wrapper:
+                    exceptions.append(wrapper)
+            # Handle exceptions
+            if exceptions:
+                raise ThreadException(exceptions)
+        finally:
+            # All we have left to close is our own sock.
+            sock.close()
+

 class Tunnel(ExceptionHandlingThread):
     """
@@ -51,6 +121,25 @@ class Tunnel(ExceptionHandlingThread):
         self.channel_chunk_size = 1024
         super().__init__()

+    def _run(self):
+        """
+        Relay data between ``self.sock`` and ``self.channel``.
+
+        Loops until ``self.finished`` is set or either side produces an empty
+        read, waiting up to one second per iteration for readable data. Both
+        the channel and the socket are closed on the way out, whether the
+        loop ended normally or via an exception.
+        """
+        # Each flag becomes truthy once `read_and_write` reports that the
+        # matching endpoint returned no data.
+        sock_drained = chan_drained = None
+        try:
+            while not (
+                sock_drained or chan_drained or self.finished.is_set()
+            ):
+                endpoints = [self.sock, self.channel]
+                readable = select.select(endpoints, [], [], 1)[0]
+                if self.sock in readable:
+                    sock_drained = self.read_and_write(
+                        self.sock, self.channel, self.socket_chunk_size
+                    )
+                if self.channel in readable:
+                    chan_drained = self.read_and_write(
+                        self.channel, self.sock, self.channel_chunk_size
+                    )
+        finally:
+            self.channel.close()
+            self.sock.close()
+
     def read_and_write(self, reader, writer, chunk_size):
         """
         Read ``chunk_size`` from ``reader``, writing result to ``writer``.
@@ -59,4 +148,7 @@ class Tunnel(ExceptionHandlingThread):

         .. versionadded:: 2.0
         """
-        pass
+        data = reader.recv(chunk_size)
+        if len(data) == 0:
+            return True
+        writer.sendall(data)
diff --git a/fabric/util.py b/fabric/util.py
index f39ab255..c47c422b 100644
--- a/fabric/util.py
+++ b/fabric/util.py
@@ -1,9 +1,16 @@
 import logging
 import sys
-log = logging.getLogger('fabric')
-for x in ('debug',):
+
+
+# Ape the half-assed logging junk from Invoke, but ensuring the logger reflects
+# our name, not theirs. (Assume most contexts will rely on Invoke itself to
+# literally enable/disable logging, for now.)
+log = logging.getLogger("fabric")
+for x in ("debug",):
     globals()[x] = getattr(log, x)
-win32 = sys.platform == 'win32'
+
+
+win32 = sys.platform == "win32"


 def get_local_user():
@@ -12,4 +19,27 @@ def get_local_user():

     .. versionadded:: 2.0
     """
-    pass
+    # NOTE: ``getpass`` itself imports fine everywhere; the ImportError
+    # handled below comes from ``getpass.getuser()`` lazily importing the
+    # Unix-only ``pwd`` module when no username environment variable is set.
+    import getpass
+
+    username = None
+    # All Unix and most Windows systems support the getpass module.
+    try:
+        username = getpass.getuser()
+    # KeyError: some SaaS platforms have no password database entry, implying
+    # there is no real user involved. They get the default value of None.
+    # ImportError: no ``pwd`` module, i.e. Windows without username env vars.
+    # OSError: Python 3.13+ wraps either of the above failures in OSError,
+    # keeping the original exception as ``__cause__``.
+    except (KeyError, ImportError, OSError) as exc:
+        reason = exc.__cause__ if isinstance(exc, OSError) else exc
+        # Windows systems where getpass came up empty should have the
+        # `win32` module instead.
+        if isinstance(reason, ImportError) and win32:  # pragma: nocover
+            import win32api
+            import win32security  # noqa
+            import win32profile  # noqa
+
+            username = win32api.GetUserName()
+    return username