Top

lentiq.storage_wrappers.gcs_wrapper module

from lentiq.storage_wrappers.generic_storage_wrapper import GenericStorageWrapper
from gcsfs import GCSFileSystem
from urllib.parse import urlparse
from os.path import basename
from lentiq.models.gkep12_credentials import GKEP12Credentials
from lentiq.models.gkejson_credentials import GKEJSONCredentials
import json


class GCSWrapper(GenericStorageWrapper):
    def __init__(self, credentials, bucket_uri):
        fs = self._init_fs(credentials)
        super().__init__(fs, bucket_uri)

    def _init_fs(self, credentials):
        if type(credentials) is not GKEP12Credentials and type(credentials) is not GKEJSONCredentials:
            raise Exception("GCSWrapper requires a GKEP12Credentials or GKEJSONCredentials object.")
        return GCSFileSystem(
            project=credentials.project_id,
            token=json.loads(credentials.json)
        )

    def open_file(self, remote_path, mode='rb'):
        return self._fs.open(self._to_underlying_path(remote_path), mode)

    def get_file(self, remote_path, file_obj):
        byte_count = 0
        with self.open_file(remote_path) as lq_file:
            for line in lq_file:
                byte_count += file_obj.write(line)
        return byte_count

    def get_file_as_string(self, remote_path):
        with self.open_file(remote_path) as lq_file:
            return lq_file.read()

    def get(self, remote_path, local_path=None):
        if local_path is None:
            local_path = basename(urlparse(remote_path).path)
        with open(local_path, 'wb+') as file_obj:
            self.get_file(remote_path, file_obj)

    def put_file(self, file_obj, remote_path, overwrite=False):
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        byte_count = 0
        with self.open_file(remote_path, mode='wb') as lq_file:
            for line in file_obj:
                byte_count += lq_file.write(line)
        return byte_count

    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        with open(local_path, 'rb') as file_obj:
            return self.put_file(file_obj, remote_path, overwrite)

    def put_file_from_string(self, s, remote_path, overwrite=False):
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        lq_file = self.open_file(remote_path, mode='w')
        return lq_file.write(s)

    def put(self, path_or_object, remote_path, overwrite=False):
        if type(path_or_object) is str:
            return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
        else:
            return self.put_file(path_or_object, remote_path, overwrite=overwrite)

    def create_directory(self, remote_path, create_all=False):
        underlying_remote_path = self._to_underlying_path(remote_path)
        if underlying_remote_path[-1] is not '/':
            underlying_remote_path += '/'
        self._fs.touch(underlying_remote_path)

    def mkdir(self, remote_path, create_all=False):
        return self.create_directory(remote_path, create_all)

    def list_directory(self, remote_path, extended=False):
        underlying_remote_path = self._to_underlying_path(remote_path)
        return self._fs.ls(underlying_remote_path)

    def ls(self, remote_path, extended=False):
        return self.list_directory(remote_path, extended)

    def file_exists(self, remote_path):
        return self._fs.exists(self._to_underlying_path(remote_path))

    def get_file_size(self, remote_path):
        return self.open_file(remote_path).size

Classes

class GCSWrapper

Helper class that provides a standard way to create an ABC using inheritance.

class GCSWrapper(GenericStorageWrapper):
    def __init__(self, credentials, bucket_uri):
        fs = self._init_fs(credentials)
        super().__init__(fs, bucket_uri)

    def _init_fs(self, credentials):
        if type(credentials) is not GKEP12Credentials and type(credentials) is not GKEJSONCredentials:
            raise Exception("GCSWrapper requires a GKEP12Credentials or GKEJSONCredentials object.")
        return GCSFileSystem(
            project=credentials.project_id,
            token=json.loads(credentials.json)
        )

    def open_file(self, remote_path, mode='rb'):
        return self._fs.open(self._to_underlying_path(remote_path), mode)

    def get_file(self, remote_path, file_obj):
        byte_count = 0
        with self.open_file(remote_path) as lq_file:
            for line in lq_file:
                byte_count += file_obj.write(line)
        return byte_count

    def get_file_as_string(self, remote_path):
        with self.open_file(remote_path) as lq_file:
            return lq_file.read()

    def get(self, remote_path, local_path=None):
        if local_path is None:
            local_path = basename(urlparse(remote_path).path)
        with open(local_path, 'wb+') as file_obj:
            self.get_file(remote_path, file_obj)

    def put_file(self, file_obj, remote_path, overwrite=False):
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        byte_count = 0
        with self.open_file(remote_path, mode='wb') as lq_file:
            for line in file_obj:
                byte_count += lq_file.write(line)
        return byte_count

    def put_file_from_local(self, local_path, remote_path, overwrite=False):
        with open(local_path, 'rb') as file_obj:
            return self.put_file(file_obj, remote_path, overwrite)

    def put_file_from_string(self, s, remote_path, overwrite=False):
        if overwrite is False:
            if self.file_exists(remote_path):
                raise Exception("File already exists.")
        lq_file = self.open_file(remote_path, mode='w')
        return lq_file.write(s)

    def put(self, path_or_object, remote_path, overwrite=False):
        if type(path_or_object) is str:
            return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
        else:
            return self.put_file(path_or_object, remote_path, overwrite=overwrite)

    def create_directory(self, remote_path, create_all=False):
        underlying_remote_path = self._to_underlying_path(remote_path)
        if underlying_remote_path[-1] is not '/':
            underlying_remote_path += '/'
        self._fs.touch(underlying_remote_path)

    def mkdir(self, remote_path, create_all=False):
        return self.create_directory(remote_path, create_all)

    def list_directory(self, remote_path, extended=False):
        underlying_remote_path = self._to_underlying_path(remote_path)
        return self._fs.ls(underlying_remote_path)

    def ls(self, remote_path, extended=False):
        return self.list_directory(remote_path, extended)

    def file_exists(self, remote_path):
        return self._fs.exists(self._to_underlying_path(remote_path))

    def get_file_size(self, remote_path):
        return self.open_file(remote_path).size

