Source code for globus_sdk.tokenstorage.sqlite_adapter

from __future__ import annotations

import json
import pathlib
import sqlite3
import typing as t

from globus_sdk.services.auth import OAuthTokenResponse
from globus_sdk.tokenstorage.base import FileAdapter
from globus_sdk.version import __version__


[docs] class SQLiteAdapter(FileAdapter): """ :param dbname: The name of the DB file to write to and read from. If the string ":memory:" is used, an in-memory database will be used instead. :param namespace: A "namespace" to use within the database. All operations will be performed indexed under this string, so that multiple distinct sets of tokens may be stored in the database. You might use usernames as the namespace to implement a multi-user system, or profile names to allow multiple Globus accounts to be used by a single user. :param connect_params: A pass-through dictionary for fine-tuning the SQLite connection. A storage adapter for storing tokens in sqlite databases. SQLite adapters are for more complex cases, where there may be multiple users or "profiles" in play, and additionally a dynamic set of resource servers which need to be stored in an extensible way. The ``namespace`` is a user-supplied way of partitioning data, and any token responses passed to the storage adapter are broken apart and stored indexed by *resource_server*. If you have a more complex use-case in which this scheme will be insufficient, you should encode that in your choice of ``namespace`` values. The ``connect_params`` is an optional dictionary whose elements are passed directly to the underlying ``sqlite3.connect()`` method, enabling developers to fine-tune the connection to the SQLite database. Refer to the ``sqlite3.connect()`` documentation for SQLite-specific parameters. """ def __init__( self, dbname: pathlib.Path | str, *, namespace: str = "DEFAULT", connect_params: dict[str, t.Any] | None = None, ): self.filename = self.dbname = str(dbname) self.namespace = namespace self._connection = self._init_and_connect(connect_params) def _is_memory_db(self) -> bool: return self.dbname == ":memory:" def _init_and_connect( self, connect_params: dict[str, t.Any] | None, ) -> sqlite3.Connection: init_tables = self._is_memory_db() or not self.file_exists() connect_params = connect_params or {} if init_tables and not self._is_memory_db(): # real file needs to be created with self.user_only_umask(): conn = sqlite3.connect(self.dbname, **connect_params) else: conn = sqlite3.connect(self.dbname, **connect_params) if init_tables: conn.executescript( """ CREATE TABLE config_storage ( namespace VARCHAR NOT NULL, config_name VARCHAR NOT NULL, config_data_json VARCHAR NOT NULL, PRIMARY KEY (namespace, config_name) ); CREATE TABLE token_storage ( namespace VARCHAR NOT NULL, resource_server VARCHAR NOT NULL, token_data_json VARCHAR NOT NULL, PRIMARY KEY (namespace, resource_server) ); CREATE TABLE sdk_storage_adapter_internal ( attribute VARCHAR NOT NULL, value VARCHAR NOT NULL, PRIMARY KEY (attribute) ); """ ) # mark the version which was used to create the DB # also mark the "database schema version" in case we ever need to handle # graceful upgrades conn.executemany( "INSERT INTO sdk_storage_adapter_internal(attribute, value) " "VALUES (?, ?)", [ ("globus-sdk.version", __version__), ("globus-sdk.database_schema_version", "1"), ], ) conn.commit() return conn
[docs] def close(self) -> None: """ Close the underlying database connection. """ self._connection.close()
[docs] def store_config( self, config_name: str, config_dict: t.Mapping[str, t.Any] ) -> None: """ :param config_name: A string name for the configuration value :param config_dict: A dict of config which will be stored serialized as JSON Store a config dict under the current namespace in the config table. Allows arbitrary configuration data to be namespaced under the namespace, so that application config may be associated with the stored tokens. Uses sqlite "REPLACE" to perform the operation. """ self._connection.execute( "REPLACE INTO config_storage(namespace, config_name, config_data_json) " "VALUES (?, ?, ?)", (self.namespace, config_name, json.dumps(config_dict)), ) self._connection.commit()
[docs] def read_config(self, config_name: str) -> dict[str, t.Any] | None: """ :param config_name: A string name for the configuration value Load a config dict under the current namespace in the config table. If no value is found, returns None """ row = self._connection.execute( "SELECT config_data_json FROM config_storage " "WHERE namespace=? AND config_name=?", (self.namespace, config_name), ).fetchone() if row is None: return None config_data_json = row[0] val = json.loads(config_data_json) if not isinstance(val, dict): raise ValueError("reading config data and got non-dict result") return val
[docs] def remove_config(self, config_name: str) -> bool: """ :param config_name: A string name for the configuration value Delete a previously stored configuration value. Returns True if data was deleted, False if none was found to delete. """ rowcount = self._connection.execute( "DELETE FROM config_storage WHERE namespace=? AND config_name=?", (self.namespace, config_name), ).rowcount self._connection.commit() return rowcount != 0
[docs] def store(self, token_response: OAuthTokenResponse) -> None: """ :param token_response: a globus_sdk.OAuthTokenResponse object containing token data to store By default, ``self.on_refresh`` is just an alias for this function. Given a token response, extract the token data for the resource servers and write it to ``self.dbname``, stored under the adapter's namespace """ pairs = [] for rs_name, token_data in token_response.by_resource_server.items(): pairs.append((rs_name, token_data)) self._connection.executemany( "REPLACE INTO token_storage(namespace, resource_server, token_data_json) " "VALUES(?, ?, ?)", [ (self.namespace, rs_name, json.dumps(token_data)) for (rs_name, token_data) in pairs ], ) self._connection.commit()
[docs] def get_token_data(self, resource_server: str) -> dict[str, t.Any] | None: """ Load the token data JSON for a specific resource server. In the event that the server cannot be found in the DB, return None. :param resource_server: The name of a resource server to lookup in the DB, as one would use as a key in OAuthTokenResponse.by_resource_server """ for row in self._connection.execute( "SELECT token_data_json FROM token_storage " "WHERE namespace=? AND resource_server=?", (self.namespace, resource_server), ): (token_data_json,) = row val = json.loads(token_data_json) if not isinstance(val, dict): raise ValueError("data error: token data was not saved as a dict") return val return None
[docs] def get_by_resource_server(self) -> dict[str, t.Any]: """ Load the token data JSON and return the resulting dict objects, indexed by resource server. This should look identical to an OAuthTokenResponse.by_resource_server in format and content. (But it is not attached to a token response object.) """ data = {} for row in self._connection.execute( "SELECT resource_server, token_data_json " "FROM token_storage WHERE namespace=?", (self.namespace,), ): resource_server, token_data_json = row data[resource_server] = json.loads(token_data_json) return data
[docs] def remove_tokens_for_resource_server(self, resource_server: str) -> bool: """ Given a resource server to target, delete tokens for that resource server from the database (limited to the current namespace). You can use this as part of a logout command implementation, loading token data as a dict, and then deleting the data for each resource server. Returns True if token data was deleted, False if none was found to delete. :param resource_server: The name of the resource server to remove from the DB, as one would use as a key in OAuthTokenResponse.by_resource_server """ rowcount = self._connection.execute( "DELETE FROM token_storage WHERE namespace=? AND resource_server=?", (self.namespace, resource_server), ).rowcount self._connection.commit() return rowcount != 0
[docs] def iter_namespaces( self, *, include_config_namespaces: bool = False ) -> t.Iterator[str]: """ Iterate over the namespaces which are in use in this storage adapter's database. The presence of tokens for a namespace does not indicate that those tokens are valid, only that they have been stored and have not been removed. :param include_config_namespaces: Include namespaces which appear only in the configuration storage section of the sqlite database. By default, only namespaces which were used for token storage will be returned """ seen: set[str] = set() for row in self._connection.execute( "SELECT DISTINCT namespace FROM token_storage;" ): namespace = row[0] seen.add(namespace) yield namespace if include_config_namespaces: for row in self._connection.execute( "SELECT DISTINCT namespace FROM config_storage;" ): namespace = row[0] if namespace not in seen: yield namespace