lentiq.lentiq_dict module
import lentiq from urllib.parse import urlparse from lentiq.api_client import ApiClient from lentiq.configuration import Configuration from lentiq.models.project_bucket_information import ProjectBucketInformation from lentiq.models.datapool_shared_bucket_information import DatapoolSharedBucketInformation from lentiq.models.eks_credentials import EKSCredentials from lentiq.models.gkep12_credentials import GKEP12Credentials from lentiq.models.existing_cluster_basic_auth_credentials import ExistingClusterBasicAuthCredentials from lentiq.models.gkejson_credentials import GKEJSONCredentials from lentiq.models.existing_cluster_certificate_credentials import ExistingClusterCertificateCredentials from lentiq.storage_wrappers.gcs_wrapper import GCSWrapper from lentiq.storage_wrappers.s3_wrapper import S3Wrapper import os class LentiqDict: __instance = None @staticmethod def get_instance(): if LentiqDict.__instance is None: LentiqDict() return LentiqDict.__instance _theBucketList = {} def getenv_or_raise(self,var_name): value=os.getenv(var_name) if value is None: raise Exception("{} environment variable is not set. \ Excepting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \ variables to be configured before using this client. \ Refer to the documentation for more details".format(var_name)) return value def __init__(self): if LentiqDict.__instance is not None: raise Exception("LentiqDict is a singleton!") else: self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY") self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH") self._PROJECT = self.getenv_or_raise("PROJECT") self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME") LentiqDict.__instance = self def get_storage_wrapper(self, url): parsed_url = urlparse(url) bucket_domain = parsed_url.netloc if bucket_domain not in self._theBucketList: self.authenticate_bucket(url, authorization=None, force=False) if bucket_domain in self._theBucketList: return self._theBucketList[bucket_domain] else: return None def authenticate_bucket(self, url, authorization=None, force=False): if authorization is None: authorization = self._DEFAULT_AUTHORIZATION parsed_url = urlparse(url) bucket_domain = parsed_url.netloc bucket_domain_parts = bucket_domain.split('.') if bucket_domain in self._theBucketList and force is False: raise Exception("Bucket with url " + url + " already initialized.") conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT api_client = ApiClient(conf) if len(bucket_domain_parts) == 1: datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information( authorization, datapool_name ) if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation: raise Exception("API returned unexpected response") bucket_information = datapool_shared_bucket_information elif len(bucket_domain_parts) == 2: projects_api = lentiq.ProjectPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] project_name = bucket_domain_parts[1] project_bucket_information = projects_api.get_project_bucket_information( authorization, datapool_name, project_name ) if type(project_bucket_information) is not ProjectBucketInformation: raise Exception("API returned unexpected response") bucket_information = project_bucket_information else: raise Exception("Unsupported bucket path type") self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information) def _get_storage_wrapper_for_bucket_information(self, bucket_information): bucket_uri = bucket_information.bucket_uri credentials = bucket_information.credential t = type(credentials) if t is EKSCredentials: return S3Wrapper(credentials, bucket_uri) elif t is GKEP12Credentials: raise Exception("Bucket type not yet supported!") elif t is ExistingClusterBasicAuthCredentials: raise Exception("Bucket type not yet supported!") elif t is GKEJSONCredentials: return GCSWrapper(credentials, bucket_uri) elif t is ExistingClusterCertificateCredentials: raise Exception("Bucket type not yet supported!") else: raise Exception("Unknown bucket type!") def get_default_path(self): return self._BDL_DEFAULT_PATH def get_default_api_client(self): conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT return ApiClient(conf) def get_default_project_name(self): return self._PROJECT def get_default_data_pool_name(self): return self._DATAPOOL_NAME def get_default_auth(self): return self._DEFAULT_AUTHORIZATION
Classes
class LentiqDict
class LentiqDict: __instance = None @staticmethod def get_instance(): if LentiqDict.__instance is None: LentiqDict() return LentiqDict.__instance _theBucketList = {} def getenv_or_raise(self,var_name): value=os.getenv(var_name) if value is None: raise Exception("{} environment variable is not set. \ Excepting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \ variables to be configured before using this client. \ Refer to the documentation for more details".format(var_name)) return value def __init__(self): if LentiqDict.__instance is not None: raise Exception("LentiqDict is a singleton!") else: self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY") self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH") self._PROJECT = self.getenv_or_raise("PROJECT") self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME") LentiqDict.__instance = self def get_storage_wrapper(self, url): parsed_url = urlparse(url) bucket_domain = parsed_url.netloc if bucket_domain not in self._theBucketList: self.authenticate_bucket(url, authorization=None, force=False) if bucket_domain in self._theBucketList: return self._theBucketList[bucket_domain] else: return None def authenticate_bucket(self, url, authorization=None, force=False): if authorization is None: authorization = self._DEFAULT_AUTHORIZATION parsed_url = urlparse(url) bucket_domain = parsed_url.netloc bucket_domain_parts = bucket_domain.split('.') if bucket_domain in self._theBucketList and force is False: raise Exception("Bucket with url " + url + " already initialized.") conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT api_client = ApiClient(conf) if len(bucket_domain_parts) == 1: datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information( authorization, datapool_name ) if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation: raise Exception("API returned unexpected response") bucket_information = datapool_shared_bucket_information elif len(bucket_domain_parts) == 2: projects_api = lentiq.ProjectPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] project_name = bucket_domain_parts[1] project_bucket_information = projects_api.get_project_bucket_information( authorization, datapool_name, project_name ) if type(project_bucket_information) is not ProjectBucketInformation: raise Exception("API returned unexpected response") bucket_information = project_bucket_information else: raise Exception("Unsupported bucket path type") self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information) def _get_storage_wrapper_for_bucket_information(self, bucket_information): bucket_uri = bucket_information.bucket_uri credentials = bucket_information.credential t = type(credentials) if t is EKSCredentials: return S3Wrapper(credentials, bucket_uri) elif t is GKEP12Credentials: raise Exception("Bucket type not yet supported!") elif t is ExistingClusterBasicAuthCredentials: raise Exception("Bucket type not yet supported!") elif t is GKEJSONCredentials: return GCSWrapper(credentials, bucket_uri) elif t is ExistingClusterCertificateCredentials: raise Exception("Bucket type not yet supported!") else: raise Exception("Unknown bucket type!") def get_default_path(self): return self._BDL_DEFAULT_PATH def get_default_api_client(self): conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT return ApiClient(conf) def get_default_project_name(self): return self._PROJECT def get_default_data_pool_name(self): return self._DATAPOOL_NAME def get_default_auth(self): return self._DEFAULT_AUTHORIZATION
Ancestors (in MRO)
- LentiqDict
- builtins.object
Static methods
def __init__(
self)
Initialize self. See help(type(self)) for accurate signature.
def __init__(self): if LentiqDict.__instance is not None: raise Exception("LentiqDict is a singleton!") else: self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY") self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH") self._PROJECT = self.getenv_or_raise("PROJECT") self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME") LentiqDict.__instance = self
def authenticate_bucket(
self, url, authorization=None, force=False)
def authenticate_bucket(self, url, authorization=None, force=False): if authorization is None: authorization = self._DEFAULT_AUTHORIZATION parsed_url = urlparse(url) bucket_domain = parsed_url.netloc bucket_domain_parts = bucket_domain.split('.') if bucket_domain in self._theBucketList and force is False: raise Exception("Bucket with url " + url + " already initialized.") conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT api_client = ApiClient(conf) if len(bucket_domain_parts) == 1: datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information( authorization, datapool_name ) if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation: raise Exception("API returned unexpected response") bucket_information = datapool_shared_bucket_information elif len(bucket_domain_parts) == 2: projects_api = lentiq.ProjectPublicApiControllerApi(api_client) datapool_name = bucket_domain_parts[0] project_name = bucket_domain_parts[1] project_bucket_information = projects_api.get_project_bucket_information( authorization, datapool_name, project_name ) if type(project_bucket_information) is not ProjectBucketInformation: raise Exception("API returned unexpected response") bucket_information = project_bucket_information else: raise Exception("Unsupported bucket path type") self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information)
def get_default_api_client(
self)
def get_default_api_client(self): conf = Configuration() conf.host = self._LENTIQ_API_ENDPOINT return ApiClient(conf)
def get_default_auth(
self)
def get_default_auth(self): return self._DEFAULT_AUTHORIZATION
def get_default_data_pool_name(
self)
def get_default_data_pool_name(self): return self._DATAPOOL_NAME
def get_default_path(
self)
def get_default_path(self): return self._BDL_DEFAULT_PATH
def get_default_project_name(
self)
def get_default_project_name(self): return self._PROJECT
def get_instance(
)
@staticmethod def get_instance(): if LentiqDict.__instance is None: LentiqDict() return LentiqDict.__instance
def get_storage_wrapper(
self, url)
def get_storage_wrapper(self, url): parsed_url = urlparse(url) bucket_domain = parsed_url.netloc if bucket_domain not in self._theBucketList: self.authenticate_bucket(url, authorization=None, force=False) if bucket_domain in self._theBucketList: return self._theBucketList[bucket_domain] else: return None
def getenv_or_raise(
self, var_name)
def getenv_or_raise(self,var_name): value=os.getenv(var_name) if value is None: raise Exception("{} environment variable is not set. \ pting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \ ables to be configured before using this client. \ r to the documentation for more details".format(var_name)) return value