Ancestors (in MRO)

  • GCSWrapper
  • lentiq.storage_wrappers.generic_storage_wrapper.GenericStorageWrapper
  • abc.ABC
  • builtins.object

Static methods

def __init__(

self, credentials, bucket_uri)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, credentials, bucket_uri):
    fs = self._init_fs(credentials)
    super().__init__(fs, bucket_uri)

def create_directory(

self, remote_path, create_all=False)

def create_directory(self, remote_path, create_all=False):
    underlying_remote_path = self._to_underlying_path(remote_path)
    if underlying_remote_path[-1] is not '/':
        underlying_remote_path += '/'
    self._fs.touch(underlying_remote_path)

def file_exists(

self, remote_path)

def file_exists(self, remote_path):
    return self._fs.exists(self._to_underlying_path(remote_path))

def get(

self, remote_path, local_path=None)

def get(self, remote_path, local_path=None):
    if local_path is None:
        local_path = basename(urlparse(remote_path).path)
    with open(local_path, 'wb+') as file_obj:
        self.get_file(remote_path, file_obj)

def get_file(

self, remote_path, file_obj)

def get_file(self, remote_path, file_obj):
    byte_count = 0
    with self.open_file(remote_path) as lq_file:
        for line in lq_file:
            byte_count += file_obj.write(line)
    return byte_count

def get_file_as_string(

self, remote_path)

def get_file_as_string(self, remote_path):
    with self.open_file(remote_path) as lq_file:
        return lq_file.read()

def get_file_size(

self, remote_path)

def get_file_size(self, remote_path):
    return self.open_file(remote_path).size

def list_directory(

self, remote_path, extended=False)

def list_directory(self, remote_path, extended=False):
    underlying_remote_path = self._to_underlying_path(remote_path)
    return self._fs.ls(underlying_remote_path)

def ls(

self, remote_path, extended=False)

def ls(self, remote_path, extended=False):
    return self.list_directory(remote_path, extended)

def mkdir(

self, remote_path, create_all=False)

def mkdir(self, remote_path, create_all=False):
    return self.create_directory(remote_path, create_all)

def open_file(

self, remote_path, mode='rb')

def open_file(self, remote_path, mode='rb'):
    return self._fs.open(self._to_underlying_path(remote_path), mode)

def put(

self, path_or_object, remote_path, overwrite=False)

def put(self, path_or_object, remote_path, overwrite=False):
    if type(path_or_object) is str:
        return self.put_file_from_local(path_or_object, remote_path, overwrite=overwrite)
    else:
        return self.put_file(path_or_object, remote_path, overwrite=overwrite)

def put_file(

self, file_obj, remote_path, overwrite=False)

def put_file(self, file_obj, remote_path, overwrite=False):
    if overwrite is False:
        if self.file_exists(remote_path):
            raise Exception("File already exists.")
    byte_count = 0
    with self.open_file(remote_path, mode='wb') as lq_file:
        for line in file_obj:
            byte_count += lq_file.write(line)
    return byte_count

def put_file_from_local(

self, local_path, remote_path, overwrite=False)

def put_file_from_local(self, local_path, remote_path, overwrite=False):
    with open(local_path, 'rb') as file_obj:
        return self.put_file(file_obj, remote_path, overwrite)

def put_file_from_string(

self, s, remote_path, overwrite=False)

def put_file_from_string(self, s, remote_path, overwrite=False):
    if overwrite is False:
        if self.file_exists(remote_path):
            raise Exception("File already exists.")
    lq_file = self.open_file(remote_path, mode='w')
    return lq_file.write(s)