Source code for monday.http_adapters

"""
HTTP transport adapters for monday-client.

Provides a pluggable transport abstraction so we can support different HTTP stacks
like aiohttp or httpx without changing the public client API.
"""

from __future__ import annotations

import base64
import importlib.util as _import_util
import logging
from typing import Any
from urllib.parse import quote, urlparse

import aiohttp

logger = logging.getLogger(__name__)


class AiohttpAdapter:
    """
    Adapter that uses aiohttp for requests.

    HTTP/HTTPS proxies are configured per request through keyword arguments
    passed to ``session.post``. SOCKS proxies are configured through a
    connector from the optional ``aiohttp_socks`` package, which is imported
    lazily.
    """

    def __init__(  # noqa: PLR0913
        self,
        *,
        proxy_url: str | None,
        proxy_auth: tuple[str, str] | None,
        proxy_auth_type: str,
        proxy_trust_env: bool,
        proxy_ssl_verify: bool,
        timeout_seconds: int,
    ) -> None:
        """
        Store the proxy and timeout configuration.

        Args:
            proxy_url: Proxy URL, or ``None``/empty for no proxy.
            proxy_auth: ``(username, password)`` for the proxy, if any.
            proxy_auth_type: Requested auth type; only basic auth is applied here.
            proxy_trust_env: Forwarded to ``aiohttp.ClientSession(trust_env=...)``.
            proxy_ssl_verify: When ``False`` and the proxy scheme is ``https``,
                ``proxy_ssl=False`` is added to the request kwargs.
            timeout_seconds: Default total timeout for a request.

        """
        self._proxy_url = proxy_url
        self._proxy_auth = proxy_auth
        self._proxy_auth_type = proxy_auth_type
        self._proxy_trust_env = proxy_trust_env
        self._proxy_ssl_verify = proxy_ssl_verify
        self._timeout_seconds = timeout_seconds

    def _build_request_proxy_kwargs(self) -> dict[str, Any]:
        """
        Return the per-request proxy keyword arguments.

        Returns an empty dict when no proxy is configured or when the proxy
        is a SOCKS proxy (those are handled by ``_create_connector``).
        """
        if not self._proxy_url:
            return {}

        scheme = urlparse(self._proxy_url).scheme.lower()
        if scheme.startswith('socks'):
            return {}

        kwargs: dict[str, Any] = {'proxy': self._proxy_url}
        if self._proxy_auth:
            # Any non-basic auth type silently falls back to basic here;
            # the client already logs a warning.
            kwargs['proxy_auth'] = aiohttp.BasicAuth(
                self._proxy_auth[0], self._proxy_auth[1]
            )
        if scheme == 'https' and self._proxy_ssl_verify is False:
            # NOTE(review): confirm that the installed aiohttp accepts a
            # `proxy_ssl` request argument.
            kwargs['proxy_ssl'] = False
        return kwargs

    def _create_connector(self) -> aiohttp.BaseConnector | None:
        """
        Return a SOCKS connector for the configured proxy, or ``None``.

        ``None`` is returned when no proxy is configured, when the proxy is
        not a SOCKS proxy, or when ``aiohttp_socks`` cannot be imported. The
        last case is logged because the request then proceeds without the
        configured proxy.
        """
        if not self._proxy_url:
            return None

        scheme = urlparse(self._proxy_url).scheme.lower()
        if not scheme.startswith('socks'):
            return None

        try:
            from aiohttp_socks import ProxyConnector  # noqa: PLC0415
        except ImportError:
            logger.warning(
                'SOCKS proxy specified but aiohttp-socks is not installed; '
                'the request will be sent without the configured proxy. '
                'Install with: pip install aiohttp-socks'
            )
            return None

        username = self._proxy_auth[0] if self._proxy_auth else None
        password = self._proxy_auth[1] if self._proxy_auth else None
        # NOTE(review): confirm that ProxyConnector.from_url accepts explicit
        # username/password keywords alongside the URL.
        return ProxyConnector.from_url(
            self._proxy_url, username=username, password=password
        )

    async def post(
        self,
        *,
        url: str,
        json: dict[str, Any],
        headers: dict[str, str],
        timeout_seconds: int | None = None,
    ) -> tuple[dict[str, Any], dict[str, str]]:
        """
        Send a POST request with JSON and return the parsed JSON and headers.

        Args:
            url: Target URL.
            json: JSON-serializable request body.
            headers: Request headers.
            timeout_seconds: Total timeout override; a falsy value falls back
                to the adapter default.

        Returns:
            ``(data, response_headers)``. When aiohttp raises
            ``ContentTypeError`` while decoding, ``data`` is
            ``{'error': 'Non-JSON response: ...'}`` carrying the first 200
            characters of the response text.

        """
        timeout = aiohttp.ClientTimeout(total=timeout_seconds or self._timeout_seconds)
        connector = self._create_connector()
        async with (
            aiohttp.ClientSession(
                connector=connector,
                timeout=timeout,
                trust_env=self._proxy_trust_env,
            ) as session,
            session.post(
                url, json=json, headers=headers, **self._build_request_proxy_kwargs()
            ) as response,
        ):
            response_headers = dict(response.headers)
            try:
                data = await response.json()
            except aiohttp.ContentTypeError:
                text = await response.text()
                return {'error': f'Non-JSON response: {text[:200]}'}, response_headers
            return data, response_headers
