lentiq.storage_wrappers.gcs_wrapper module
import json
from os.path import basename
from urllib.parse import urlparse

from gcsfs import GCSFileSystem

from lentiq.models.gkejson_credentials import GKEJSONCredentials
from lentiq.models.gkep12_credentials import GKEP12Credentials
from lentiq.storage_wrappers.generic_storage_wrapper import GenericStorageWrapper


class GCSWrapper(GenericStorageWrapper):
    """Storage wrapper backed by a gcsfs ``GCSFileSystem``.

    Remote paths are translated with ``_to_underlying_path`` (provided by
    the base class) before being handed to the file system.
    """

    def __init__(self, credentials, bucket_uri):
        """Build the file system from *credentials* and initialise the base.

        Args:
            credentials: A GKEP12Credentials or GKEJSONCredentials object.
            bucket_uri: Passed unchanged to ``GenericStorageWrapper``.
        """
        fs = self._init_fs(credentials)
        super().__init__(fs, bucket_uri)

    def _init_fs(self, credentials):
        """Return a ``GCSFileSystem`` configured from *credentials*.

        Raises:
            Exception: If *credentials* is not one of the supported types.
        """
        # isinstance (rather than an exact type() match) also accepts
        # subclasses of the supported credential classes.
        if not isinstance(credentials, (GKEP12Credentials, GKEJSONCredentials)):
            raise Exception("GCSWrapper requires a GKEP12Credentials or GKEJSONCredentials object.")
        return GCSFileSystem(
            project=credentials.project_id,
            token=json.loads(credentials.json)
        )

    def open_file(self, remote_path, mode='rb'):
        """Open *remote_path* on the file system and return the file object."""
        return self._fs.open(self._to_underlying_path(remote_path), mode)

    def get_file(self, remote_path, file_obj):
        """Copy *remote_path* into *file_obj*; return the sum of write() results."""
        byte_count = 0
        with self.open_file(remote_path) as lq_file:
            for line in lq_file:
                byte_count += file_obj.write(line)
        return byte_count

    def get_file_as_string(self, remote_path):
        """Return the full content of *remote_path*.

        The file is opened with the default ``'rb'`` mode, so the value is
        whatever ``read()`` yields for a binary handle.
        """
        with self.open_file(remote_path) as lq_file:
            return lq_file.read()

    def get(self, remote_path, local_path=None):
        """Download *remote_path* to *local_path*.

        When *local_path* is None the base name of the remote URL path is
        used, relative to the current working directory.
        """
        if local_path is None:
            local_path = basename(urlparse(remote_path).path)
        with open(local_path, 'wb+') as file_obj:
            self.get_file(remote_path, file_obj)

    def put_file(self, file_obj, remote_path, overwrite=False):
        """Write the lines of *file_obj* to *remote_path*.

        Returns:
            The sum of the values returned by each ``write`` call.

        Raises:
            Exception: If *overwrite* is False and the remote file exists.
        """
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        byte_count = 0
        with self.open_file(remote_path, mode='wb') as lq_file:
            for line in file_obj:
                byte_count += lq_file.write(line)
        return byte_count

    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Upload the local file at *local_path* to *remote_path*."""
        with open(local_path, 'rb') as file_obj:
            return self.put_file(file_obj, remote_path, overwrite)

    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Write the string *s* to *remote_path* and return write()'s result.

        Raises:
            Exception: If *overwrite* is False and the remote file exists.
        """
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        # Close the handle deterministically: buffered remote writers only
        # commit their data when the file is closed.
        with self.open_file(remote_path, mode='w') as lq_file:
            return lq_file.write(s)

    def put(self, path_or_object, remote_path, overwrite=False):
        """Upload a local path (str) or a file-like object to *remote_path*."""
        if isinstance(path_or_object, str):
            return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
        return self.put_file(path_or_object, remote_path, overwrite=overwrite)

    def create_directory(self, remote_path, create_all=False):
        """Create a directory marker by touching the path with a trailing '/'.

        *create_all* is accepted for interface compatibility and is not used.
        """
        underlying_remote_path = self._to_underlying_path(remote_path)
        # Compare by value; identity checks against a literal are unreliable.
        if not underlying_remote_path.endswith('/'):
            underlying_remote_path += '/'
        self._fs.touch(underlying_remote_path)

    def mkdir(self, remote_path, create_all=False):
        """Alias for :meth:`create_directory`."""
        return self.create_directory(remote_path, create_all)

    def list_directory(self, remote_path, extended=False):
        """Return ``ls`` of *remote_path*; *extended* is not used."""
        underlying_remote_path = self._to_underlying_path(remote_path)
        return self._fs.ls(underlying_remote_path)

    def ls(self, remote_path, extended=False):
        """Alias for :meth:`list_directory`."""
        return self.list_directory(remote_path, extended)

    def file_exists(self, remote_path):
        """Return the file system's ``exists`` result for *remote_path*."""
        return self._fs.exists(self._to_underlying_path(remote_path))

    def get_file_size(self, remote_path):
        """Return the ``size`` attribute of the opened remote file."""
        # Use a context manager so the handle is released after the lookup.
        with self.open_file(remote_path) as lq_file:
            return lq_file.size
