# This file is part of monday-client.
#
# Copyright (C) 2024 Leet Cyber Security <https://leetcybersecurity.com/>
#
# monday-client is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# monday-client is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with monday-client. If not, see <https://www.gnu.org/licenses/>.
"""
Utilities for handling GraphQL field combinations.
This module provides the Fields class for managing GraphQL field strings in a structured way.
It handles field parsing, combination, and deduplication while maintaining field order and
nested structure integrity.
Example:
.. code-block:: python
Basic field combination:
>>> fields1 = Fields('id name')
>>> fields2 = Fields('name description')
>>> combined = fields1 + fields2
>>> str(combined)
'id name description'
Handling nested fields:
>>> nested = Fields('id name items { id title }')
>>> str(nested)
'id name items { id title }'
String addition:
>>> fields = Fields('id name') + 'description'
>>> str(fields)
'id name description'
"""
import ast
import logging
import re
from typing import Any, Union
[docs]
class Fields:
"""
Helper class for handling GraphQL field combinations.
This class provides structured handling of GraphQL field strings, including:
- Parsing field strings while preserving nested structures
- Combining multiple field sets while maintaining order
- Converting back to GraphQL-compatible strings
Args:
fields: Either a space-separated string of field names or another Fields instance. Can include nested structures using GraphQL syntax.
Attributes:
fields: List of parsed and normalized field strings.
Example:
>>> basic_fields = Fields('id name')
>>> extended_fields = basic_fields + 'description'
>>> print(extended_fields)
'id name description'
>>> nested_fields = Fields('id items { id name }')
>>> print(nested_fields)
'id items { id name }'
"""
_logger: logging.Logger = logging.getLogger(__name__)
def __init__(self, fields: Union[str, 'Fields']):
"""
Initialize a Fields instance.
"""
fields_str = str(fields)
# Validate fields before parsing/deduplication
self._validate_fields(fields_str)
# Always deduplicate and merge
deduped = self._deduplicate_nested_fields(fields_str)
self.fields = self._parse_fields(deduped)
[docs]
def __str__(self) -> str:
"""
Convert back to a GraphQL-compatible field string.
"""
return ' '.join(self.fields)
[docs]
def __repr__(self) -> str:
"""
Return a string representation of the Fields object.
Returns:
String representation that can be used to recreate the object.
Example:
>>> fields = Fields('id name')
>>> repr(fields)
"Fields('id name')"
"""
return f"Fields('{self!s}')"
[docs]
def __add__(self, other: Union['Fields', str]) -> 'Fields':
"""
Combine two field lists, maintaining order and preserving nested structures.
Args:
other: Either a Fields instance or a field string to combine with this instance.
Returns:
New Fields instance containing combined fields.
Example:
>>> fields1 = Fields('id name top_group { id title }')
>>> fields2 = Fields('groups { id title }')
>>> str(fields1 + fields2)
'id name top_group { id title } groups { id title }'
"""
# Convert string to Fields if needed
if isinstance(other, str):
other = Fields(other)
# Create a combined string and let the parser handle deduplication
# Track the original order by combining the input strings
combined_str = str(self) + ' ' + str(other)
return Fields(combined_str)
[docs]
def __sub__(self, other: Union['Fields', str]) -> 'Fields':
"""
Subtract fields from another Fields object or a string.
Args:
other: Fields instance or string containing fields to subtract.
Returns:
New Fields instance with specified fields removed.
Example:
>>> fields1 = Fields('id name board { id title }')
>>> fields2 = Fields('name board { title }')
>>> str(fields1 - fields2)
'id board { id }'
"""
# Convert string to Fields object if needed
if isinstance(other, str):
other = Fields(other)
if not other.fields:
return Fields(str(self))
result_fields = []
for field in self.fields:
# Check if this is a nested field
if '{' in field:
base_field = field.split(' {')[0].split(' (')[0]
# Find corresponding field in other
other_field = next(
(
f
for f in other.fields
if f.startswith(f'{base_field} {{') or f == base_field
),
None,
)
if other_field:
if '{' in other_field: # Both have nested content
# Extract and compare nested content
self_nested = self._extract_nested_content(field)
other_nested = self._extract_nested_content(other_field)
# Create new Fields objects for nested content
self_nested_fields = Fields(self_nested)
other_nested_fields = Fields(other_nested)
# Recursively subtract nested fields
diff_nested = self_nested_fields - other_nested_fields
if str(diff_nested): # If there are remaining fields
# Preserve arguments if they exist
args_start = field.find('(')
args_end = field.find(')')
if args_start != -1 and args_end != -1:
args = field[args_start : args_end + 1]
result_fields.append(
f'{base_field}{args} {{ {diff_nested!s} }}'
)
else:
result_fields.append(
f'{base_field} {{ {diff_nested!s} }}'
)
else:
# Other field has no nested content, so remove this field entirely
continue
else:
result_fields.append(field)
# Handle non-nested fields
elif field not in other.fields:
result_fields.append(field)
return Fields(' '.join(result_fields))
[docs]
def __contains__(self, field: str) -> bool:
"""
Check if a field exists in the Fields instance.
Args:
field: Field name to check for.
Returns:
True if field exists, False otherwise.
Example:
>>> fields = Fields('id name')
>>> 'name' in fields
True
>>> ' name ' in fields # Whitespace is normalized
True
>>> 'board' in fields
False
"""
field = field.strip() # Normalize the input field by stripping whitespace
return any(
f.strip().startswith(field + ' ') # field at start
or f.strip() == field # exact match
or f' {field} ' in f # field in middle
or f.strip().endswith(f' {field}') # field at end
for f in self.fields
)
[docs]
def __eq__(self, other: object) -> bool:
"""
Check if two Fields instances are equal.
Args:
other: Another object to compare with.
Returns:
True if other is a Fields instance with identical fields, False otherwise.
Example:
>>> fields1 = Fields('id name')
>>> fields2 = Fields('id name')
>>> fields1 == fields2
True
>>> fields3 = Fields('id description')
>>> fields1 == fields3
False
"""
if isinstance(other, Fields):
return self.fields == other.fields
return False
[docs]
def __hash__(self) -> int:
"""
Return hash value for the Fields instance.
Returns:
Hash value based on the fields list.
Example:
>>> fields1 = Fields('id name')
>>> fields2 = Fields('id name')
>>> hash(fields1) == hash(fields2)
True
"""
return hash(tuple(self.fields))
[docs]
def add_temp_fields(self, temp_fields: list[str]) -> 'Fields':
"""
Add temporary fields while preserving nested structures.
Args:
temp_fields: List of field names to temporarily add
Returns:
New Fields instance with temporary fields added
Example:
>>> fields = Fields('id name')
>>> new_fields = fields.add_temp_fields(['temp1', 'temp2'])
>>> str(new_fields)
'id name temp1 temp2'
>>> fields = Fields('name products { kind }')
>>> new_fields = fields.add_temp_fields(['id', 'products { id }'])
>>> str(new_fields)
'name id products { kind id }'
"""
if not temp_fields:
return self
# Create a mapping of base fields to their nested content
field_map = {}
# Process original fields
for field in self.fields:
base_field = field.split(' {')[0].split(' (')[0]
if '{' in field:
nested_content = field[field.find('{') + 1 : field.rfind('}')].strip()
field_map[base_field] = Fields(nested_content)
else:
field_map[base_field] = None
# Process temp fields and merge with existing fields
for field in temp_fields:
base_field = field.split(' {')[0].split(' (')[0]
if '{' in field:
nested_content = field[field.find('{') + 1 : field.rfind('}')].strip()
temp_nested_fields = Fields(nested_content)
if base_field in field_map and field_map[base_field] is not None:
# Merge with existing nested fields
field_map[base_field] = field_map[base_field] + temp_nested_fields
else:
# Create new nested field
field_map[base_field] = temp_nested_fields
# Simple field - add to map if not already present
elif base_field not in field_map:
field_map[base_field] = None
# Reconstruct the field string
result_fields = []
for base_field, nested_fields in field_map.items():
if nested_fields is not None:
result_fields.append(f'{base_field} {{ {nested_fields!s} }}')
else:
result_fields.append(base_field)
return Fields(' '.join(result_fields))
[docs]
@staticmethod
def manage_temp_fields(
data: dict[str, Any] | list[Any],
original_fields: Union[str, set[str], 'Fields'],
temp_fields: list[str],
preserve_fields: list[str] | None = None,
) -> dict[str, Any] | list[Any]:
"""
Remove temporary fields from query results that weren't in original fields.
Args:
data: Query result data
original_fields: Space-separated string, set of field names, or Fields object
temp_fields: List of field names that were temporarily added
preserve_fields: List of field names that should be preserved even if they're temp fields
Returns:
Data structure with temporary fields removed if they weren't in original fields
Example:
>>> data = {
... 'id': '123456789',
... 'name': 'Task',
... 'temp_status': 'active',
... 'board': {'id': '987654321', 'temp_field': 'value'},
... }
>>> original = 'id name board { id }'
>>> temp_fields = ['temp_status', 'temp_field']
>>> Fields.manage_temp_fields(data, original, temp_fields)
{'id': '123456789', 'name': 'Task', 'board': {'id': '987654321'}}
"""
fields_obj = Fields._normalize_original_fields(original_fields)
field_structure = Fields._build_field_structure(fields_obj)
temp_field_info = Fields._analyze_temp_fields(temp_fields)
fields_to_remove = Fields._determine_fields_to_remove(
temp_field_info, field_structure, preserve_fields
)
if not fields_to_remove['top_level'] and not temp_field_info['nested']:
return data
if isinstance(data, list):
return Fields._process_list_data(data, fields_obj, temp_fields)
if isinstance(data, dict):
return Fields._process_dict_data(
data, fields_obj, field_structure, temp_field_info, fields_to_remove
)
return data
@staticmethod
def _normalize_original_fields(
original_fields: Union[str, set[str], 'Fields'],
) -> 'Fields':
"""Convert original_fields to Fields object."""
if isinstance(original_fields, str):
return Fields(original_fields)
if isinstance(original_fields, Fields):
return original_fields
return Fields(' '.join(original_fields))
@staticmethod
def _build_field_structure(
fields_obj: 'Fields',
) -> dict[str, Union['Fields', None]]:
"""Build field structure mapping."""
field_structure = {}
for field in fields_obj.fields:
base_field = field.split(' {')[0].split(' (')[0]
if '{' in field:
nested_content = field[field.find('{') + 1 : field.rfind('}')].strip()
field_structure[base_field] = Fields(nested_content)
else:
field_structure[base_field] = None
return field_structure
@staticmethod
def _analyze_temp_fields(temp_fields: list[str]) -> dict[str, Any]:
"""Analyze temp fields to separate top-level and nested."""
top_level = set()
nested = {}
for temp_field in temp_fields:
if '{' in temp_field:
base_field = temp_field.split(' {')[0].strip()
nested_content = temp_field[
temp_field.find('{') + 1 : temp_field.rfind('}')
].strip()
if base_field not in nested:
nested[base_field] = []
nested[base_field].append(nested_content)
else:
top_level.add(temp_field)
return {'top_level': top_level, 'nested': nested}
@staticmethod
def _determine_fields_to_remove(
temp_field_info: dict[str, Any],
field_structure: dict[str, Union['Fields', None]],
preserve_fields: list[str] | None,
) -> dict[str, Any]:
"""Determine which fields should be removed."""
fields_to_remove = temp_field_info['top_level'] - set(field_structure.keys())
if preserve_fields:
for field in preserve_fields:
if field in fields_to_remove:
fields_to_remove.remove(field)
return {'top_level': fields_to_remove, 'nested': temp_field_info['nested']}
@staticmethod
def _process_list_data(
data: list[Any], fields_obj: 'Fields', temp_fields: list[str]
) -> list[Any]:
"""Process list data recursively."""
return [
Fields.manage_temp_fields(item, fields_obj, temp_fields) for item in data
]
@staticmethod
def _process_dict_data(
data: dict[str, Any],
fields_obj: 'Fields',
field_structure: dict[str, Union['Fields', None]],
temp_field_info: dict[str, Any],
fields_to_remove: dict[str, Any],
) -> dict[str, Any]:
"""Process dictionary data recursively."""
result = {}
for k, v in data.items():
if k in fields_to_remove['top_level']:
continue
if isinstance(v, (dict, list)):
processed = Fields._process_nested_value(
v, k, fields_obj, field_structure, temp_field_info
)
if Fields._should_include_processed_value(processed):
result[k] = processed
else:
result[k] = v
return result
@staticmethod
def _process_nested_value(
value: dict[str, Any] | list[Any],
key: str,
fields_obj: 'Fields',
field_structure: dict[str, Union['Fields', None]],
temp_field_info: dict[str, Any],
) -> dict[str, Any] | list[Any]:
"""Process nested dictionary or list values."""
if key in field_structure and field_structure[key] is not None:
nested_temp_fields = Fields._extract_nested_temp_fields(
temp_field_info['nested'], key
)
field_structure_value = field_structure[key]
if field_structure_value is not None:
return Fields.manage_temp_fields(
value, field_structure_value, nested_temp_fields
)
if key in temp_field_info['nested']:
nested_temp_fields = temp_field_info['nested'].get(key, [])
return Fields.manage_temp_fields(value, Fields(''), nested_temp_fields)
if isinstance(value, list):
nested_temp_fields = Fields._extract_nested_temp_fields(
temp_field_info['nested'], key
)
return Fields.manage_temp_fields(value, fields_obj, nested_temp_fields)
return Fields.manage_temp_fields(value, fields_obj, temp_field_info['nested'])
@staticmethod
def _should_include_processed_value(processed: Any) -> bool:
"""Check if processed value should be included in result."""
if processed or processed == 0 or processed is False:
return not (isinstance(processed, dict) and not processed)
return False
@staticmethod
def _extract_nested_temp_fields(
temp_fields: list[str], parent_field: str
) -> list[str]:
"""
Extract temp fields that are relevant to a specific nested context.
Args:
temp_fields: List of all temp fields
parent_field: The parent field name (e.g., 'products')
Returns:
List of temp fields that apply to the nested context
Example:
>>> Fields._extract_nested_temp_fields(
... ['id', 'products { id }', 'board { id }'], 'products'
... )
['id']
>>> Fields._extract_nested_temp_fields(
... ['id', 'products { id kind }'], 'products'
... )
['id', 'kind']
"""
nested_fields = []
for field in temp_fields:
# Check if this temp field is for the specific parent
if field.startswith(f'{parent_field} {{'):
# Extract the content inside the braces
content_start = field.find('{') + 1
content_end = field.rfind('}')
if content_start > 0 and content_end > content_start:
nested_content = field[content_start:content_end].strip()
# Split the nested content into individual fields
nested_fields.extend(nested_content.split())
elif '{' not in field:
# Top-level fields that should be available in all contexts
nested_fields.append(field)
return nested_fields
@staticmethod
def _parse_args(args_str: str) -> dict[str, Any]:
"""
Parse GraphQL arguments string into a dictionary.
Args:
args_str: String containing GraphQL arguments
Returns:
Dictionary of parsed arguments with their types and values
Example:
>>> fields = Fields('')
>>> fields._parse_args('(limit: 10, ids: ["123", "456"])')
{'limit': 10, 'ids': [('string', '123'), ('string', '456')]}
"""
args_dict = {}
content = args_str.strip('()').strip()
if not content:
return args_dict
parts = Fields._split_args_content(content)
for part in parts:
if ':' in part:
key, value = [x.strip() for x in part.split(':', 1)]
args_dict[key] = Fields._parse_single_value(value)
return args_dict
@staticmethod
def _split_args_content(content: str) -> list[str]:
"""Split argument content into individual parts."""
parts = []
current = []
in_array = 0
in_quotes = False
for char in content:
if char == '[':
in_array += 1
current.append(char)
elif char == ']':
in_array -= 1
current.append(char)
elif char == '"':
in_quotes = not in_quotes
current.append(char)
elif char == ',' and not in_array and not in_quotes:
parts.append(''.join(current).strip())
current = []
else:
current.append(char)
if current:
parts.append(''.join(current).strip())
return parts
@staticmethod
def _parse_single_value(value: str) -> Any:
"""Parse a single argument value."""
if value.startswith('[') and value.endswith(']'):
return Fields._parse_array_value(value)
if value.lower() in ('true', 'false'):
return value.lower() == 'true'
if value.startswith('"') and value.endswith('"'):
return value[1:-1]
if value.isdigit():
return int(value)
try:
return float(value)
except ValueError:
return value
@staticmethod
def _parse_array_value(value: str) -> list[tuple[str, Any]]:
"""Parse array value into list of tuples."""
parsed_values = []
nested_value = value[1:-1].strip()
if not nested_value:
return parsed_values
in_array = 0
in_quotes = False
current = []
for char in nested_value:
if char == '[':
in_array += 1
if in_array == 1:
current = ['[']
else:
current.append(char)
elif char == ']':
in_array -= 1
if in_array == 0:
current.append(']')
parsed_values.append(('array', ''.join(current)))
current = []
elif char == '"':
in_quotes = not in_quotes
current.append(char)
elif char == ',' and not in_array and not in_quotes:
if current:
val = ''.join(current).strip()
parsed_values.append(Fields._parse_array_item(val))
current = []
else:
current.append(char)
if current:
val = ''.join(current).strip()
parsed_values.append(Fields._parse_array_item(val))
return parsed_values
@staticmethod
def _parse_array_item(val: str) -> tuple[str, Any]:
"""Parse individual array item."""
if val.startswith('"') and val.endswith('"'):
return ('string', val.strip('"'))
if val.isdigit():
return ('number', int(val))
return ('string', val)
@staticmethod
def _format_value(value: Any) -> str:
"""
Format a value for GraphQL argument representation.
Args:
value: The value to format (can be list, bool, str, or other types)
Returns:
Formatted string representation suitable for GraphQL arguments
"""
if isinstance(value, list):
formatted = []
for val_type, val in value:
if val_type == 'string':
formatted.append(f'"{val}"')
elif val_type == 'array':
formatted.append(val)
else:
formatted.append(str(val))
return f'[{", ".join(formatted)}]'
if isinstance(value, bool):
return str(value).lower()
if isinstance(value, str):
return f'"{value}"'
return str(value)
@staticmethod
def _validate_fields(fields_str: str) -> None:
"""
Validate the field string format according to GraphQL rules.
Args:
fields_str: String containing fields to validate
Raises:
ValueError: If the field string is malformed
"""
fields_str = fields_str.strip()
if not fields_str:
return
validation_state = Fields._ValidationState()
for i, char in enumerate(fields_str):
Fields._process_validation_char(char, i, fields_str, validation_state)
if validation_state.brace_count != 0:
error_msg = 'Unmatched braces in field string'
raise ValueError(error_msg)
class _ValidationState:
"""State tracker for field validation."""
def __init__(self):
self.brace_count = 0
self.current_field = []
self.last_field = []
@staticmethod
def _process_validation_char(
char: str, index: int, fields_str: str, state: '_ValidationState'
) -> None:
"""Process a single character during validation."""
if char == '{':
Fields._handle_opening_brace(state)
elif char == '}':
Fields._handle_closing_brace(state)
elif char.isspace():
Fields._handle_whitespace(state)
else:
Fields._handle_regular_char(char, state)
Fields._check_invalid_selection_sets(char, index, fields_str)
@staticmethod
def _handle_opening_brace(state: '_ValidationState') -> None:
"""Handle opening brace during validation."""
current_field_str = ''.join(state.last_field).strip()
if not current_field_str:
error_msg = 'Selection set must be preceded by a field name'
raise ValueError(error_msg)
state.brace_count += 1
state.current_field = []
@staticmethod
def _handle_closing_brace(state: '_ValidationState') -> None:
"""Handle closing brace during validation."""
state.brace_count -= 1
if state.brace_count < 0:
error_msg = 'Unmatched closing brace'
raise ValueError(error_msg)
state.current_field = []
state.last_field = []
@staticmethod
def _handle_whitespace(state: '_ValidationState') -> None:
"""Handle whitespace during validation."""
if state.current_field:
state.last_field = state.current_field.copy()
state.current_field = []
@staticmethod
def _handle_regular_char(char: str, state: '_ValidationState') -> None:
"""Handle regular character during validation."""
state.current_field.append(char)
if not char.isspace():
state.last_field = state.current_field.copy()
@staticmethod
def _check_invalid_selection_sets(char: str, index: int, fields_str: str) -> None:
"""Check for invalid selection set syntax."""
if index < len(fields_str) - 1:
next_char = fields_str[index + 1]
if char == '}' and next_char == '{':
error_msg = 'Invalid syntax: multiple selection sets for single field'
raise ValueError(error_msg)
def _parse_fields(self, fields_str: str) -> list[str]:
"""
Parse a GraphQL field string into individual field components.
Args:
fields_str: String containing GraphQL fields with optional arguments and nested structures
Returns:
List of parsed field strings, each representing a complete field with arguments and nested content
"""
fields = []
i = 0
n = len(fields_str)
while i < n:
i = self._skip_whitespace(fields_str, i, n)
if i >= n:
break
field_name, i = self._extract_field_name(fields_str, i, n)
i = self._skip_whitespace(fields_str, i, n)
args_str, i = self._extract_arguments(fields_str, i, n)
i = self._skip_whitespace(fields_str, i, n)
field, i = self._extract_selection_set(
fields_str, i, n, field_name, args_str
)
if field.strip():
fields.append(field)
return fields
def _skip_whitespace(self, fields_str: str, i: int, n: int) -> int:
"""Skip whitespace characters."""
while i < n and fields_str[i].isspace():
i += 1
return i
def _extract_field_name(self, fields_str: str, i: int, n: int) -> tuple[str, int]:
"""Extract field name from the string."""
start = i
while i < n and not fields_str[i].isspace() and fields_str[i] not in '({':
i += 1
field_name = fields_str[start:i].strip()
return field_name, i
def _extract_arguments(self, fields_str: str, i: int, n: int) -> tuple[str, int]:
"""Extract arguments from the string."""
args_str = ''
if i < n and fields_str[i] == '(':
paren_start = i
depth = 1
i += 1
while i < n and depth > 0:
if fields_str[i] == '(':
depth += 1
elif fields_str[i] == ')':
depth -= 1
i += 1
args_str = fields_str[paren_start:i]
if not args_str.endswith(')'):
args_str += ')'
return args_str, i
def _extract_selection_set(
self, fields_str: str, i: int, n: int, field_name: str, args_str: str
) -> tuple[str, int]:
"""Extract selection set and build complete field string."""
if i < n and fields_str[i] == '{':
brace_depth = 1
i += 1
nested_start = i
while i < n and brace_depth > 0:
if fields_str[i] == '{':
brace_depth += 1
elif fields_str[i] == '}':
brace_depth -= 1
i += 1
nested_content = fields_str[nested_start : i - 1].strip()
field = self._build_field_with_selection(
field_name, args_str, nested_content
)
else:
field = self._build_field_without_selection(field_name, args_str)
return field, i
def _build_field_with_selection(
self, field_name: str, args_str: str, nested_content: str
) -> str:
"""Build field string with selection set."""
if args_str:
return f'{field_name} {args_str} {{ {nested_content} }}'
return f'{field_name} {{ {nested_content} }}'
def _build_field_without_selection(self, field_name: str, args_str: str) -> str:
"""Build field string without selection set."""
if args_str:
return f'{field_name} {args_str}'
return field_name
def _deduplicate_nested_fields(self, fields_str: str) -> str:
"""
Deduplicate nested field structures while preserving order.
Args:
fields_str: String containing fields that may have duplicates
Returns:
Deduplicated field string with nested structures preserved
"""
parsed_fields = self._parse_fields(fields_str)
seen = {}
field_pattern = re.compile(r'^(\w+)(\([^)]*\))?(\s*\{.*\})?$')
for field in parsed_fields:
match = field_pattern.match(field.strip())
if not match:
base_name = field.strip()
args = ''
selection = ''
else:
base_name = match.group(1)
args = match.group(2) or ''
selection = match.group(3) or ''
# If this field has a selection set, recursively deduplicate
if selection:
nested_content = self._extract_nested_content(field)
dedup_nested = self._deduplicate_nested_fields(nested_content)
seen[base_name] = f'{base_name}{args} {{ {dedup_nested} }}'
else:
seen[base_name] = f'{base_name}{args}'
return ' '.join(seen.values())
def _merge_field_structures(self, field1: str, field2: str) -> str:
"""
Merge two field structures, combining arguments and nested content.
Args:
field1: First field string to merge
field2: Second field string to merge
Returns:
Merged field string with combined arguments and nested content
"""
base1 = field1.split(' {')[0].strip()
base2 = field2.split(' {')[0].strip()
# Extract arguments if present
args1 = ''
args2 = ''
if '(' in base1:
base1, args1 = base1.split('(', 1)
args1 = '(' + args1
if '(' in base2:
base2, args2 = base2.split('(', 1)
args2 = '(' + args2
# Use the full field strings for canonical order
merged_args = self._merge_args(args1, args2, field1, field2)
# Extract nested content
nested1 = self._extract_nested_content(field1)
nested2 = self._extract_nested_content(field2)
# Merge nested content recursively
if nested1 and nested2:
merged_nested = self._deduplicate_nested_fields(f'{nested1} {nested2}')
elif nested1:
merged_nested = self._deduplicate_nested_fields(nested1)
elif nested2:
merged_nested = self._deduplicate_nested_fields(nested2)
else:
merged_nested = ''
# Build result
result = base1
if merged_args:
result += merged_args
if merged_nested:
result += f' {{ {merged_nested} }}'
return result
def _extract_nested_content(self, field: str) -> str:
"""
Extract the content inside nested braces.
Args:
field: Field string containing nested content
Returns:
Content between the outermost braces, or empty string if no braces found
Example:
>>> fields = Fields('')
>>> fields._extract_nested_content('board { id name }')
'id name'
"""
start = field.find('{')
if start == -1:
return ''
# Count braces to handle nested structures
count = 1
start += 1
for i in range(start, len(field)):
if field[i] == '{':
count += 1
elif field[i] == '}':
count -= 1
if count == 0:
return field[start:i].strip()
return ''
def _process_nested_content(self, content: str) -> str:
"""
Process and deduplicate nested field structures recursively.
Args:
content: String containing nested field structures
Returns:
Processed and deduplicated field string
Example:
>>> fields = Fields('')
>>> fields._process_nested_content('id name board { id id name }')
'id name board { id name }'
"""
if not any(char in content for char in '{}()'):
return self._process_simple_fields(content)
content = ' '.join(content.split())
if not content:
return ''
if content.startswith('...'):
return content
field_name, rest_of_content = self._extract_field_name_and_rest(content)
args, rest_of_content = self._extract_args_from_rest(rest_of_content)
nested_content, rest_of_content = self._extract_nested_from_rest(
rest_of_content
)
processed_nested = self._process_nested_content_recursive(nested_content)
processed_field = self._build_processed_field(
field_name, args, processed_nested
)
if rest_of_content:
additional_fields = self._process_nested_content(rest_of_content)
return f'{processed_field} {additional_fields}'
return processed_field
def _process_simple_fields(self, content: str) -> str:
"""Process simple fields without nesting or arguments."""
unique_fields = []
for field in content.split():
if field not in unique_fields:
unique_fields.append(field)
return ' '.join(unique_fields)
def _extract_field_name_and_rest(self, content: str) -> tuple[str, str]:
"""Extract field name and remaining content."""
field_name = content.split(' ', maxsplit=1)[0].split('(')[0]
rest_of_content = content[len(field_name) :].strip()
return field_name, rest_of_content
def _extract_args_from_rest(self, rest_of_content: str) -> tuple[str, str]:
"""Extract arguments from remaining content."""
args = ''
if rest_of_content.startswith('('):
paren_count = 1
i = 1
while i < len(rest_of_content) and paren_count > 0:
if rest_of_content[i] == '(':
paren_count += 1
elif rest_of_content[i] == ')':
paren_count -= 1
i += 1
args = rest_of_content[:i]
rest_of_content = rest_of_content[i:].strip()
return args, rest_of_content
def _extract_nested_from_rest(self, rest_of_content: str) -> tuple[str, str]:
"""Extract nested content from remaining content."""
nested_content = ''
if rest_of_content.startswith('{'):
brace_count = 1
i = 1
while i < len(rest_of_content) and brace_count > 0:
if rest_of_content[i] == '{':
brace_count += 1
elif rest_of_content[i] == '}':
brace_count -= 1
i += 1
nested_content = rest_of_content[:i]
rest_of_content = rest_of_content[i:].strip()
return nested_content, rest_of_content
def _process_nested_content_recursive(self, nested_content: str) -> str:
"""Process nested content recursively."""
if not nested_content:
return ''
inner_content = nested_content[1:-1].strip()
if inner_content.startswith('{') and inner_content.endswith('}'):
inner_content = inner_content[1:-1].strip()
processed_inner = self._process_nested_content(inner_content)
return f'{{ {processed_inner} }}'
def _build_processed_field(
self, field_name: str, args: str, nested_content: str
) -> str:
"""Build the final processed field string."""
processed_field = field_name
if args:
processed_field += args
if nested_content:
processed_field += f' {nested_content}'
return processed_field
def _parse_structure(self, s: str, start: int) -> tuple[int, str]:
"""
Parse a nested structure starting from a given position.
Args:
s: String containing the structure to parse
start: Starting position in the string
Returns:
Tuple containing (end position, processed content)
Example:
>>> fields = Fields('')
>>> fields._parse_structure('{ id name }', 0)
(11, ' id name ')
"""
brace_count = 1
pos = start
while pos < len(s) and brace_count > 0:
if s[pos] == '{':
brace_count += 1
elif s[pos] == '}':
brace_count -= 1
pos += 1
return pos, s[start : pos - 1]
def _extract_arg_keys_and_array_values_in_order(
self, field_str: str
) -> tuple[list[str], dict[str, list[Any]]]:
"""
Extract the order of argument keys and array values from the full field string.
Args:
field_str: The field string to parse for arguments
Returns:
Tuple of (list of argument keys in order, dict of key -> list of array values in order)
"""
arg_keys = []
array_values = {}
seen_keys = set()
for match in re.finditer(r'\(([^)]*)\)', field_str):
args_content = match.group(1)
self._process_args_content(args_content, arg_keys, array_values, seen_keys)
return arg_keys, array_values
def _process_args_content(
self,
args_content: str,
arg_keys: list[str],
array_values: dict[str, list[Any]],
seen_keys: set[str],
) -> None:
"""Process individual argument content."""
i = 0
n = len(args_content)
while i < n:
i = self._skip_whitespace_in_args(args_content, i, n)
if i >= n:
break
key, i = self._extract_key_from_args(args_content, i, n)
if key and key not in seen_keys:
arg_keys.append(key)
seen_keys.add(key)
i += 1 # skip ':'
value, i = self._extract_value_from_args(args_content, i, n)
self._process_array_value(key, value, array_values)
if i < n and args_content[i] == ',':
i += 1
def _skip_whitespace_in_args(self, args_content: str, i: int, n: int) -> int:
"""Skip whitespace in argument content."""
while i < n and args_content[i].isspace():
i += 1
return i
def _extract_key_from_args(
self, args_content: str, i: int, n: int
) -> tuple[str, int]:
"""Extract key from argument content."""
key_start = i
while i < n and args_content[i] != ':':
i += 1
key = args_content[key_start:i].strip()
return key, i
def _extract_value_from_args(
self, args_content: str, i: int, n: int
) -> tuple[str, int]:
"""Extract value from argument content."""
value_start = i
bracket = 0
brace = 0
paren = 0
in_quotes = False
while i < n:
c = args_content[i]
if c == '"' and (i == 0 or args_content[i - 1] != '\\'):
in_quotes = not in_quotes
elif not in_quotes:
bracket, brace, paren = self._update_bracket_counts(
c, bracket, brace, paren
)
if c == ',' and bracket == 0 and brace == 0 and paren == 0:
break
i += 1
value = args_content[value_start:i].strip()
return value, i
def _update_bracket_counts(
self, char: str, bracket: int, brace: int, paren: int
) -> tuple[int, int, int]:
"""Update bracket counts based on character."""
if char == '[':
bracket += 1
elif char == ']':
bracket -= 1
elif char == '{':
brace += 1
elif char == '}':
brace -= 1
elif char == '(':
paren += 1
elif char == ')':
paren -= 1
return bracket, brace, paren
def _process_array_value(
self, key: str, value: str, array_values: dict[str, list[Any]]
) -> None:
"""Process array value and extract its contents."""
if value.startswith('['):
try:
arr = ast.literal_eval(value)
if isinstance(arr, list):
if key not in array_values:
array_values[key] = []
for v in arr:
if v not in array_values[key]:
array_values[key].append(v)
except (ValueError, SyntaxError):
pass
def _merge_args(
self, args1: str, args2: str, field_str1: str = '', field_str2: str = ''
) -> str:
"""
Merge two sets of GraphQL field arguments, preserving order of first appearance.
Args:
args1: First argument string to merge
args2: Second argument string to merge
field_str1: First field string for order determination
field_str2: Second field string for order determination
Returns:
Merged argument string preserving order of first appearance
"""
if not args1:
return args2
if not args2:
return args1
# Parse both argument strings
args1_dict = self._parse_args(args1)
args2_dict = self._parse_args(args2)
# Use the order from the concatenated field strings
concat_fields = (field_str1 or '') + ' ' + (field_str2 or '')
arg_keys, _ = self._extract_arg_keys_and_array_values_in_order(concat_fields)
seen = set()
ordered_keys = []
for key in arg_keys:
if key not in seen:
ordered_keys.append(key)
seen.add(key)
# Merge arguments using the order of first appearance
merged = {}
for key in ordered_keys:
v1 = args1_dict.get(key)
v2 = args2_dict.get(key)
if isinstance(v1, list) and isinstance(v2, list):
merged_list = self._merge_arrays_preserving_order(
v1, v2, concat_fields, concat_fields, key=str(key)
)
merged[key] = merged_list
elif v1 is not None and v2 is not None and v1 != v2:
merged[key] = v2
elif v1 is not None:
merged[key] = v1
else:
merged[key] = v2
if merged:
formatted_args = [
f'{key}: {self._format_value(merged[key])}' for key in ordered_keys
]
return f'({", ".join(formatted_args)})'
return ''
def _merge_arrays_preserving_order(
self,
arr1: list[Any],
arr2: list[Any],
field_str1: str = '',
field_str2: str = '',
key: str = '',
) -> list[Any]:
"""
Merge two arrays, preserving the order of first appearance in the full field string.
Args:
arr1: First array of tuples (type, value)
arr2: Second array of tuples (type, value)
field_str1: First field string for order determination
field_str2: Second field string for order determination
key: Key name for array order lookup
Returns:
Merged array of tuples (type, value) preserving order
"""
seen = set()
merged = []
# Use the order from the concatenated field strings
concat_fields = (field_str1 or '') + ' ' + (field_str2 or '')
_, arr_order = self._extract_arg_keys_and_array_values_in_order(concat_fields)
order = arr_order.get(key, [])
for v in order:
v_key = str(v)
if v_key not in seen:
merged.append(('string', v) if isinstance(v, str) else ('int', v))
seen.add(v_key)
# Add any remaining from arr1 and arr2
for arr in (arr1, arr2):
for v in arr:
v_key = str(v[1])
if v_key not in seen:
merged.append(v)
seen.add(v_key)
return merged