class HttpxAdapter:
    """
    Adapter that uses httpx for requests.

    Notes:
        - Requires httpx to be installed by the user. Imported lazily.
        - Supports basic proxy auth by embedding the credentials in the
          proxy URL. NTLM and Kerberos/SPNEGO are enabled on a best-effort
          basis when ``httpx_ntlm`` / ``spnego`` can be imported.
        - ``proxy_ssl_verify`` is stored but not applied: no per-proxy TLS
          verification setting is passed to httpx by this adapter.

    """

    # Handle proxy 407 challenge for Kerberos/SPNEGO - single retry
    PROXY_AUTH_REQUIRED = 407

    def __init__(  # noqa: PLR0913
        self,
        *,
        proxy_url: str | None,
        proxy_auth: tuple[str, str] | None,
        proxy_auth_type: str,
        proxy_trust_env: bool,
        proxy_headers: dict[str, str] | None,
        proxy_ssl_verify: bool,
        timeout_seconds: int,
    ) -> None:
        """
        Store the proxy and timeout configuration.

        Args:
            proxy_url: Proxy URL, or ``None``/empty for no proxy.
            proxy_auth: ``(username, password)`` for the proxy, if any.
            proxy_auth_type: ``'ntlm'``, ``'kerberos'`` and ``'spnego'``
                (case-insensitive) trigger the optional auth integrations.
            proxy_trust_env: Forwarded to ``httpx.AsyncClient(trust_env=...)``.
            proxy_headers: Extra headers sent to the proxy.
            proxy_ssl_verify: Stored only; see the class notes.
            timeout_seconds: Default timeout for a request.

        """
        self._proxy_url = proxy_url
        self._proxy_auth = proxy_auth
        self._proxy_auth_type = proxy_auth_type
        self._proxy_trust_env = proxy_trust_env
        self._proxy_headers = proxy_headers or {}
        self._proxy_ssl_verify = proxy_ssl_verify
        self._timeout_seconds = timeout_seconds
        # Created lazily by _ensure_spnego_context().
        self._spnego_ctx: Any = None

    def _resolve_proxy_url(self) -> str | None:
        """
        Return the proxy URL to hand to httpx, with credentials embedded.

        Credentials from ``proxy_auth`` are only injected when the configured
        URL does not already carry a username or password. They are
        percent-encoded so characters such as ``@``, ``:`` or ``/`` cannot
        corrupt the URL, and IPv6 hosts are re-wrapped in brackets.
        """
        if not self._proxy_url:
            return None

        parsed = urlparse(self._proxy_url)
        already_has_credentials = bool(parsed.username or parsed.password)
        if not self._proxy_auth or already_has_credentials or not parsed.hostname:
            return self._proxy_url

        username, password = self._proxy_auth
        host = parsed.hostname
        if ':' in host:
            # urlparse strips the brackets from IPv6 literals.
            host = f'[{host}]'
        user_part = quote(username, safe='')
        password_part = quote(password, safe='')
        resolved = f'{parsed.scheme}://{user_part}:{password_part}@{host}'
        if parsed.port:
            resolved += f':{parsed.port}'
        return resolved

    def _uses_negotiate(self) -> bool:
        """Return True when a proxy is set and the auth type is Kerberos/SPNEGO."""
        return bool(
            self._proxy_url
            and self._proxy_auth_type
            and self._proxy_auth_type.lower() in {'kerberos', 'spnego'}
        )

    def _ensure_spnego_context(self) -> Any:
        """
        Return the SPNEGO client context, creating it on first use.

        Returns ``None`` when Kerberos/SPNEGO is not configured or when the
        context cannot be created (a warning is logged in that case).
        """
        ctx = getattr(self, '_spnego_ctx', None)
        if ctx is not None or not self._uses_negotiate():
            return ctx
        try:
            import spnego  # noqa: PLC0415

            ctx = spnego.client(protocol='negotiate')
        except Exception:  # noqa: BLE001
            logger.warning(
                'Kerberos/SPNEGO initialization failed; proceeding without proxy token'
            )
            return None
        self._spnego_ctx = ctx
        return ctx

    def _build_client(
        self,
        *,
        override_proxy_headers: dict[str, str] | None = None,
        negotiate_token_b64: str | None = None,
    ):
        """
        Build a new ``httpx.AsyncClient`` for the current configuration.

        Args:
            override_proxy_headers: Headers merged over the configured proxy headers.
            negotiate_token_b64: Base64 token sent as
                ``Proxy-Authorization: Negotiate <token>`` when given.

        Returns:
            A new client; the caller is responsible for closing it.

        """
        import httpx  # noqa: PLC0415

        proxy_url = self._resolve_proxy_url()

        # Warn if SOCKS scheme is used but socks support likely missing
        if (
            self._proxy_url
            and urlparse(self._proxy_url).scheme.lower().startswith('socks')
            and _import_util.find_spec('socksio') is None
        ):
            logger.warning(
                'SOCKS proxy specified but socks support not installed. '
                'Install with: pip install "httpx[socks]" or monday-client[httpx]'
            )

        # Build proxy object with headers
        proxy_headers: dict[str, str] = dict(self._proxy_headers)
        if override_proxy_headers:
            proxy_headers.update(override_proxy_headers)
        if negotiate_token_b64:
            proxy_headers['Proxy-Authorization'] = f'Negotiate {negotiate_token_b64}'

        client_kwargs: dict[str, Any] = {
            'trust_env': self._proxy_trust_env,
            'timeout': self._timeout_seconds,
        }
        if proxy_url:
            # A plain URL is enough unless headers must be sent to the proxy.
            client_kwargs['proxy'] = (
                httpx.Proxy(proxy_url, headers=proxy_headers)
                if proxy_headers
                else proxy_url
            )

        client = httpx.AsyncClient(**client_kwargs)

        # Attach advanced auth plugins if available (optional deps)
        # NTLM (server-side or transparent proxies)
        if self._proxy_auth_type and self._proxy_auth_type.lower() == 'ntlm':
            try:
                from httpx_ntlm import HttpNtlmAuth  # noqa: PLC0415

                if self._proxy_auth:
                    username, password = self._proxy_auth
                    client.auth = HttpNtlmAuth(username, password)
            except Exception:  # noqa: BLE001
                logger.warning('httpx-ntlm not installed; NTLM not enabled')

        return client

    async def _retry_with_negotiate(  # noqa: PLR0913
        self,
        resp: Any,
        ctx: Any,
        *,
        url: str,
        json: dict[str, Any],
        headers: dict[str, str],
        timeout: int,
    ) -> Any:
        """
        Answer a 407 ``Negotiate`` challenge with a single resend.

        Returns the new response, or ``None`` when the challenge carries no
        token, no reply token is produced, or handling fails (logged).
        """
        challenge = resp.headers.get('Proxy-Authenticate', '')
        if 'Negotiate ' not in challenge:
            return None
        try:
            token_b64 = challenge.split('Negotiate ', 1)[1].strip()
            out_token = ctx.step(base64.b64decode(token_b64))
            if not out_token:
                return None
            b64 = base64.b64encode(out_token).decode('ascii')
            # Rebuild client with updated Proxy-Authorization and resend once
            new_client = self._build_client(negotiate_token_b64=b64)
            try:
                return await new_client.post(
                    url, json=json, headers=headers, timeout=timeout
                )
            finally:
                # Close the retry client even when the resend raises.
                await new_client.aclose()
        except Exception:  # noqa: BLE001
            logger.warning('Kerberos proxy challenge handling failed')
            return None

    async def post(
        self,
        *,
        url: str,
        json: dict[str, Any],
        headers: dict[str, str],
        timeout_seconds: int | None = None,
    ) -> tuple[dict[str, Any], dict[str, str]]:
        """
        Send a POST request with JSON and return the parsed JSON and headers.

        Args:
            url: Target URL.
            json: JSON-serializable request body.
            headers: Request headers.
            timeout_seconds: Timeout override; a falsy value falls back to
                the adapter default.

        Returns:
            ``(data, response_headers)`` from the final response. Whatever
            ``resp.json()`` raises for an undecodable body propagates.

        """
        effective_timeout = timeout_seconds or self._timeout_seconds

        # The context must exist before the first token can be generated,
        # so it is created here rather than while building the client.
        spnego_ctx = self._ensure_spnego_context()

        # If Kerberos/SPNEGO, attempt an initial token for CONNECT
        negotiate_b64: str | None = None
        if spnego_ctx is not None:
            try:
                initial = spnego_ctx.step(None)
                if initial:
                    negotiate_b64 = base64.b64encode(initial).decode('ascii')
            except Exception:  # noqa: BLE001
                logger.warning('Kerberos initial token generation failed')

        client = self._build_client(negotiate_token_b64=negotiate_b64)
        try:
            resp = await client.post(
                url, json=json, headers=headers, timeout=effective_timeout
            )
            if resp.status_code == self.PROXY_AUTH_REQUIRED and spnego_ctx is not None:
                retried = await self._retry_with_negotiate(
                    resp,
                    spnego_ctx,
                    url=url,
                    json=json,
                    headers=headers,
                    timeout=effective_timeout,
                )
                if retried is not None:
                    resp = retried
            response_headers = dict(resp.headers)
            data = resp.json()
            return data, response_headers
        finally:
            await client.aclose()