# This file is part of monday-client.
#
# Copyright (C) 2024 Leet Cyber Security <https://leetcybersecurity.com/>
#
# monday-client is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# monday-client is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with monday-client. If not, see <https://www.gnu.org/licenses/>.
"""
Configuration system for monday-client.
This module provides configuration classes and providers for managing
monday-client settings from various sources.
"""
import json
import os
import warnings
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from monday.protocols import ConfigProtocol
[docs]
@dataclass
class Config:
"""
Configuration for MondayClient.
This class centralizes all configuration options for the MondayClient,
focusing on settings that are actually used.
"""
api_key: str
"""The API key for authenticating with the monday.com API."""
url: str = 'https://api.monday.com/v2'
"""The endpoint URL for the monday.com API."""
version: str | None = None
"""The API version to use. If None, will automatically fetch current version."""
headers: dict[str, Any] = field(default_factory=dict)
"""Additional HTTP headers for API requests."""
max_retries: int = 4
"""Maximum number of retry attempts."""
timeout: int = 30
"""Request timeout in seconds."""
rate_limit_seconds: int = 60
"""Rate limit window in seconds."""
proxy_url: str | None = None
"""Proxy URL (e.g., 'http://proxy.example.com:8080' or 'socks5://proxy.example.com:1080')."""
proxy_auth: tuple[str, str] | None = None
"""Proxy authentication as (username, password) tuple for basic authentication."""
proxy_auth_type: str = 'basic'
"""
Proxy authentication type.
Supported values:
- 'basic': Supported by both transports
- 'ntlm': Supported by httpx transport (requires httpx-ntlm)
- 'kerberos' or 'spnego': Supported by httpx transport (requires pyspnego)
- 'none': No proxy authentication
"""
proxy_trust_env: bool = False
"""Whether to trust proxy settings from environment variables (HTTP_PROXY, HTTPS_PROXY, etc.)."""
proxy_headers: dict[str, str] = field(default_factory=dict)
"""Additional headers to send to the proxy server."""
proxy_ssl_verify: bool = True
"""Whether to verify SSL certificates when connecting through HTTPS proxies."""
[docs]
def validate(self) -> None:
"""Validate all configuration values."""
if not self.api_key:
error_msg = 'api_key is required'
raise ValueError(error_msg)
if self.max_retries < 0:
error_msg = 'max_retries must be non-negative'
raise ValueError(error_msg)
if self.timeout <= 0:
error_msg = 'timeout must be positive'
raise ValueError(error_msg)
if self.rate_limit_seconds <= 0:
error_msg = 'rate_limit_seconds must be positive'
raise ValueError(error_msg)
# Validate proxy authentication type
valid_proxy_auth_types = {
'basic',
'ntlm', # httpx transport only
'kerberos', # httpx transport only
'spnego', # httpx transport only
'none',
}
if self.proxy_auth_type not in valid_proxy_auth_types:
error_msg = f'proxy_auth_type must be one of {valid_proxy_auth_types}'
raise ValueError(error_msg)
# Validate proxy authentication consistency
if (
self.proxy_auth_type != 'none'
and self.proxy_auth is None
and self.proxy_url
):
# Only warn if proxy_url is set but no auth provided for non-none auth types
warnings.warn(
f'proxy_auth_type is set to "{self.proxy_auth_type}" but proxy_auth is None. '
'Consider setting proxy_auth_type to "none" if authentication is not required.',
UserWarning,
stacklevel=2,
)
[docs]
@classmethod
def from_env(cls, prefix: str = 'MONDAY_') -> 'Config':
"""Create configuration from environment variables."""
# Handle proxy auth from environment
proxy_auth = None
proxy_user = os.environ.get(f'{prefix}PROXY_USER')
proxy_pass = os.environ.get(f'{prefix}PROXY_PASS')
if proxy_user and proxy_pass:
proxy_auth = (proxy_user, proxy_pass)
# Handle proxy headers from environment (JSON format)
proxy_headers = {}
proxy_headers_env = os.environ.get(f'{prefix}PROXY_HEADERS')
if proxy_headers_env:
try:
proxy_headers = json.loads(proxy_headers_env)
except json.JSONDecodeError:
warnings.warn(
f'Invalid JSON in {prefix}PROXY_HEADERS environment variable. Using empty headers.',
UserWarning,
stacklevel=2,
)
return cls(
api_key=os.environ[f'{prefix}API_KEY'],
url=os.environ.get(f'{prefix}URL', 'https://api.monday.com/v2'),
version=os.environ.get(f'{prefix}VERSION'),
max_retries=int(os.environ.get(f'{prefix}MAX_RETRIES', '4')),
timeout=int(os.environ.get(f'{prefix}TIMEOUT', '30')),
rate_limit_seconds=int(os.environ.get(f'{prefix}RATE_LIMIT_SECONDS', '60')),
proxy_url=os.environ.get(f'{prefix}PROXY_URL'),
proxy_auth=proxy_auth,
proxy_auth_type=os.environ.get(f'{prefix}PROXY_AUTH_TYPE', 'basic'),
proxy_trust_env=os.environ.get(f'{prefix}PROXY_TRUST_ENV', 'false').lower()
== 'true',
proxy_headers=proxy_headers,
proxy_ssl_verify=os.environ.get(f'{prefix}PROXY_SSL_VERIFY', 'true').lower()
== 'true',
)
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> 'Config':
"""Create configuration from dictionary."""
# Handle proxy auth from dict
proxy_auth = None
if 'proxy_auth' in data:
proxy_auth_data = data['proxy_auth']
if isinstance(proxy_auth_data, (list, tuple)) and len(proxy_auth_data) == 2: # noqa: PLR2004
proxy_auth = tuple(proxy_auth_data)
return cls(
api_key=data['api_key'],
url=data.get('url', 'https://api.monday.com/v2'),
version=data.get('version'),
headers=data.get('headers', {}),
max_retries=data.get('max_retries', 4),
timeout=data.get('timeout', 30),
rate_limit_seconds=data.get('rate_limit_seconds', 60),
proxy_url=data.get('proxy_url'),
proxy_auth=proxy_auth,
proxy_auth_type=data.get('proxy_auth_type', 'basic'),
proxy_trust_env=data.get('proxy_trust_env', False),
proxy_headers=data.get('proxy_headers', {}),
proxy_ssl_verify=data.get('proxy_ssl_verify', True),
)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert configuration to dictionary."""
return asdict(self)
[docs]
class EnvConfig(ConfigProtocol):
"""Load configuration from environment variables."""
def __init__(self, prefix: str = 'MONDAY_'):
self.prefix = prefix
self._config: Config | None = None
[docs]
def get_config(self) -> Config:
"""Get configuration from environment variables."""
if self._config is None:
self._config = Config.from_env(self.prefix)
return self._config
[docs]
def validate_config(self) -> bool:
"""Validate the configuration."""
try:
config = self.get_config()
config.validate()
except (ValueError, KeyError):
return False
else:
return True
[docs]
def reload_config(self) -> None:
"""Reload configuration from environment."""
self._config = None
[docs]
def get_env_value(self, key: str, default: Any = None) -> Any:
"""Get an environment variable value with optional prefix."""
# Try with prefix first, then without prefix
prefixed_key = f'{self.prefix}{key}'.upper()
value = os.environ.get(prefixed_key)
if value is not None:
return value
# Try without prefix as fallback
return os.environ.get(key.upper(), default)
[docs]
def get_value(self, key: str, default: Any = None) -> Any:
"""Get a value from environment variables (alias for get_env_value)."""
return self.get_env_value(key, default)
[docs]
class JsonConfig(ConfigProtocol):
"""Load configuration from JSON file."""
def __init__(self, config_path: str | Path):
self.config_path = Path(config_path)
self._config: Config | None = None
self._last_modified: float | None = None
self._raw_data: dict[str, Any] | None = None
[docs]
def get_config(self) -> Config:
"""Get configuration from JSON file."""
if self._should_reload():
self._load_config()
if self._config is None:
error_msg = 'Failed to load configuration'
raise ValueError(error_msg)
return self._config
[docs]
def validate_config(self) -> bool:
"""Validate the configuration."""
try:
if not self.config_path.exists():
return False
config = self.get_config()
config.validate()
except (ValueError, json.JSONDecodeError, KeyError):
return False
else:
return True
[docs]
def reload_config(self) -> None:
"""Force reload configuration from file."""
self._config = None
self._last_modified = None
self._raw_data = None
[docs]
def get_raw_data(self) -> dict[str, Any]:
"""Get raw JSON data for application-specific values."""
if self._should_reload():
self._load_config()
if self._raw_data is None:
error_msg = 'Failed to load JSON data'
raise ValueError(error_msg)
return self._raw_data
[docs]
def get_value(self, key: str, default: Any = None) -> Any:
"""Get a specific value from the raw JSON data."""
data = self.get_raw_data()
return data.get(key, default)
def _should_reload(self) -> bool:
"""Check if config should be reloaded."""
if self._config is None:
return True
if not self.config_path.exists():
return False
current_modified = self.config_path.stat().st_mtime
return current_modified != self._last_modified
def _load_config(self) -> None:
"""Load config from file."""
with self.config_path.open() as f:
data = json.load(f)
self._raw_data = data
self._config = Config.from_dict(data)
self._last_modified = self.config_path.stat().st_mtime
[docs]
class MultiSourceConfig(ConfigProtocol):
"""Load configuration from multiple sources with priority."""
def __init__(self, providers: list[ConfigProtocol]):
self.providers = providers
self._config: Config | None = None
[docs]
def get_config(self) -> Config:
"""Get merged configuration from all providers."""
if self._config is None:
self._merge_configs()
if self._config is None:
error_msg = 'Failed to load configuration from any provider'
raise ValueError(error_msg)
return self._config
[docs]
def validate_config(self) -> bool:
"""Validate the merged configuration."""
try:
config = self.get_config()
config.validate()
except ValueError:
return False
else:
return True
[docs]
def reload_config(self) -> None:
"""Reload all configurations."""
for provider in self.providers:
reload_method = getattr(provider, 'reload_config', None)
if callable(reload_method):
reload_method()
self._config = None
def _merge_configs(self) -> None:
"""Merge configurations from all providers."""
if not self.providers:
error_msg = 'No config providers specified'
raise ValueError(error_msg)
# Start with first provider
base_config = self.providers[0].get_config()
# Merge additional providers (later providers override earlier ones)
for provider in self.providers[1:]:
try:
overlay_config = provider.get_config()
base_config = self._merge_two_configs(base_config, overlay_config)
except Exception: # noqa: BLE001,S112
continue
self._config = base_config
def _merge_two_configs(self, base: Config, overlay: Config) -> Config:
"""Merge two configurations with overlay taking precedence."""
base_dict = base.to_dict()
overlay_dict = overlay.to_dict()
# Merge headers
if 'headers' in overlay_dict and 'headers' in base_dict:
merged_headers = {**base_dict['headers'], **overlay_dict['headers']}
overlay_dict['headers'] = merged_headers
# Merge dictionaries (overlay takes precedence)
merged_data = {**base_dict, **overlay_dict}
return Config.from_dict(merged_data)
try:
import yaml
[docs]
class YamlConfig(ConfigProtocol):
"""Load configuration from YAML file."""
def __init__(self, config_path: str | Path):
self.config_path = Path(config_path)
self._config: Config | None = None
self._raw_data: dict[str, Any] | None = None
[docs]
def get_config(self) -> Config:
"""Get configuration from YAML file."""
if self._config is None:
self._load_config()
if self._config is None:
error_msg = 'Failed to load YAML configuration'
raise ValueError(error_msg)
return self._config
[docs]
def validate_config(self) -> bool:
"""Validate the configuration."""
try:
if not self.config_path.exists():
return False
config = self.get_config()
config.validate()
except (ValueError, yaml.YAMLError, KeyError):
return False
else:
return True
[docs]
def reload_config(self) -> None:
"""Reload configuration from file."""
self._config = None
self._raw_data = None
[docs]
def get_raw_data(self) -> dict[str, Any]:
"""Get raw YAML data for application-specific values."""
if self._raw_data is None:
self._load_config()
if self._raw_data is None:
error_msg = 'Failed to load YAML data'
raise ValueError(error_msg)
return self._raw_data
[docs]
def get_value(self, key: str, default: Any = None) -> Any:
"""Get a specific value from the raw YAML data."""
data = self.get_raw_data()
return data.get(key, default)
def _load_config(self) -> None:
"""Load config from YAML file."""
with self.config_path.open() as f:
data = yaml.safe_load(f)
self._raw_data = data
self._config = Config.from_dict(data)
except ImportError:
pass