Files
herolib/lib/osal/sshagent/security.v
2025-10-12 12:30:19 +03:00

242 lines
6.6 KiB
V

module sshagent
import os
import incubaid.herolib.core.texttools
// Security validation functions for SSH agent operations
// validate_key_name ensures SSH key names are safe and follow conventions
pub fn validate_key_name(name string) !string {
if name.len == 0 {
return error('SSH key name cannot be empty')
}
if name.len > 255 {
return error('SSH key name too long (max 255 characters)')
}
// Check for dangerous characters
dangerous_chars := ['/', '\\', '..', '~', '$', '`', ';', '|', '&', '>', '<', '*', '?', '[',
']', '{', '}', '(', ')', '"', "'", ' ']
for dangerous_char in dangerous_chars {
if name.contains(dangerous_char) {
return error('SSH key name contains invalid character: ${dangerous_char}')
}
}
// Ensure it starts with alphanumeric
if !name[0].is_alnum() {
return error('SSH key name must start with alphanumeric character')
}
return texttools.name_fix(name)
}
// validate_private_key checks if the provided string is a valid SSH private key
pub fn validate_private_key(privkey string) !string {
if privkey.len == 0 {
return error('Private key cannot be empty')
}
// Check for valid private key headers
valid_headers := [
'-----BEGIN OPENSSH PRIVATE KEY-----',
'-----BEGIN RSA PRIVATE KEY-----',
'-----BEGIN DSA PRIVATE KEY-----',
'-----BEGIN EC PRIVATE KEY-----',
'-----BEGIN PRIVATE KEY-----',
]
mut has_valid_header := false
for header in valid_headers {
if privkey.contains(header) {
has_valid_header = true
break
}
}
if !has_valid_header {
return error('Invalid private key format - missing valid header')
}
// Check for corresponding footer
valid_footers := [
'-----END OPENSSH PRIVATE KEY-----',
'-----END RSA PRIVATE KEY-----',
'-----END DSA PRIVATE KEY-----',
'-----END EC PRIVATE KEY-----',
'-----END PRIVATE KEY-----',
]
mut has_valid_footer := false
for footer in valid_footers {
if privkey.contains(footer) {
has_valid_footer = true
break
}
}
if !has_valid_footer {
return error('Invalid private key format - missing valid footer')
}
// Basic length check (private keys should be substantial)
if privkey.len < 200 {
return error('Private key appears to be too short')
}
return privkey
}
// validate_file_path ensures file paths are safe and within expected directories
pub fn validate_file_path(path string, base_dir string) !string {
if path.len == 0 {
return error('File path cannot be empty')
}
// Resolve absolute path
abs_path := os.abs_path(path)
abs_base := os.abs_path(base_dir)
// Ensure path is within base directory (prevent directory traversal)
if !abs_path.starts_with(abs_base) {
return error('File path outside of allowed directory: ${path}')
}
// Check for dangerous path components
dangerous_components := ['..', './', '~/', '$']
for component in dangerous_components {
if path.contains(component) {
return error('File path contains dangerous component: ${component}')
}
}
return abs_path
}
// secure_file_permissions sets appropriate permissions for SSH key files
pub fn secure_file_permissions(file_path string, is_private bool) ! {
if !os.exists(file_path) {
return error('File does not exist: ${file_path}')
}
if is_private {
// Private keys should be readable/writable only by owner
os.chmod(file_path, 0o600)!
} else {
// Public keys can be readable by others
os.chmod(file_path, 0o644)!
}
}
// get_secure_socket_path returns a secure socket path for the given user
pub fn get_secure_socket_path(user string) !string {
if user.len == 0 {
return error('User cannot be empty')
}
// Validate user name
validated_user := validate_key_name(user)!
// Use more secure temporary directory if available
mut temp_dir := '/tmp'
// Check for user-specific temp directory
user_temp := os.getenv('XDG_RUNTIME_DIR')
if user_temp.len > 0 && os.exists(user_temp) {
temp_dir = user_temp
}
socket_path := '${temp_dir}/ssh-agent-${validated_user}.sock'
// Ensure parent directory exists and has correct permissions
parent_dir := os.dir(socket_path)
if !os.exists(parent_dir) {
os.mkdir_all(parent_dir)!
os.chmod(parent_dir, 0o700)! // Only owner can access
}
return socket_path
}
// sanitize_environment_variables cleans SSH-related environment variables
pub fn sanitize_environment_variables() {
// List of SSH-related environment variables that might need cleaning
ssh_env_vars := ['SSH_AUTH_SOCK', 'SSH_AGENT_PID', 'SSH_CLIENT', 'SSH_CONNECTION']
for var in ssh_env_vars {
env_val := os.getenv(var)
if env_val.len > 0 {
// Basic validation of environment variable values
if env_val.contains('..') || env_val.contains(';') || env_val.contains('|') {
// Unset potentially dangerous environment variables
os.unsetenv(var)
}
}
}
}
// validate_passphrase checks passphrase strength (basic validation)
pub fn validate_passphrase(passphrase string) !string {
// Allow empty passphrase (user choice)
if passphrase.len == 0 {
return passphrase
}
// Basic length check
if passphrase.len < 8 {
return error('Passphrase should be at least 8 characters long')
}
// Check for common weak passphrases
weak_passphrases := ['password', '12345678', 'qwerty', 'admin', 'root', 'test']
for weak in weak_passphrases {
if passphrase.to_lower() == weak {
return error('Passphrase is too weak - avoid common passwords')
}
}
return passphrase
}
// check_system_security performs basic system security checks
pub fn check_system_security() !map[string]string {
mut security_status := map[string]string{}
// Check if running as root (generally not recommended)
if os.getuid() == 0 {
security_status['root_user'] = 'WARNING: Running as root user'
} else {
security_status['root_user'] = 'OK: Not running as root'
}
// Check SSH directory permissions
ssh_dir := '${os.home_dir()}/.ssh'
if os.exists(ssh_dir) {
// Get directory permissions (simplified check)
if os.is_readable(ssh_dir) && os.is_writable(ssh_dir) {
security_status['ssh_dir_permissions'] = 'OK: SSH directory accessible'
} else {
security_status['ssh_dir_permissions'] = 'WARNING: SSH directory permission issues'
}
} else {
security_status['ssh_dir_permissions'] = 'INFO: SSH directory does not exist'
}
// Check for SSH agent processes
user := os.getenv('USER')
res := os.execute('pgrep -u ${user} ssh-agent | wc -l')
if res.exit_code == 0 {
agent_count := res.output.trim_space().int()
if agent_count == 0 {
security_status['ssh_agents'] = 'INFO: No SSH agents running'
} else if agent_count == 1 {
security_status['ssh_agents'] = 'OK: One SSH agent running'
} else {
security_status['ssh_agents'] = 'WARNING: Multiple SSH agents running (${agent_count})'
}
}
return security_status
}