lentiq.storage_wrappers.generic_storage_wrapper module
from abc import ABC, abstractmethod
from urllib.parse import urlparse


class GenericStorageWrapper(ABC):
    """Abstract interface for storage wrappers.

    Keeps a filesystem handle and a bucket URI, and declares the file and
    directory operations that every concrete wrapper has to implement.
    """

    def __init__(self, fs, bucket_uri):
        """Store the filesystem handle and the bucket URI on the instance.

        Args:
            fs: Object kept as ``self._fs``; the base class never calls it.
            bucket_uri: String kept as ``self._bucket_uri`` and used as the
                prefix in ``_to_underlying_path``.
        """
        self._fs = fs
        self._bucket_uri = bucket_uri

    def _to_underlying_path(self, lq_path):
        """Return the bucket URI joined with the path component of *lq_path*.

        Only the ``path`` part produced by ``urlparse`` is used; any scheme,
        host, query or fragment in *lq_path* is discarded.

        Args:
            lq_path: Path or URL string to translate.

        Returns:
            The bucket URI followed by the path, with exactly one ``/``
            between them when both parts are non-empty.
        """
        path = urlparse(lq_path).path
        base = self._bucket_uri
        if not path or not base:
            # Nothing to join; plain concatenation keeps the original result.
            return base + path
        base_has_sep = base.endswith("/")
        path_has_sep = path.startswith("/")
        if not base_has_sep and not path_has_sep:
            # Without this, "s3://bucket" + "a/b" became "s3://bucketa/b".
            return base + "/" + path
        if base_has_sep and path_has_sep and not base.endswith("//"):
            # Collapse the doubled separator, but leave scheme roots such as
            # "file://" alone, where the extra slash is significant.
            return base + path[1:]
        return base + path

    @abstractmethod
    def _init_fs(self, credentials):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def open_file(self, remote_path, mode):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file(self, remote_path, file_obj):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file_as_string(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get(self, remote_path, local_path=None):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file(self, file_obj, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put(self, path_or_object, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def create_directory(self, remote_path, create_all=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def mkdir(self, remote_path, create_all=False):
        """Abstract; subclasses must implement. The base body is a no-op.

        NOTE(review): same signature as ``create_directory`` - presumably an
        alias; confirm against the concrete wrappers.
        """

    @abstractmethod
    def list_directory(self, remote_path, extended=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def ls(self, remote_path, extended=False):
        """Abstract; subclasses must implement. The base body is a no-op.

        NOTE(review): same signature as ``list_directory`` - presumably an
        alias; confirm against the concrete wrappers.
        """

    @abstractmethod
    def file_exists(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file_size(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""
Classes
class GenericStorageWrapper
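Abstract base class for storage wrappers: it stores a filesystem handle and a bucket URI, and declares the file and directory operations that concrete subclasses must implement.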
class GenericStorageWrapper(ABC):
    """Abstract interface for storage wrappers.

    Keeps a filesystem handle and a bucket URI, and declares the file and
    directory operations that every concrete wrapper has to implement.
    """

    def __init__(self, fs, bucket_uri):
        """Store the filesystem handle and the bucket URI on the instance.

        Args:
            fs: Object kept as ``self._fs``; the base class never calls it.
            bucket_uri: String kept as ``self._bucket_uri`` and used as the
                prefix in ``_to_underlying_path``.
        """
        self._fs = fs
        self._bucket_uri = bucket_uri

    def _to_underlying_path(self, lq_path):
        """Return the bucket URI joined with the path component of *lq_path*.

        Only the ``path`` part produced by ``urlparse`` is used; any scheme,
        host, query or fragment in *lq_path* is discarded.

        Args:
            lq_path: Path or URL string to translate.

        Returns:
            The bucket URI followed by the path, with exactly one ``/``
            between them when both parts are non-empty.
        """
        path = urlparse(lq_path).path
        base = self._bucket_uri
        if not path or not base:
            # Nothing to join; plain concatenation keeps the original result.
            return base + path
        base_has_sep = base.endswith("/")
        path_has_sep = path.startswith("/")
        if not base_has_sep and not path_has_sep:
            # Without this, "s3://bucket" + "a/b" became "s3://bucketa/b".
            return base + "/" + path
        if base_has_sep and path_has_sep and not base.endswith("//"):
            # Collapse the doubled separator, but leave scheme roots such as
            # "file://" alone, where the extra slash is significant.
            return base + path[1:]
        return base + path

    @abstractmethod
    def _init_fs(self, credentials):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def open_file(self, remote_path, mode):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file(self, remote_path, file_obj):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file_as_string(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get(self, remote_path, local_path=None):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file(self, file_obj, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def put(self, path_or_object, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def create_directory(self, remote_path, create_all=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def mkdir(self, remote_path, create_all=False):
        """Abstract; subclasses must implement. The base body is a no-op.

        NOTE(review): same signature as ``create_directory`` - presumably an
        alias; confirm against the concrete wrappers.
        """

    @abstractmethod
    def list_directory(self, remote_path, extended=False):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def ls(self, remote_path, extended=False):
        """Abstract; subclasses must implement. The base body is a no-op.

        NOTE(review): same signature as ``list_directory`` - presumably an
        alias; confirm against the concrete wrappers.
        """

    @abstractmethod
    def file_exists(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""

    @abstractmethod
    def get_file_size(self, remote_path):
        """Abstract; subclasses must implement. The base body is a no-op."""
Ancestors (in MRO)
- GenericStorageWrapper
- abc.ABC
- builtins.object
Methods
def __init__(
self, fs, bucket_uri)
Store the given filesystem handle and bucket URI on the instance as _fs and _bucket_uri.
def __init__(self, fs, bucket_uri):
    """Remember the filesystem handle and bucket URI for later use.

    Args:
        fs: Object stored as ``self._fs``.
        bucket_uri: Value stored as ``self._bucket_uri``.
    """
    self._fs, self._bucket_uri = fs, bucket_uri
def create_directory(
self, remote_path, create_all=False)
@abstractmethod
def create_directory(self, remote_path, create_all=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): ``create_all`` presumably requests creation of missing
    parent directories - confirm against the concrete wrappers.
    """
def file_exists(
self, remote_path)
@abstractmethod
def file_exists(self, remote_path):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    """
def get(
self, remote_path, local_path=None)
@abstractmethod
def get(self, remote_path, local_path=None):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): behavior when ``local_path`` is omitted is defined by the
    concrete wrappers - not visible here.
    """
def get_file(
self, remote_path, file_obj)
@abstractmethod
def get_file(self, remote_path, file_obj):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    """
def get_file_as_string(
self, remote_path)
@abstractmethod
def get_file_as_string(self, remote_path):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    """
def get_file_size(
self, remote_path)
@abstractmethod
def get_file_size(self, remote_path):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    """
def list_directory(
self, remote_path, extended=False)
@abstractmethod
def list_directory(self, remote_path, extended=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): the effect of ``extended`` is defined by the concrete
    wrappers - not visible here.
    """
def ls(
self, remote_path, extended=False)
@abstractmethod
def ls(self, remote_path, extended=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): the effect of ``extended`` is defined by the concrete
    wrappers - not visible here.
    """
def mkdir(
self, remote_path, create_all=False)
@abstractmethod
def mkdir(self, remote_path, create_all=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): ``create_all`` presumably requests creation of missing
    parent directories - confirm against the concrete wrappers.
    """
def open_file(
self, remote_path, mode)
@abstractmethod
def open_file(self, remote_path, mode):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): accepted ``mode`` values are defined by the concrete
    wrappers - not visible here.
    """
def put(
self, path_or_object, remote_path)
@abstractmethod
def put(self, path_or_object, remote_path):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): which kinds of ``path_or_object`` are accepted is decided
    by the concrete wrappers - not visible here.
    """
def put_file(
self, file_obj, remote_path, overwrite=False)
@abstractmethod
def put_file(self, file_obj, remote_path, overwrite=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): how ``overwrite`` is honored is decided by the concrete
    wrappers - not visible here.
    """
def put_file_from_local(
self, local_path, remote_path, overwrite=False)
@abstractmethod
def put_file_from_local(self, local_path, remote_path, overwrite=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): how ``overwrite`` is honored is decided by the concrete
    wrappers - not visible here.
    """
def put_file_from_string(
self, s, remote_path, overwrite=False)
@abstractmethod
def put_file_from_string(self, s, remote_path, overwrite=False):
    """Abstract placeholder; concrete wrappers must override it.

    The base body does nothing and returns ``None``.
    NOTE(review): how ``overwrite`` is honored is decided by the concrete
    wrappers - not visible here.
    """