Top

lentiq.storage_wrappers.generic_storage_wrapper module

from abc import ABC, abstractmethod
from urllib.parse import urlparse


class GenericStorageWrapper(ABC):
    """Abstract base for storage wrappers tied to a filesystem handle and a bucket URI.

    The base class only stores its two constructor arguments and translates
    paths onto the bucket URI. Every file and directory operation below is
    abstract and has no behavior here; the notes on their intent are inferred
    from names and should be confirmed against a concrete wrapper.
    """

    def __init__(self, fs, bucket_uri):
        """Keep ``fs`` and ``bucket_uri`` on the instance as private attributes."""
        self._bucket_uri = bucket_uri
        self._fs = fs

    def _to_underlying_path(self, lq_path):
        """Return the bucket URI followed by the path component of ``lq_path``.

        Only the path component reported by ``urlparse`` is kept; every other
        component of ``lq_path`` is dropped.
        """
        return self._bucket_uri + urlparse(lq_path).path

    @abstractmethod
    def _init_fs(self, credentials):
        """Abstract hook; presumably builds the filesystem from ``credentials`` -- confirm."""

    @abstractmethod
    def open_file(self, remote_path, mode):
        """Abstract; presumably opens ``remote_path`` using ``mode`` -- confirm."""

    @abstractmethod
    def get_file(self, remote_path, file_obj):
        """Abstract; presumably copies ``remote_path`` into ``file_obj`` -- confirm."""

    @abstractmethod
    def get_file_as_string(self, remote_path):
        """Abstract; presumably returns the content of ``remote_path`` as a string -- confirm."""

    @abstractmethod
    def get(self, remote_path, local_path=None):
        """Abstract; presumably downloads ``remote_path``, optionally to ``local_path`` -- confirm."""

    @abstractmethod
    def put_file(self, file_obj, remote_path, overwrite=False):
        """Abstract; presumably uploads ``file_obj`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Abstract; presumably uploads the local file ``local_path`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Abstract; presumably writes the string ``s`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put(self, path_or_object, remote_path):
        """Abstract; presumably uploads a path or an object to ``remote_path`` -- confirm."""

    @abstractmethod
    def create_directory(self, remote_path, create_all=False):
        """Abstract; presumably creates the directory ``remote_path`` -- confirm."""

    @abstractmethod
    def mkdir(self, remote_path, create_all=False):
        """Abstract; NOTE(review): looks like a short form of ``create_directory`` -- confirm."""

    @abstractmethod
    def list_directory(self, remote_path, extended=False):
        """Abstract; presumably lists the entries under ``remote_path`` -- confirm."""

    @abstractmethod
    def ls(self, remote_path, extended=False):
        """Abstract; NOTE(review): looks like a short form of ``list_directory`` -- confirm."""

    @abstractmethod
    def file_exists(self, remote_path):
        """Abstract; presumably reports whether ``remote_path`` exists -- confirm."""

    @abstractmethod
    def get_file_size(self, remote_path):
        """Abstract; presumably returns the size of ``remote_path`` -- confirm."""

Classes

class GenericStorageWrapper

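Abstract base class for storage wrappers. It stores a filesystem handle (`fs`) and a bucket URI, maps paths onto the bucket URI, and declares the abstract file and directory operations that concrete wrappers must implement. (The generated description below is inherited from `abc.ABC`.) Helper class that provides a standard way to create an ABC using inheritance.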

class GenericStorageWrapper(ABC):
    """Abstract base for storage wrappers tied to a filesystem handle and a bucket URI.

    The base class only stores its two constructor arguments and translates
    paths onto the bucket URI. Every file and directory operation below is
    abstract and has no behavior here; the notes on their intent are inferred
    from names and should be confirmed against a concrete wrapper.
    """

    def __init__(self, fs, bucket_uri):
        """Keep ``fs`` and ``bucket_uri`` on the instance as private attributes."""
        self._bucket_uri = bucket_uri
        self._fs = fs

    def _to_underlying_path(self, lq_path):
        """Return the bucket URI followed by the path component of ``lq_path``.

        Only the path component reported by ``urlparse`` is kept; every other
        component of ``lq_path`` is dropped.
        """
        return self._bucket_uri + urlparse(lq_path).path

    @abstractmethod
    def _init_fs(self, credentials):
        """Abstract hook; presumably builds the filesystem from ``credentials`` -- confirm."""

    @abstractmethod
    def open_file(self, remote_path, mode):
        """Abstract; presumably opens ``remote_path`` using ``mode`` -- confirm."""

    @abstractmethod
    def get_file(self, remote_path, file_obj):
        """Abstract; presumably copies ``remote_path`` into ``file_obj`` -- confirm."""

    @abstractmethod
    def get_file_as_string(self, remote_path):
        """Abstract; presumably returns the content of ``remote_path`` as a string -- confirm."""

    @abstractmethod
    def get(self, remote_path, local_path=None):
        """Abstract; presumably downloads ``remote_path``, optionally to ``local_path`` -- confirm."""

    @abstractmethod
    def put_file(self, file_obj, remote_path, overwrite=False):
        """Abstract; presumably uploads ``file_obj`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Abstract; presumably uploads the local file ``local_path`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Abstract; presumably writes the string ``s`` to ``remote_path`` -- confirm."""

    @abstractmethod
    def put(self, path_or_object, remote_path):
        """Abstract; presumably uploads a path or an object to ``remote_path`` -- confirm."""

    @abstractmethod
    def create_directory(self, remote_path, create_all=False):
        """Abstract; presumably creates the directory ``remote_path`` -- confirm."""

    @abstractmethod
    def mkdir(self, remote_path, create_all=False):
        """Abstract; NOTE(review): looks like a short form of ``create_directory`` -- confirm."""

    @abstractmethod
    def list_directory(self, remote_path, extended=False):
        """Abstract; presumably lists the entries under ``remote_path`` -- confirm."""

    @abstractmethod
    def ls(self, remote_path, extended=False):
        """Abstract; NOTE(review): looks like a short form of ``list_directory`` -- confirm."""

    @abstractmethod
    def file_exists(self, remote_path):
        """Abstract; presumably reports whether ``remote_path`` exists -- confirm."""

    @abstractmethod
    def get_file_size(self, remote_path):
        """Abstract; presumably returns the size of ``remote_path`` -- confirm."""

Ancestors (in MRO)

Methods

def __init__(self, fs, bucket_uri)

Initialize the wrapper by storing `fs` and `bucket_uri` on the instance as `_fs` and `_bucket_uri`.

def __init__(self, fs, bucket_uri):
    """Keep ``fs`` and ``bucket_uri`` on the instance as ``_fs`` and ``_bucket_uri``."""
    self._bucket_uri = bucket_uri
    self._fs = fs

def create_directory(self, remote_path, create_all=False)

@abstractmethod
def create_directory(self, remote_path, create_all=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably creates the directory ``remote_path``, with ``create_all``
    likely controlling creation of missing parents -- confirm in a
    concrete wrapper.
    """

def file_exists(self, remote_path)

@abstractmethod
def file_exists(self, remote_path):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably reports whether ``remote_path`` exists -- confirm in a
    concrete wrapper.
    """

def get(self, remote_path, local_path=None)

@abstractmethod
def get(self, remote_path, local_path=None):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably downloads ``remote_path``; what ``local_path=None`` means
    is not defined here -- confirm in a concrete wrapper.
    """

def get_file(self, remote_path, file_obj)

@abstractmethod
def get_file(self, remote_path, file_obj):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably copies the content of ``remote_path`` into ``file_obj`` --
    confirm in a concrete wrapper.
    """

def get_file_as_string(self, remote_path)

@abstractmethod
def get_file_as_string(self, remote_path):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably returns the content of ``remote_path`` as a string --
    confirm in a concrete wrapper.
    """

def get_file_size(self, remote_path)

@abstractmethod
def get_file_size(self, remote_path):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably returns the size of ``remote_path``; the unit is not
    defined here -- confirm in a concrete wrapper.
    """

def list_directory(self, remote_path, extended=False)

@abstractmethod
def list_directory(self, remote_path, extended=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably lists the entries under ``remote_path``, with ``extended``
    likely requesting more detail per entry -- confirm in a concrete
    wrapper.
    """

def ls(self, remote_path, extended=False)

@abstractmethod
def ls(self, remote_path, extended=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    NOTE(review): looks like a short form of ``list_directory`` (same
    parameters) -- confirm in a concrete wrapper.
    """

def mkdir(self, remote_path, create_all=False)

@abstractmethod
def mkdir(self, remote_path, create_all=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    NOTE(review): looks like a short form of ``create_directory`` (same
    parameters) -- confirm in a concrete wrapper.
    """

def open_file(self, remote_path, mode)

@abstractmethod
def open_file(self, remote_path, mode):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably opens ``remote_path`` using ``mode``; the accepted mode
    values are not defined here -- confirm in a concrete wrapper.
    """

def put(self, path_or_object, remote_path)

@abstractmethod
def put(self, path_or_object, remote_path):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably uploads either a local path or an object to
    ``remote_path`` -- confirm in a concrete wrapper.
    """

def put_file(self, file_obj, remote_path, overwrite=False)

@abstractmethod
def put_file(self, file_obj, remote_path, overwrite=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably uploads ``file_obj`` to ``remote_path``, with
    ``overwrite`` likely allowing replacement of an existing file --
    confirm in a concrete wrapper.
    """

def put_file_from_local(self, local_path, remote_path, overwrite=False)

@abstractmethod
def put_file_from_local(self, local_path, remote_path, overwrite=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably uploads the local file ``local_path`` to ``remote_path``,
    with ``overwrite`` likely allowing replacement of an existing file --
    confirm in a concrete wrapper.
    """

def put_file_from_string(self, s, remote_path, overwrite=False)

@abstractmethod
def put_file_from_string(self, s, remote_path, overwrite=False):
    """Abstract placeholder with no behavior of its own; returns ``None``.

    Presumably writes the string ``s`` to ``remote_path``, with
    ``overwrite`` likely allowing replacement of an existing file --
    confirm in a concrete wrapper.
    """