Top

lentiq.lentiq_dict module

import lentiq
from urllib.parse import urlparse
from lentiq.api_client import ApiClient
from lentiq.configuration import Configuration
from lentiq.models.project_bucket_information import ProjectBucketInformation
from lentiq.models.datapool_shared_bucket_information import DatapoolSharedBucketInformation
from lentiq.models.eks_credentials import EKSCredentials
from lentiq.models.gkep12_credentials import GKEP12Credentials
from lentiq.models.existing_cluster_basic_auth_credentials import ExistingClusterBasicAuthCredentials
from lentiq.models.gkejson_credentials import GKEJSONCredentials
from lentiq.models.existing_cluster_certificate_credentials import ExistingClusterCertificateCredentials
from lentiq.storage_wrappers.gcs_wrapper import GCSWrapper
from lentiq.storage_wrappers.s3_wrapper import S3Wrapper
import os


class LentiqDict:
    __instance = None
    @staticmethod
    def get_instance():
        if LentiqDict.__instance is None:
            LentiqDict()
        return LentiqDict.__instance

    _theBucketList = {}
  
    def getenv_or_raise(self,var_name):
        value=os.getenv(var_name)
        if value is None:
            raise Exception("{} environment variable is not set. \
Excepting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \
variables to be configured before using this client. \
Refer to the documentation for more details".format(var_name))
        return value

    def __init__(self):
        if LentiqDict.__instance is not None:
            raise Exception("LentiqDict is a singleton!")
        else:
            self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") 
            self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY")
            self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH")
            self._PROJECT = self.getenv_or_raise("PROJECT")
            self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME")
             
            LentiqDict.__instance = self

    def get_storage_wrapper(self, url):
        parsed_url = urlparse(url)
        bucket_domain = parsed_url.netloc
        if bucket_domain not in self._theBucketList:
            self.authenticate_bucket(url, authorization=None, force=False)
        if bucket_domain in self._theBucketList:
            return self._theBucketList[bucket_domain]
        else:
            return None

    def authenticate_bucket(self, url, authorization=None, force=False):
        if authorization is None:
            authorization = self._DEFAULT_AUTHORIZATION

        parsed_url = urlparse(url)
        bucket_domain = parsed_url.netloc
        bucket_domain_parts = bucket_domain.split('.')

        if bucket_domain in self._theBucketList and force is False:
            raise Exception("Bucket with url " + url + " already initialized.")

        conf = Configuration()
        conf.host = self._LENTIQ_API_ENDPOINT
        api_client = ApiClient(conf)

        if len(bucket_domain_parts) == 1:
            datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client)

            datapool_name = bucket_domain_parts[0]
            datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information(
                authorization,
                datapool_name
            )
            if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation:
                raise Exception("API returned unexpected response")
            bucket_information = datapool_shared_bucket_information
        elif len(bucket_domain_parts) == 2:
            projects_api = lentiq.ProjectPublicApiControllerApi(api_client)

            datapool_name = bucket_domain_parts[0]
            project_name = bucket_domain_parts[1]
            project_bucket_information = projects_api.get_project_bucket_information(
                authorization,
                datapool_name,
                project_name
            )
            if type(project_bucket_information) is not ProjectBucketInformation:
                raise Exception("API returned unexpected response")
            bucket_information = project_bucket_information
        else:
            raise Exception("Unsupported bucket path type")

        self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information)

    def _get_storage_wrapper_for_bucket_information(self, bucket_information):
        bucket_uri = bucket_information.bucket_uri
        credentials = bucket_information.credential
        t = type(credentials)
        if t is EKSCredentials:
            return S3Wrapper(credentials, bucket_uri)
        elif t is GKEP12Credentials:
            raise Exception("Bucket type not yet supported!")
        elif t is ExistingClusterBasicAuthCredentials:
            raise Exception("Bucket type not yet supported!")
        elif t is GKEJSONCredentials:
            return GCSWrapper(credentials, bucket_uri)
        elif t is ExistingClusterCertificateCredentials:
            raise Exception("Bucket type not yet supported!")
        else:
            raise Exception("Unknown bucket type!")

    def get_default_path(self):
        return self._BDL_DEFAULT_PATH
  
    def get_default_api_client(self):
        conf = Configuration()
        conf.host = self._LENTIQ_API_ENDPOINT
        return ApiClient(conf)
  
    def get_default_project_name(self):
        return self._PROJECT

    def get_default_data_pool_name(self):
        return self._DATAPOOL_NAME  

    def get_default_auth(self):
        return self._DEFAULT_AUTHORIZATION  