Classes
class GCSWrapper
Storage wrapper for Google Cloud Storage: a GenericStorageWrapper backed by a gcsfs GCSFileSystem that is built from a GKEP12Credentials or GKEJSONCredentials object.
class GCSWrapper(GenericStorageWrapper):
    """Storage wrapper backed by a gcsfs ``GCSFileSystem``.

    Remote paths are translated with ``_to_underlying_path`` (provided by
    the base class) before being handed to the file system.
    """

    def __init__(self, credentials, bucket_uri):
        """Build the file system from *credentials* and initialise the base.

        Args:
            credentials: A GKEP12Credentials or GKEJSONCredentials object.
            bucket_uri: Passed unchanged to ``GenericStorageWrapper``.
        """
        fs = self._init_fs(credentials)
        super().__init__(fs, bucket_uri)

    def _init_fs(self, credentials):
        """Return a ``GCSFileSystem`` configured from *credentials*.

        Raises:
            Exception: If *credentials* is not one of the supported types.
        """
        # isinstance (rather than an exact type() match) also accepts
        # subclasses of the supported credential classes.
        if not isinstance(credentials, (GKEP12Credentials, GKEJSONCredentials)):
            raise Exception("GCSWrapper requires a GKEP12Credentials or GKEJSONCredentials object.")
        return GCSFileSystem(
            project=credentials.project_id,
            token=json.loads(credentials.json)
        )

    def open_file(self, remote_path, mode='rb'):
        """Open *remote_path* on the file system and return the file object."""
        return self._fs.open(self._to_underlying_path(remote_path), mode)

    def get_file(self, remote_path, file_obj):
        """Copy *remote_path* into *file_obj*; return the sum of write() results."""
        byte_count = 0
        with self.open_file(remote_path) as lq_file:
            for line in lq_file:
                byte_count += file_obj.write(line)
        return byte_count

    def get_file_as_string(self, remote_path):
        """Return the full content of *remote_path*.

        The file is opened with the default ``'rb'`` mode, so the value is
        whatever ``read()`` yields for a binary handle.
        """
        with self.open_file(remote_path) as lq_file:
            return lq_file.read()

    def get(self, remote_path, local_path=None):
        """Download *remote_path* to *local_path*.

        When *local_path* is None the base name of the remote URL path is
        used, relative to the current working directory.
        """
        if local_path is None:
            local_path = basename(urlparse(remote_path).path)
        with open(local_path, 'wb+') as file_obj:
            self.get_file(remote_path, file_obj)

    def put_file(self, file_obj, remote_path, overwrite=False):
        """Write the lines of *file_obj* to *remote_path*.

        Returns:
            The sum of the values returned by each ``write`` call.

        Raises:
            Exception: If *overwrite* is False and the remote file exists.
        """
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        byte_count = 0
        with self.open_file(remote_path, mode='wb') as lq_file:
            for line in file_obj:
                byte_count += lq_file.write(line)
        return byte_count

    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        """Upload the local file at *local_path* to *remote_path*."""
        with open(local_path, 'rb') as file_obj:
            return self.put_file(file_obj, remote_path, overwrite)

    def put_file_from_string(self, s, remote_path, overwrite=False):
        """Write the string *s* to *remote_path* and return write()'s result.

        Raises:
            Exception: If *overwrite* is False and the remote file exists.
        """
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        # Close the handle deterministically: buffered remote writers only
        # commit their data when the file is closed.
        with self.open_file(remote_path, mode='w') as lq_file:
            return lq_file.write(s)

    def put(self, path_or_object, remote_path, overwrite=False):
        """Upload a local path (str) or a file-like object to *remote_path*."""
        if isinstance(path_or_object, str):
            return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
        return self.put_file(path_or_object, remote_path, overwrite=overwrite)

    def create_directory(self, remote_path, create_all=False):
        """Create a directory marker by touching the path with a trailing '/'.

        *create_all* is accepted for interface compatibility and is not used.
        """
        underlying_remote_path = self._to_underlying_path(remote_path)
        # Compare by value; identity checks against a literal are unreliable.
        if not underlying_remote_path.endswith('/'):
            underlying_remote_path += '/'
        self._fs.touch(underlying_remote_path)

    def mkdir(self, remote_path, create_all=False):
        """Alias for :meth:`create_directory`."""
        return self.create_directory(remote_path, create_all)

    def list_directory(self, remote_path, extended=False):
        """Return ``ls`` of *remote_path*; *extended* is not used."""
        underlying_remote_path = self._to_underlying_path(remote_path)
        return self._fs.ls(underlying_remote_path)

    def ls(self, remote_path, extended=False):
        """Alias for :meth:`list_directory`."""
        return self.list_directory(remote_path, extended)

    def file_exists(self, remote_path):
        """Return the file system's ``exists`` result for *remote_path*."""
        return self._fs.exists(self._to_underlying_path(remote_path))

    def get_file_size(self, remote_path):
        """Return the ``size`` attribute of the opened remote file."""
        # Use a context manager so the handle is released after the lookup.
        with self.open_file(remote_path) as lq_file:
            return lq_file.size
