274 lines
9.1 KiB
Python
274 lines
9.1 KiB
Python
"""shadow multihost role."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import shlex
|
|
from typing import Dict, Tuple
|
|
|
|
from pytest_mh.conn import ProcessLogLevel, ProcessResult
|
|
|
|
from ..hosts.shadow import ShadowHost
|
|
from ..misc.errors import ExpectScriptError
|
|
from .base import BaseLinuxRole
|
|
|
|
# Public API of this module: only the role class is exported.
__all__ = ["Shadow"]
|
|
|
|
# Interpolated into the ``set timeout`` line of the expect script that drives
# ``newgrp``; Expect interprets that value as a number of seconds.
DEFAULT_INTERACTIVE_TIMEOUT: int = 60
"""Default timeout for interactive sessions."""
|
|
|
|
|
|
class Shadow(BaseLinuxRole[ShadowHost]):
    """
    shadow role.

    Provides unified Python API for managing and testing shadow.

    Each command wrapper takes the tool's command line as positional string
    arguments; when several are given they are joined with single spaces
    before being passed to the shell.
    """

    # Paths handed to ``host.discard_file`` after each family of commands.
    # The order matches the order in which the files are discarded.
    _ACCOUNT_FILES: Tuple[str, ...] = ("/etc/passwd", "/etc/shadow", "/etc/group", "/etc/gshadow")
    _GROUP_FILES: Tuple[str, ...] = ("/etc/group", "/etc/gshadow")
    _PASSWORD_FILES: Tuple[str, ...] = ("/etc/passwd", "/etc/shadow")

    def __init__(self, *args, **kwargs) -> None:
        """
        Set up the environment.
        """
        super().__init__(*args, **kwargs)

    def teardown(self) -> None:
        """
        Detect file mismatches before cleaning up the environment.
        """
        self.host.detect_file_mismatches()

        # Clean up the environment.
        super().teardown()

    def _parse_args(self, *args) -> Dict[str, str]:
        """
        Extract the target name from a command line.

        The command line is split using shell rules and its last token is
        returned as the name. The wrappers in this class use that name for
        their log messages.

        :param args: Either the caller's own ``*args`` tuple passed as a single
            argument (how the methods of this class call it), or the
            command-line strings themselves.
        :return: Dictionary with a single ``name`` key.
        :raises ValueError: If the command line contains no token (``shlex``
            itself also raises ``ValueError`` on unbalanced quotes).
        """
        # Accept both calling conventions: _parse_args(args) and _parse_args(*args).
        parts = args[0] if args and not isinstance(args[0], str) else args
        tokens = shlex.split(" ".join(parts))
        if not tokens:
            raise ValueError("Command arguments are required to determine the target name")

        return {"name": tokens[-1]}

    def _discard_files(self, paths: Tuple[str, ...]) -> None:
        """
        Call ``host.discard_file`` for every path, in the given order.

        NOTE(review): presumably this keeps the teardown mismatch detection
        from flagging files the command legitimately changed; confirm against
        ``ShadowHost``.

        :param paths: Absolute paths of the files to discard.
        """
        for path in paths:
            self.host.discard_file(path)

    def _run_tool(self, tool: str, action: str, args: Tuple[str, ...], files: Tuple[str, ...]) -> ProcessResult:
        """
        Log, run a shadow tool on the host and discard the files it touches.

        :param tool: Name of the executable, e.g. ``useradd``.
        :param action: Leading part of the log message, e.g. ``Creating user``.
        :param args: Command-line strings for the tool; joined with spaces.
        :param files: Paths to discard once the command has finished.
        :return: Result of the executed command.
        :raises ValueError: If ``args`` contains no token.
        """
        name = self._parse_args(args)["name"]
        self.logger.info(f'{action} "{name}" on {self.host.hostname}')
        cmd = self.host.conn.run(f"{tool} {' '.join(args)}", log_level=ProcessLogLevel.Error)

        self._discard_files(files)

        return cmd

    def useradd(self, *args) -> ProcessResult:
        """
        Create user.

        :param args: Command line for ``useradd``; the last token is logged as
            the user name.
        :return: Result of the executed command.
        """
        return self._run_tool("useradd", "Creating user", args, self._ACCOUNT_FILES)

    def usermod(self, *args) -> ProcessResult:
        """
        Modify user.

        :param args: Command line for ``usermod``; the last token is logged as
            the user name.
        :return: Result of the executed command.
        """
        return self._run_tool("usermod", "Modifying user", args, self._ACCOUNT_FILES)

    def userdel(self, *args) -> ProcessResult:
        """
        Delete user.

        :param args: Command line for ``userdel``; the last token is logged as
            the user name.
        :return: Result of the executed command.
        """
        return self._run_tool("userdel", "Deleting user", args, self._ACCOUNT_FILES)

    def groupadd(self, *args) -> ProcessResult:
        """
        Create group.

        :param args: Command line for ``groupadd``; the last token is logged as
            the group name.
        :return: Result of the executed command.
        """
        return self._run_tool("groupadd", "Creating group", args, self._GROUP_FILES)

    def groupmod(self, *args) -> ProcessResult:
        """
        Modify group.

        :param args: Command line for ``groupmod``; the last token is logged as
            the group name.
        :return: Result of the executed command.
        """
        return self._run_tool("groupmod", "Modifying group", args, self._GROUP_FILES)

    def groupdel(self, *args) -> ProcessResult:
        """
        Delete group.

        :param args: Command line for ``groupdel``; the last token is logged as
            the group name.
        :return: Result of the executed command.
        """
        return self._run_tool("groupdel", "Deleting group", args, self._GROUP_FILES)

    def chage(self, *args) -> ProcessResult:
        """
        Change user password expiry information.

        :param args: Command line for ``chage``; the last token is logged as
            the user name.
        :return: Result of the executed command.
        """
        return self._run_tool(
            "chage",
            "Changing user password expiry information on user",
            args,
            self._PASSWORD_FILES,
        )

    def newusers(self, *args, users_data: str | None = None) -> ProcessResult:
        """
        Update or create new users in batch.

        Updates or creates multiple users by reading account information in passwd
        format. If `users_data` is provided, it's passed via stdin and takes
        precedence; otherwise, the command reads from a file specified in `args`.

        :param args: Command line for ``newusers``. Optional when `users_data`
            is given; otherwise its last token is logged as the input file.
        :param users_data: Account lines piped to ``newusers`` on stdin. An
            empty string is treated the same as ``None``.
        :return: Result of the executed command.
        :raises ValueError: If neither `users_data` nor any argument is given.
        """
        cmd_args = " ".join(args)

        if users_data:
            self.logger.info(f"Creating users from stdin on {self.host.hostname}")
            # Quote the payload so that data containing a single quote cannot
            # terminate the shell string early.
            cmd = self.host.conn.run(
                f"echo {shlex.quote(users_data)} | newusers {cmd_args}", log_level=ProcessLogLevel.Error
            )
        else:
            args_dict = self._parse_args(args)
            self.logger.info(f'Creating users from "{args_dict["name"]}" on {self.host.hostname}')
            cmd = self.host.conn.run("newusers " + cmd_args, log_level=ProcessLogLevel.Error)

        self._discard_files(self._ACCOUNT_FILES)

        return cmd

    def groupmems(self, *args, run_as: str = "root", password: str | None = None) -> ProcessResult:
        """
        Administer members of a user's primary group.

        The groupmems command allows management of group membership lists.
        If `run_as` is provided, then the `password` must be provided and the command groupmems run under this user.
        If `run_as` isn't provided, then the command is run under `root`.

        :param args: Command line for ``groupmems``; the last token is logged
            as the group name.
        :param run_as: User under which the command is executed.
        :param password: Text piped to the command's stdin when `run_as` is
            not ``root``.
        :return: Result of the executed command.
        :raises ValueError: If `args` contains no token, or if `run_as` is not
            ``root`` and no `password` is given.
        """
        args_dict = self._parse_args(args)
        command = "groupmems " + " ".join(args)

        if run_as == "root":
            self.logger.info(f'Administer {args_dict["name"]} group membership as root on {self.host.hostname}')
            cmd = self.host.conn.run(command, log_level=ProcessLogLevel.Error)
        else:
            if password is None:
                # Without this check the literal text "None" would be piped in.
                raise ValueError(f'Password is required to run groupmems as "{run_as}"')

            self.logger.info(f'Administer {args_dict["name"]} group membership on {self.host.hostname}')
            # shlex.quote keeps passwords and arguments that contain quotes or
            # other shell metacharacters intact.
            cmd = self.host.conn.run(
                f"echo {shlex.quote(password)} | su - {shlex.quote(run_as)} -c {shlex.quote(command)}",
                log_level=ProcessLogLevel.Error,
            )

        self._discard_files(self._GROUP_FILES)

        return cmd

    def newgrp(self, *args, run_as: str = "root") -> Tuple[ProcessResult, int]:
        """
        Log in to a new group.

        The newgrp command is used to change the current group ID during a login session.
        Returns the process result and the group ID after the change.

        :param args: Command line for ``newgrp``; the last token is logged as
            the group name.
        :param run_as: User whose session runs ``newgrp``. For any user other
            than ``root`` the script first spawns ``su - <run_as>``.
        :return: Tuple of the expect result and the group ID reported by
            ``id -g`` inside the new session.
        :raises ExpectScriptError: If the expect script exits with a code
            greater than 200 (201 on timeout, 202 on unexpected end of file).
        :raises ValueError: If `args` contains no token, or if the script
            output contains no ``newgrp_gid:`` line.
        """
        args_dict = self._parse_args(args)
        newgrp_args = " ".join(args)

        self.logger.info(f'Changing {run_as} to group "{args_dict["name"]}" on {self.host.hostname}')

        # Use expect to handle the interactive newgrp session
        result = self.host.conn.expect(
            rf"""
            set timeout {DEFAULT_INTERACTIVE_TIMEOUT}
            set prompt "\[#\$>\] $"

            if {{ "{run_as}" eq "root" }} {{
                spawn newgrp {newgrp_args}
            }} else {{
                spawn su - {run_as}
                expect {{
                    -re $prompt {{send "newgrp {newgrp_args}\n"}}
                    timeout {{puts "expect result: Timeout waiting for su prompt"; exit 201}}
                    eof {{puts "expect result: Unexpected end of file"; exit 202}}
                }}
            }}

            expect {{
                -re $prompt {{send "id -g\n"}}
                timeout {{puts "expect result: Timeout waiting for newgrp prompt"; exit 201}}
                eof {{puts "expect result: Unexpected end of file"; exit 202}}
            }}

            expect {{
                -re "(\[0-9\]+)" {{
                    set gid $expect_out(1,string)
                    send "exit\n"
                    puts "newgrp_gid:$gid"
                }}
                timeout {{puts "expect result: Timeout waiting for id output"; exit 201}}
                eof {{puts "expect result: Unexpected end of file"; exit 202}}
            }}

            if {{ "{run_as}" ne "root" }} {{
                expect {{
                    -re $prompt {{
                        send "exit\n"
                    }}
                    timeout {{puts "expect result: Timeout waiting for original su prompt"; exit 201}}
                }}
            }}

            expect {{
                eof {{exit 0}}
                timeout {{exit 201}}
            }}
        """,
            verbose=False,
        )

        # Exit codes above 200 are the script's own failure signals.
        if result.rc > 200:
            raise ExpectScriptError(result.rc)

        # The script reports the group ID on a line tagged "newgrp_gid:".
        gid_line = next((line for line in result.stdout_lines if "newgrp_gid:" in line), None)
        if gid_line is None:
            raise ValueError("Current GID is required for newgrp")

        current_gid = int(gid_line.split(":")[1])

        return result, current_gid
|