Classes

class LentiqDict

class LentiqDict:
    __instance = None
    @staticmethod
    def get_instance():
        if LentiqDict.__instance is None:
            LentiqDict()
        return LentiqDict.__instance

    _theBucketList = {}
  
    def getenv_or_raise(self,var_name):
        value=os.getenv(var_name)
        if value is None:
            raise Exception("{} environment variable is not set. \
Excepting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \
variables to be configured before using this client. \
Refer to the documentation for more details".format(var_name))
        return value

    def __init__(self):
        if LentiqDict.__instance is not None:
            raise Exception("LentiqDict is a singleton!")
        else:
            self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") 
            self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY")
            self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH")
            self._PROJECT = self.getenv_or_raise("PROJECT")
            self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME")
             
            LentiqDict.__instance = self

    def get_storage_wrapper(self, url):
        parsed_url = urlparse(url)
        bucket_domain = parsed_url.netloc
        if bucket_domain not in self._theBucketList:
            self.authenticate_bucket(url, authorization=None, force=False)
        if bucket_domain in self._theBucketList:
            return self._theBucketList[bucket_domain]
        else:
            return None

    def authenticate_bucket(self, url, authorization=None, force=False):
        if authorization is None:
            authorization = self._DEFAULT_AUTHORIZATION

        parsed_url = urlparse(url)
        bucket_domain = parsed_url.netloc
        bucket_domain_parts = bucket_domain.split('.')

        if bucket_domain in self._theBucketList and force is False:
            raise Exception("Bucket with url " + url + " already initialized.")

        conf = Configuration()
        conf.host = self._LENTIQ_API_ENDPOINT
        api_client = ApiClient(conf)

        if len(bucket_domain_parts) == 1:
            datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client)

            datapool_name = bucket_domain_parts[0]
            datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information(
                authorization,
                datapool_name
            )
            if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation:
                raise Exception("API returned unexpected response")
            bucket_information = datapool_shared_bucket_information
        elif len(bucket_domain_parts) == 2:
            projects_api = lentiq.ProjectPublicApiControllerApi(api_client)

            datapool_name = bucket_domain_parts[0]
            project_name = bucket_domain_parts[1]
            project_bucket_information = projects_api.get_project_bucket_information(
                authorization,
                datapool_name,
                project_name
            )
            if type(project_bucket_information) is not ProjectBucketInformation:
                raise Exception("API returned unexpected response")
            bucket_information = project_bucket_information
        else:
            raise Exception("Unsupported bucket path type")

        self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information)

    def _get_storage_wrapper_for_bucket_information(self, bucket_information):
        bucket_uri = bucket_information.bucket_uri
        credentials = bucket_information.credential
        t = type(credentials)
        if t is EKSCredentials:
            return S3Wrapper(credentials, bucket_uri)
        elif t is GKEP12Credentials:
            raise Exception("Bucket type not yet supported!")
        elif t is ExistingClusterBasicAuthCredentials:
            raise Exception("Bucket type not yet supported!")
        elif t is GKEJSONCredentials:
            return GCSWrapper(credentials, bucket_uri)
        elif t is ExistingClusterCertificateCredentials:
            raise Exception("Bucket type not yet supported!")
        else:
            raise Exception("Unknown bucket type!")

    def get_default_path(self):
        return self._BDL_DEFAULT_PATH
  
    def get_default_api_client(self):
        conf = Configuration()
        conf.host = self._LENTIQ_API_ENDPOINT
        return ApiClient(conf)
  
    def get_default_project_name(self):
        return self._PROJECT

    def get_default_data_pool_name(self):
        return self._DATAPOOL_NAME  

    def get_default_auth(self):
        return self._DEFAULT_AUTHORIZATION  