Ancestors (in MRO)
- GCSWrapper
- lentiq.storage_wrappers.generic_storage_wrapper.GenericStorageWrapper
- abc.ABC
- builtins.object
Methods
def __init__(
self, credentials, bucket_uri)
Build a GCSFileSystem from the given credentials and initialize the underlying GenericStorageWrapper with it and bucket_uri.
def __init__(self, credentials, bucket_uri):
    """Create the wrapper.

    The file system returned by ``_init_fs(credentials)`` is handed to the
    parent initialiser together with *bucket_uri*.
    """
    super().__init__(self._init_fs(credentials), bucket_uri)
def create_directory(
self, remote_path, create_all=False)
def create_directory(self, remote_path, create_all=False):
    """Create a directory marker for *remote_path*.

    The translated path is given a trailing ``'/'`` if it lacks one and is
    then touched on the file system.

    Args:
        remote_path: Path to create; translated via ``_to_underlying_path``.
        create_all: Accepted for interface compatibility; not used here.
    """
    underlying_remote_path = self._to_underlying_path(remote_path)
    # Compare by value: ``is not '/'`` tests object identity, which only
    # happens to work through interning and warns on modern Python.
    # ``endswith`` is also safe when the path is empty.
    if not underlying_remote_path.endswith('/'):
        underlying_remote_path += '/'
    self._fs.touch(underlying_remote_path)
def file_exists(
self, remote_path)
def file_exists(self, remote_path):
    """Return what the file system's ``exists`` reports for *remote_path*."""
    underlying_path = self._to_underlying_path(remote_path)
    return self._fs.exists(underlying_path)
def get(
self, remote_path, local_path=None)
def get(self, remote_path, local_path=None):
    """Download *remote_path* into a local file.

    If *local_path* is None, the base name of the URL path of
    *remote_path* is used as the destination file name.
    """
    destination = local_path
    if destination is None:
        parsed = urlparse(remote_path)
        destination = basename(parsed.path)
    with open(destination, 'wb+') as file_obj:
        self.get_file(remote_path, file_obj)
