# pystackql/core/stackql.py
"""
Main StackQL class for PyStackQL.
This module provides the main StackQL class that serves as the primary
interface for executing StackQL queries.
"""
import os
import json
from .server import ServerConnection
from .query import QueryExecutor, AsyncQueryExecutor
from .output import OutputFormatter
from ..utils import setup_local_mode
[docs]
class StackQL:
"""A class representing an instance of the StackQL query engine.
:param server_mode: Connect to a StackQL server
(defaults to `False`)
:type server_mode: bool, optional
:param server_address: The address of the StackQL server
(`server_mode` only, defaults to `'127.0.0.1'`)
:type server_address: str, optional
:param server_port: The port of the StackQL server
(`server_mode` only, defaults to `5466`)
:type server_port: int, optional
:param backend_storage_mode: Specifies backend storage mode, options are 'memory' and 'file'
(defaults to `'memory'`, this option is ignored in `server_mode`)
:type backend_storage_mode: str, optional
:param backend_file_storage_location: Specifies location for database file, only applicable when `backend_storage_mode` is 'file'
(defaults to `'{cwd}/stackql.db'`, this option is ignored in `server_mode`)
:type backend_file_storage_location: str, optional
:param output: Determines the format of the output, options are 'dict', 'pandas', 'csv', and 'markdownkv'
(defaults to `'dict'`, `'csv'` is not supported in `server_mode`, 'markdownkv' is optimized for LLM understanding)
:type output: str, optional
:param sep: Seperator for values in CSV output
(defaults to `','`, `output='csv'` only)
:type sep: str, optional
:param header: Show column headers in CSV output
(defaults to `False`, `output='csv'` only)
:type header: bool, optional
:param download_dir: The download directory for the StackQL executable
(defaults to `site.getuserbase()`, not supported in `server_mode`)
:type download_dir: str, optional
:param app_root: Application config and cache root path
(defaults to `{cwd}/.stackql`)
:type app_root: str, optional
:param execution_concurrency_limit: Concurrency limit for query execution
(defaults to `-1` - unlimited)
:type execution_concurrency_limit: int, optional
:param dataflow_dependency_max: Max dataflow weakly connected components for a given query
(defaults to `50`)
:type dataflow_dependency_max: int, optional
:param dataflow_components_max: Max dataflow components for a given query
(defaults to `50`)
:type dataflow_components_max: int, optional
:param api_timeout: API timeout
(defaults to `45`, not supported in `server_mode`)
:type api_timeout: int, optional
:param proxy_host: HTTP proxy host
(not supported in `server_mode`)
:type proxy_host: str, optional
:param proxy_password: HTTP proxy password
(only applicable when `proxy_host` is set)
:type proxy_password: str, optional
:param proxy_port: HTTP proxy port
(defaults to `-1`, only applicable when `proxy_host` is set)
:type proxy_port: int, optional
:param proxy_scheme: HTTP proxy scheme
(defaults to `'http'`, only applicable when `proxy_host` is set)
:type proxy_scheme: str, optional
:param proxy_user: HTTP proxy user
(only applicable when `proxy_host` is set)
:type proxy_user: str, optional
:param max_results: Max results per HTTP request
(defaults to `-1` for no limit, not supported in `server_mode`)
:type max_results: int, optional
:param page_limit: Max pages of results that will be returned per resource
(defaults to `20`, not supported in `server_mode`)
:type page_limit: int, optional
:param max_depth: Max depth for indirect queries: views and subqueries
(defaults to `5`, not supported in `server_mode`)
:type max_depth: int, optional
:param custom_registry: Custom StackQL provider registry URL
(e.g. https://registry-dev.stackql.app/providers) supplied using the class constructor
:type custom_registry: str, optional
:param custom_auth: Custom StackQL provider authentication object supplied using the class constructor
(not supported in `server_mode`)
:type custom_auth: dict, optional
:param debug: Enable debug logging
(defaults to `False`)
:type debug: bool, optional
:param debug_log_file: Path to debug log file
(defaults to `~/.pystackql/debug.log`, only available if debug is `True`)
:type debug_log_file: str, optional
--- Read-Only Attributes ---
:param platform: The operating system platform
:type platform: str, readonly
:param package_version: The version number of the `pystackql` Python package
:type package_version: str, readonly
:param version: The version number of the `stackql` executable
(not supported in `server_mode`)
:type version: str, readonly
:param params: A list of command-line parameters passed to the `stackql` executable
(not supported in `server_mode`)
:type params: list, readonly
:param bin_path: The full path of the `stackql` executable
(not supported in `server_mode`).
:type bin_path: str, readonly
:param sha: The commit (short) sha for the installed `stackql` binary build
(not supported in `server_mode`).
:type sha: str, readonly
"""
[docs]
def __init__(self,
server_mode=False,
server_address='127.0.0.1',
server_port=5466,
output='dict',
sep=',',
header=False,
debug=False,
debug_log_file=None,
**kwargs):
"""Constructor method
"""
# Get package information from utils
from ..utils import get_platform, get_package_version
self.platform, this_os = get_platform()
self.package_version = get_package_version("pystackql")
# Setup debug logging
self.debug = debug
if debug:
if debug_log_file is None:
self.debug_log_file = os.path.join(os.path.expanduser("~"), '.pystackql', 'debug.log')
else:
self.debug_log_file = debug_log_file
# Check if the path exists. If not, try to create it.
log_dir = os.path.dirname(self.debug_log_file)
if not os.path.exists(log_dir):
try:
os.makedirs(log_dir, exist_ok=True)
except OSError as e:
raise ValueError(f"Unable to create the log directory {log_dir}: {str(e)}")
else:
self.debug_log_file = None
# Setup output formatter
self.local_output_formatter = OutputFormatter(output)
self.output = output.lower()
# Server mode setup
self.server_mode = server_mode
if self.server_mode and self.output == 'csv':
raise ValueError("CSV output is not supported in server mode, use 'dict' or 'pandas' instead.")
elif self.output == 'csv':
self.sep = sep
self.header = header
if self.server_mode:
# Server mode - connect to a server via the postgres wire protocol
self.server_address = server_address
self.server_port = server_port
self.server_connection = ServerConnection(server_address, server_port)
else:
# Local mode - execute the binary locally
# Get all parameters from local variables (excluding 'self')
local_params = locals().copy()
local_params.pop('self')
# Set up local mode - this sets the instance attributes and returns params
self.params = setup_local_mode(self, **local_params)
# Initialize query executor
self.local_query_executor = QueryExecutor(
self.bin_path,
self.params,
self.debug,
self.debug_log_file
)
# Initialize async query executor (only for local mode)
if not self.server_mode:
self.async_executor = AsyncQueryExecutor(
self._sync_query_wrapper,
output_format=self.output
)
def _sync_query_wrapper(self, query):
"""Wrapper for synchronous query execution used by AsyncQueryExecutor.
This method is exclusively used for local mode async queries.
Server mode is not supported for async queries.
Args:
query (str): The query to execute
Returns:
The formatted query result
"""
# Execute query
query_result = self.local_query_executor.execute(query)
# Format the result using the OutputFormatter
# This will handle SQL type objects through the _format_data method
return self.local_output_formatter.format_query_result(query_result)
[docs]
def properties(self):
"""Retrieves the properties of the StackQL instance.
This method collects all the attributes of the StackQL instance and
returns them in a dictionary format.
:return: A dictionary containing the properties of the StackQL instance.
:rtype: dict
Example:
::
{
"platform": "Darwin x86_64 (macOS-12.0.1-x86_64-i386-64bit), Python 3.10.9",
"output": "dict",
...
}
"""
props = {}
for var in vars(self):
# Skip internal objects
if var.startswith('_') or var in ['local_output_formatter', 'local_query_executor', 'async_executor', 'binary_manager', 'server_connection']:
continue
props[var] = getattr(self, var)
return props
[docs]
def upgrade(self, showprogress=True):
"""Upgrades the StackQL binary to the latest version available.
This method initiates an upgrade of the StackQL binary. Post-upgrade,
it updates the `version` and `sha` attributes of the StackQL instance
to reflect the newly installed version.
:param showprogress: Indicates if progress should be displayed during the upgrade. Defaults to True.
:type showprogress: bool, optional
:return: A message indicating the new version of StackQL post-upgrade.
:rtype: str
"""
if self.server_mode:
raise ValueError("The upgrade method is not supported in server mode.")
# Use the binary manager to upgrade
message = self.binary_manager.upgrade(showprogress)
# Update the version and sha attributes
self.version = self.binary_manager.version
self.sha = self.binary_manager.sha
return message
[docs]
def executeStmt(self, query, custom_auth=None, env_vars=None, **kwargs):
"""Executes a query using the StackQL instance and returns the output as a string.
This is intended for operations which do not return a result set, for example a mutation
operation such as an `INSERT` or a `DELETE` or life cycle method such as an `EXEC` operation
or a `REGISTRY PULL` operation.
This method determines the mode of operation (server_mode or local execution) based
on the `server_mode` attribute of the instance. If `server_mode` is True, it runs the query
against the server. Otherwise, it executes the query using a subprocess.
:param query: The StackQL query string to be executed.
:type query: str, list of dict objects, or Pandas DataFrame
:param custom_auth: Custom authentication dictionary.
:type custom_auth: dict, optional
:param env_vars: Command-specific environment variables for this execution.
:type env_vars: dict, optional
:param kwargs: Additional keyword arguments that override constructor parameters for this execution.
Supported overrides: output, sep, header, auth, custom_registry, max_results, page_limit,
max_depth, api_timeout, http_debug, proxy_host, proxy_port, proxy_user, proxy_password,
proxy_scheme, backend_storage_mode, backend_file_storage_location, app_root,
execution_concurrency_limit, dataflow_dependency_max, dataflow_components_max
:type kwargs: optional
:return: The output result of the query in string format. If in `server_mode`, it
returns a JSON string representation of the result.
:rtype: dict, Pandas DataFrame or str (for `csv` output)
Example:
>>> from pystackql import StackQL
>>> stackql = StackQL()
>>> stackql_query = "REGISTRY PULL okta"
>>> result = stackql.executeStmt(stackql_query)
>>> result
"""
if self.server_mode:
# Server mode: handle output override
output_format = kwargs.get('output', self.output)
result = self.server_connection.execute_query(query, is_statement=True)
# Format result based on output type
if output_format == 'pandas':
import pandas as pd
return pd.DataFrame(result)
elif output_format == 'csv':
# Return the string representation of the result
return result[0]['message']
elif output_format == 'markdownkv':
from .output import OutputFormatter
temp_formatter = OutputFormatter('markdownkv')
# Extract message from result
message = result[0].get('message', '') if result else ''
return temp_formatter._format_markdownkv_statement(message)
else:
return result
else:
# Local mode: handle parameter overrides
override_params = None
output_format = kwargs.get('output', self.output)
# If custom_auth is provided as kwarg, use it
if 'auth' in kwargs:
custom_auth = kwargs['auth']
# Generate override params if kwargs provided
if kwargs:
from ..utils import generate_params_for_execution
override_params = generate_params_for_execution(self._base_kwargs, kwargs)
# Execute the query
result = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars, override_params=override_params)
# Format the result with appropriate output formatter
if output_format != self.output:
# Create a temporary formatter for this execution
from .output import OutputFormatter
temp_formatter = OutputFormatter(output_format)
return temp_formatter.format_statement_result(result)
else:
return self.local_output_formatter.format_statement_result(result)
[docs]
def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None, **kwargs):
"""
Executes a StackQL query and returns the output based on the specified output format.
This method supports execution both in server mode and locally using a subprocess. In server mode,
the query is sent to a StackQL server, while in local mode, it runs the query using a local binary.
:param query: The StackQL query string to be executed.
:type query: str
:param suppress_errors: If set to True, the method will return an empty list if an error occurs.
:type suppress_errors: bool, optional
:param custom_auth: Custom authentication dictionary.
:type custom_auth: dict, optional
:param env_vars: Command-specific environment variables for this execution.
:type env_vars: dict, optional
:param kwargs: Additional keyword arguments that override constructor parameters for this execution.
Supported overrides: output, sep, header, auth, custom_registry, max_results, page_limit,
max_depth, api_timeout, http_debug, proxy_host, proxy_port, proxy_user, proxy_password,
proxy_scheme, backend_storage_mode, backend_file_storage_location, app_root,
execution_concurrency_limit, dataflow_dependency_max, dataflow_components_max
:type kwargs: optional
:return: The output of the query, which can be a list of dictionary objects, a Pandas DataFrame,
or a raw CSV string, depending on the configured output format.
:rtype: list(dict) | pd.DataFrame | str
:raises ValueError: If an unsupported output format is specified.
:example:
>>> stackql = StackQL()
>>> query = '''
... SELECT SPLIT_PART(machineType, '/', -1) as machine_type, status, COUNT(*) as num_instances
... FROM google.compute.instances
... WHERE project = 'stackql-demo' AND zone = 'australia-southeast1-a'
... GROUP BY machine_type, status HAVING COUNT(*) > 2
... '''
>>> result = stackql.execute(query)
"""
if self.server_mode:
# Server mode: handle output override
output_format = kwargs.get('output', self.output)
result = self.server_connection.execute_query(query)
# Format result based on output type
if output_format == 'pandas':
import pandas as pd
import json
from io import StringIO
json_str = json.dumps(result)
return pd.read_json(StringIO(json_str))
elif output_format == 'csv':
raise ValueError("CSV output is not supported in server_mode.")
elif output_format == 'markdownkv':
from .output import OutputFormatter
temp_formatter = OutputFormatter('markdownkv')
return temp_formatter._format_markdownkv(result)
else: # Assume 'dict' output
return result
else:
# Local mode: handle parameter overrides
override_params = None
output_format = kwargs.get('output', self.output)
http_debug = kwargs.get('http_debug', self.http_debug)
# If custom_auth is provided as kwarg, use it
if 'auth' in kwargs:
custom_auth = kwargs['auth']
# Generate override params if kwargs provided
if kwargs:
from ..utils import generate_params_for_execution
override_params = generate_params_for_execution(self._base_kwargs, kwargs)
# Apply HTTP debug setting
if http_debug:
suppress_errors = False
# Execute the query
output = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars, override_params=override_params)
# Format the result with appropriate output formatter
if output_format != self.output:
# Create a temporary formatter for this execution
from .output import OutputFormatter
temp_formatter = OutputFormatter(output_format)
return temp_formatter.format_query_result(output, suppress_errors)
else:
return self.local_output_formatter.format_query_result(output, suppress_errors)
[docs]
async def executeQueriesAsync(self, queries):
"""Executes multiple StackQL queries asynchronously using the current StackQL instance.
This method utilizes an asyncio event loop to concurrently run a list of provided
StackQL queries. Each query is executed independently, and the combined results of
all the queries are returned as a list of JSON objects if 'dict' output mode is selected,
or as a concatenated DataFrame if 'pandas' output mode is selected.
The order of the results in the returned list or DataFrame may not necessarily
correspond to the order of the queries in the input list due to the asynchronous nature
of execution.
:param queries: A list of StackQL query strings to be executed concurrently.
:type queries: list[str], required
:return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame.
:rtype: list[dict] or pd.DataFrame
:raises ValueError: If server_mode is True (async is only supported in local mode).
:raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas').
Example:
>>> from pystackql import StackQL
>>> stackql = StackQL()
>>> queries = [
>>> \"\"\"SELECT '%s' as region, instance_type, COUNT(*) as num_instances
... FROM aws.ec2.instances
... WHERE region = '%s'
... GROUP BY instance_type\"\"\" % (region, region)
>>> for region in regions ]
>>> result = stackql.executeQueriesAsync(queries)
Note:
- This method is only supported in local mode.
"""
if self.server_mode:
raise ValueError(
"The executeQueriesAsync method is not supported in server mode. "
"Please use the standard execute method with individual queries instead, "
"or switch to local mode if you need to run multiple queries concurrently."
)
# Verify that async_executor is available (should only be initialized in local mode)
if not hasattr(self, 'async_executor'):
raise RuntimeError("Async executor not initialized. This should not happen.")
return await self.async_executor.execute_queries(queries)
[docs]
def test_connection(self):
"""Tests if the server connection is working by executing a simple query.
This method is only valid when server_mode=True.
Returns:
bool: True if the connection is working, False otherwise.
Raises:
ValueError: If called when not in server mode.
"""
if not self.server_mode:
raise ValueError("The test_connectivity method is only available in server mode.")
try:
result = self.server_connection.execute_query("SELECT 'test' as test_value")
return (isinstance(result, list) and
len(result) == 1 and
'test_value' in result[0] and
result[0]['test_value'] == 'test')
except Exception as e:
if self.debug:
print(f"Connection test failed: {str(e)}")
return False