Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1!10.16.0
1!10.16.1
55 changes: 24 additions & 31 deletions tests/unit_tests/test_cloud.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Tests related to pycloudlib.cloud module."""

from io import StringIO
import logging
from io import StringIO
from textwrap import dedent
from typing import List, Optional
from typing import List

import mock
import pytest

from pycloudlib.cloud import BaseCloud
Expand Down Expand Up @@ -67,32 +66,26 @@ def test_base_cloud_is_abstract(self):
("a", False, "a"),
),
)
@mock.patch(MPATH + "get_timestamped_tag", return_value="a-timestamp")
@mock.patch(MPATH + "getpass.getuser", return_value="crashoverride")
def test_init_sets_timestamped_tag(
self,
_m_getuser,
_m_get_timestamped_tag,
mocker,
tag,
timestamp_suffix,
expected_tag,
):
"""The timestamp_suffix param of true adds a tag timestamp suffix."""
mocker.patch(MPATH + "getpass.getuser", return_value="crashoverride")
mocker.patch(MPATH + "get_timestamped_tag", return_value="a-timestamp")
args = {"tag": tag}
if timestamp_suffix in (True, False):
args["timestamp_suffix"] = timestamp_suffix
mycloud = CloudSubclass(config_file=StringIO(CONFIG), **args)
assert expected_tag == mycloud.tag

@pytest.mark.dont_mock_ssh_keys
@mock.patch(MPATH + "getpass.getuser", return_value="root")
@mock.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
@mock.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
def test_init_sets_key_pair_based_on_getuser(
self,
_m_getuser,
_m_expanduser,
_m_exists,
mocker,
):
"""
The default key_pair for the cloud is based on the current user.
Expand All @@ -101,28 +94,28 @@ def test_init_sets_key_pair_based_on_getuser(
well known $HOME. Also its $HOME is not under /home, so this
verifies that we're not hardcoding /home/<user> paths.
"""
mocker.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
mocker.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
mocker.patch(MPATH + "getpass.getuser", return_value="root")
mycloud = CloudSubclass(tag="tag", timestamp_suffix=False, config_file=StringIO(CONFIG))
assert mycloud.key_pair.name == "root"
assert mycloud.key_pair.private_key_path == ("/root/.ssh/id_rsa")
assert mycloud.key_pair.public_key_path == ("/root/.ssh/id_rsa.pub")

@pytest.mark.dont_mock_ssh_keys
@mock.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
@mock.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_ed25519" in x)
@mock.patch(MPATH + "getpass.getuser", return_value="root")
def test_init_can_use_id_ed25519_key(
self,
_m_getuser,
_m_expanduser,
_m_exists,
mocker,
):
"""
Validates that key_pair uses the id_ed25519 key if id_rsa doesn't exist
"""Validates that key_pair uses the id_ed25519 key if id_rsa doesn't exist.

To do this, we mock the os.path.exists function to return True ONLY
for the id_ed25519 key to simulate the absence of the id_rsa key.

"""
mocker.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_ed25519" in x)
mocker.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
mocker.patch(MPATH + "getpass.getuser", return_value="root")
mycloud = CloudSubclass(
tag="tag",
timestamp_suffix=False,
Expand All @@ -136,10 +129,10 @@ def test_init_can_use_id_ed25519_key(
assert mycloud.key_pair.private_key_path == id_ed25519_path.replace(".pub", "")

@pytest.mark.dont_mock_ssh_keys
@mock.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
@mock.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
def test_init_sets_key_pair_from_config(self, _m_expanduser, _m_exists):
def test_init_sets_key_pair_from_config(self, mocker):
"""The key_pair is set from the config file."""
mocker.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
mocker.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
mycloud = CloudSubclass(
tag="tag",
timestamp_suffix=False,
Expand All @@ -160,10 +153,10 @@ def test_init_sets_key_pair_from_config(self, _m_expanduser, _m_exists):
assert mycloud.key_pair.private_key_path == "/home/asdf/.ssh/my_key"

@pytest.mark.dont_mock_ssh_keys
@mock.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
@mock.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
def test_missing_private_key_in_ssh_config(self, _m_expanduser, _m_exists):
def test_missing_private_key_in_ssh_config(self, mocker):
"""The key_pair assumes the private key name."""
mocker.patch("os.path.exists", side_effect=lambda x: "/.ssh/id_rsa" in x)
mocker.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
mycloud = CloudSubclass(
tag="tag",
timestamp_suffix=False,
Expand All @@ -183,12 +176,9 @@ def test_missing_private_key_in_ssh_config(self, _m_expanduser, _m_exists):
assert mycloud.key_pair.private_key_path == "/home/asdf/.ssh/id_rsa"

@pytest.mark.dont_mock_ssh_keys
@mock.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
@mock.patch("os.path.exists", return_value=False)
def test_init_raises_error_when_no_ssh_keys_found(
self,
_m_expanduser,
_m_exists,
mocker,
caplog,
):
"""
Expand All @@ -197,6 +187,8 @@ def test_init_raises_error_when_no_ssh_keys_found(
This test verifies that an error is raised when no SSH keys can be found in the default
locations and no public key path is provided in the config file.
"""
mocker.patch("os.path.exists", return_value=False)
mocker.patch("os.path.expanduser", side_effect=lambda x: x.replace("~", "/root"))
# set log level to Warning to ensure warning gets logged
caplog.set_level(logging.WARNING)
with pytest.raises(UnsetSSHKeyError) as exc_info:
Expand Down Expand Up @@ -228,6 +220,7 @@ def test_init_raises_error_when_no_ssh_keys_found(
],
)
def test_validate_tag(self, tag: str, rules_failed: List[str]):
"""Test that tag validation works correctly."""
if len(rules_failed) == 0:
# test that no exception is raised
BaseCloud._validate_tag(tag)
Expand Down