def get_file(
self, remote_path, file_obj)
def get_file(self, remote_path, file_obj):
    """Copy the remote file into *file_obj*, one line at a time.

    Returns:
        The sum of the values returned by ``file_obj.write``.
    """
    with self.open_file(remote_path) as source:
        return sum(file_obj.write(chunk) for chunk in source)
def get_file_as_string(
self, remote_path)
def get_file_as_string(self, remote_path):
    """Read and return the whole content of *remote_path*.

    The file is opened with ``open_file``'s default mode, so the value is
    whatever ``read()`` returns for that handle.
    """
    with self.open_file(remote_path) as source:
        contents = source.read()
    return contents
def get_file_size(
self, remote_path)
def get_file_size(self, remote_path):
    """Return the ``size`` attribute of the remote file.

    The file is opened only to read its size, and the handle is closed
    again before returning instead of being left for garbage collection.
    """
    with self.open_file(remote_path) as lq_file:
        return lq_file.size
def list_directory(
self, remote_path, extended=False)
def list_directory(self, remote_path, extended=False):
    """Return the file system's ``ls`` result for *remote_path*.

    *extended* is accepted but not used by this implementation.
    """
    return self._fs.ls(self._to_underlying_path(remote_path))
def ls(
self, remote_path, extended=False)
def ls(self, remote_path, extended=False):
    """Short alias that forwards both arguments to ``list_directory``."""
    listing = self.list_directory(remote_path, extended)
    return listing
def mkdir(
self, remote_path, create_all=False)
def mkdir(self, remote_path, create_all=False):
    """Short alias that forwards both arguments to ``create_directory``."""
    outcome = self.create_directory(remote_path, create_all)
    return outcome
def open_file(
self, remote_path, mode='rb')
def open_file(self, remote_path, mode='rb'):
    """Open *remote_path* on the file system with *mode* and return the handle."""
    underlying_path = self._to_underlying_path(remote_path)
    return self._fs.open(underlying_path, mode)
def put(
self, path_or_object, remote_path, overwrite=False)
def put(self, path_or_object, remote_path, overwrite=False):
    """Upload a local file path or a file-like object to *remote_path*.

    Args:
        path_or_object: A ``str`` (treated as a local path and sent through
            ``put_file_from_local``) or any other object (sent through
            ``put_file``).
        remote_path: Destination path.
        overwrite: Forwarded to the selected upload method.

    Returns:
        Whatever the selected upload method returns.
    """
    # isinstance also routes str subclasses through the local-path branch;
    # an exact type() match would send them to put_file.
    if isinstance(path_or_object, str):
        return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
    return self.put_file(path_or_object, remote_path, overwrite=overwrite)
def put_file(
self, file_obj, remote_path, overwrite=False)
def put_file(self, file_obj, remote_path, overwrite=False):
    """Write every line of *file_obj* to *remote_path*.

    Returns:
        The sum of the values returned by the remote handle's ``write``.

    Raises:
        Exception: If *overwrite* is False and the remote file exists.
    """
    if overwrite is False and self.file_exists(remote_path):
        raise Exception("File already exists.")
    with self.open_file(remote_path, mode='wb') as sink:
        return sum(sink.write(chunk) for chunk in file_obj)
def put_file_from_local(
self, local_path, remote_path, overwrite=False)
def put_file_from_local(self, local_path, remote_path, overwrite=False):
    """Open *local_path* for binary reading and upload it via ``put_file``."""
    with open(local_path, 'rb') as source:
        written = self.put_file(source, remote_path, overwrite)
    return written
def put_file_from_string(
self, s, remote_path, overwrite=False)
def put_file_from_string(self, s, remote_path, overwrite=False):
    """Write the string *s* to *remote_path*.

    Args:
        s: Text to write; the remote file is opened in ``'w'`` mode.
        remote_path: Destination path.
        overwrite: When False, refuse to replace an existing file.

    Returns:
        The value returned by the handle's ``write`` call.

    Raises:
        Exception: If *overwrite* is False and the remote file exists.
    """
    if overwrite is False and self.file_exists(remote_path):
        raise Exception("File already exists.")
    # The handle must be closed: without it the write may stay buffered and
    # never reach the remote object, and the handle itself is leaked.
    with self.open_file(remote_path, mode='w') as lq_file:
        return lq_file.write(s)