Ancestors (in MRO)

Static methods

def __init__(

self)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self):
    if LentiqDict.__instance is not None:
        raise Exception("LentiqDict is a singleton!")
    else:
        self._LENTIQ_API_ENDPOINT = self.getenv_or_raise("API_ENDPOINT") 
        self._DEFAULT_AUTHORIZATION = self.getenv_or_raise("AUTH_APIKEY")
        self._BDL_DEFAULT_PATH=self.getenv_or_raise("BDL_DEFAULT_PATH")
        self._PROJECT = self.getenv_or_raise("PROJECT")
        self._DATAPOOL_NAME = self.getenv_or_raise("DATAPOOL_NAME")
         
        LentiqDict.__instance = self

def authenticate_bucket(

self, url, authorization=None, force=False)

def authenticate_bucket(self, url, authorization=None, force=False):
    if authorization is None:
        authorization = self._DEFAULT_AUTHORIZATION
    parsed_url = urlparse(url)
    bucket_domain = parsed_url.netloc
    bucket_domain_parts = bucket_domain.split('.')
    if bucket_domain in self._theBucketList and force is False:
        raise Exception("Bucket with url " + url + " already initialized.")
    conf = Configuration()
    conf.host = self._LENTIQ_API_ENDPOINT
    api_client = ApiClient(conf)
    if len(bucket_domain_parts) == 1:
        datapools_api = lentiq.DatapoolPublicApiControllerApi(api_client)
        datapool_name = bucket_domain_parts[0]
        datapool_shared_bucket_information = datapools_api.get_datapool_shared_bucket_information(
            authorization,
            datapool_name
        )
        if type(datapool_shared_bucket_information) is not DatapoolSharedBucketInformation:
            raise Exception("API returned unexpected response")
        bucket_information = datapool_shared_bucket_information
    elif len(bucket_domain_parts) == 2:
        projects_api = lentiq.ProjectPublicApiControllerApi(api_client)
        datapool_name = bucket_domain_parts[0]
        project_name = bucket_domain_parts[1]
        project_bucket_information = projects_api.get_project_bucket_information(
            authorization,
            datapool_name,
            project_name
        )
        if type(project_bucket_information) is not ProjectBucketInformation:
            raise Exception("API returned unexpected response")
        bucket_information = project_bucket_information
    else:
        raise Exception("Unsupported bucket path type")
    self._theBucketList[bucket_domain] = self._get_storage_wrapper_for_bucket_information(bucket_information)

def get_default_api_client(

self)

def get_default_api_client(self):
    conf = Configuration()
    conf.host = self._LENTIQ_API_ENDPOINT
    return ApiClient(conf)

def get_default_auth(

self)

def get_default_auth(self):
    return self._DEFAULT_AUTHORIZATION  

def get_default_data_pool_name(

self)

def get_default_data_pool_name(self):
    return self._DATAPOOL_NAME  

def get_default_path(

self)

def get_default_path(self):
    return self._BDL_DEFAULT_PATH

def get_default_project_name(

self)

def get_default_project_name(self):
    return self._PROJECT

def get_instance(

)

@staticmethod
def get_instance():
    if LentiqDict.__instance is None:
        LentiqDict()
    return LentiqDict.__instance

def get_storage_wrapper(

self, url)

def get_storage_wrapper(self, url):
    parsed_url = urlparse(url)
    bucket_domain = parsed_url.netloc
    if bucket_domain not in self._theBucketList:
        self.authenticate_bucket(url, authorization=None, force=False)
    if bucket_domain in self._theBucketList:
        return self._theBucketList[bucket_domain]
    else:
        return None

def getenv_or_raise(

self, var_name)

def getenv_or_raise(self,var_name):
    value=os.getenv(var_name)
    if value is None:
        raise Exception("{} environment variable is not set. \
pting API_ENDPOINT,AUTH_APIKEY,BDL_DEFAULT_PATH,PROJECT,DATAPOOL_NAME \
ables to be configured before using this client. \
r to the documentation for more details".format(var_name))
    return value