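"""lentiq.pandas_extensions module.

Patches pandas I/O entry points so that ``lq://`` and ``bdl://`` URLs are
opened through ``lentiq.io``.
"""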

import lentiq.io as lq
import os 
from warnings import catch_warnings

try:
        import pandas as pd
        import wrapt

        @wrapt.patch_function_wrapper('pandas.io.parsers', 'get_filepath_or_buffer')
        def get_filepath_or_buffer_wrapper_parsers(wrapped, instance, args, kwargs):
            """Open ``lq://`` / ``bdl://`` URLs through ``lentiq.io`` for pandas readers.

            Installed by ``wrapt`` around ``pandas.io.parsers.get_filepath_or_buffer``.

            Args:
                wrapped: The original pandas function.
                instance: Bound instance supplied by ``wrapt`` (unused here).
                args: Positional arguments of the intercepted call; the first
                    three are read as path/buffer, encoding and compression.
                kwargs: Keyword arguments of the intercepted call.

            Returns:
                For a string URL whose scheme is ``lq`` or ``bdl``: the tuple
                ``(file_object, encoding, compression, True)`` where
                ``file_object`` comes from ``lq.open_file(url, mode='rb')``.
                For anything else: whatever ``wrapped(*args, **kwargs)`` returns.
            """
            from urllib.parse import urlparse

            # Read each argument positionally first, then by keyword.
            # NOTE(review): keyword names assume the pandas signature
            # (filepath_or_buffer, encoding, compression) - confirm against the
            # pandas version in use.
            url = args[0] if args else kwargs.get('filepath_or_buffer')
            encoding = args[1] if len(args) > 1 else kwargs.get('encoding')
            compression = args[2] if len(args) > 2 else kwargs.get('compression')

            # Only a str can carry one of our schemes. urlparse() raises on
            # file-like objects and pathlib.Path, so those must go straight to
            # pandas instead of being parsed.
            if isinstance(url, str) and urlparse(url).scheme in ('lq', 'bdl'):
                f = lq.open_file(url, mode='rb')
                # The trailing True is presumably pandas' "should_close" flag,
                # i.e. pandas owns the handle we just opened - TODO confirm.
                return f, encoding, compression, True

            return wrapped(*args, **kwargs)


        @wrapt.patch_function_wrapper('pandas.io.formats.csvs', '_get_handle')
        def _get_handle_wrapper(wrapped, instance, args, kwargs):
            """Open ``lq://`` / ``bdl://`` targets through ``lentiq.io`` for CSV writing.

            Installed by ``wrapt`` around ``pandas.io.formats.csvs._get_handle``.

            Args:
                wrapped: The original pandas function.
                instance: Bound instance supplied by ``wrapt`` (unused here).
                args: Positional arguments of the intercepted call. Position 0
                    is the target, 2 the encoding and 5 the ``is_text`` flag.
                kwargs: Keyword arguments of the intercepted call; ``encoding``
                    and ``is_text`` are honoured when not given positionally.

            Returns:
                For a string URL whose scheme is ``lq`` or ``bdl``: the tuple
                ``(handle, [handle])``. ``handle`` is the file returned by
                ``lq.open_file(url, mode="wb")``, wrapped in a
                ``TextIOWrapper`` when ``is_text`` is true.
                For anything else: whatever ``wrapped(*args, **kwargs)`` returns.
            """
            from io import TextIOWrapper
            from urllib.parse import urlparse

            def argument(position, keyword, default):
                # Positional value wins; otherwise fall back to the keyword.
                if len(args) > position:
                    return args[position]
                return kwargs.get(keyword, default)

            # NOTE(review): keyword names assume the pandas signature
            # (path_or_buf, mode, encoding, compression, memory_map, is_text) -
            # confirm against the pandas version in use.
            url = argument(0, 'path_or_buf', None)

            # urlparse() raises on non-string targets, so only strings are
            # inspected; everything else is left to pandas.
            if not (isinstance(url, str) and urlparse(url).scheme in ('lq', 'bdl')):
                return wrapped(*args, **kwargs)

            encoding = argument(2, 'encoding', 'utf-8')
            is_text = argument(5, 'is_text', True)

            # The remote file is always opened as "wb"; the caller's mode,
            # compression and memory_map values are not applied on this path.
            f = lq.open_file(url, mode="wb")
            if is_text:
                # Text is encoded on top of the binary remote handle.
                f = TextIOWrapper(f, encoding=encoding)
            return f, [f]


        @wrapt.patch_function_wrapper('pandas.io.parquet', 'FastParquetImpl.read')
        def FastParquetImpl_read(wrapped, instance, args, kwargs):
            """Read parquet data from ``lq://`` / ``bdl://`` URLs via ``lentiq.io``.

            Installed by ``wrapt`` around ``pandas.io.parquet.FastParquetImpl.read``.

            Args:
                wrapped: The original bound ``read`` method.
                instance: The ``FastParquetImpl`` instance; its ``api``
                    attribute is used to build the ``ParquetFile``.
                args: Positional arguments of the intercepted call; the first
                    one is the path.
                kwargs: Keyword arguments of the intercepted call.

            Returns:
                For a string URL whose scheme is ``lq`` or ``bdl``: the result
                of ``ParquetFile(path, open_with=lq.open_file).to_pandas(...)``
                called with the remaining arguments.
                For anything else: whatever ``wrapped(*args, **kwargs)`` returns.
            """
            from urllib.parse import urlparse

            read_kwargs = dict(kwargs)
            if args:
                path, extra_args = args[0], args[1:]
            else:
                # Path given by keyword: take it out so it is not forwarded
                # to to_pandas() below.
                path, extra_args = read_kwargs.pop('path', None), ()

            # urlparse() raises on non-string paths; let pandas deal with those.
            if isinstance(path, str) and urlparse(path).scheme in ('lq', 'bdl'):
                parquet_file = instance.api.ParquetFile(path, open_with=lq.open_file)
                # Forward positional arguments after the path as well.
                # NOTE(review): assumes they line up with to_pandas()'s leading
                # parameters (columns first) - confirm.
                return parquet_file.to_pandas(*extra_args, **read_kwargs)

            return wrapped(*args, **kwargs)

        @wrapt.patch_function_wrapper('pandas.io.parquet', 'FastParquetImpl.write')
        def FastParquetImpl_write(wrapped, instance, args, kwargs):
            """Write parquet data to ``lq://`` / ``bdl://`` URLs via ``lentiq.io``.

            Installed by ``wrapt`` around ``pandas.io.parquet.FastParquetImpl.write``.

            Args:
                wrapped: The original bound ``write`` method.
                instance: The ``FastParquetImpl`` instance; its
                    ``validate_dataframe`` and ``api.write`` are used.
                args: Positional arguments of the intercepted call: the
                    dataframe, the path and optionally the compression.
                kwargs: Keyword arguments of the intercepted call.

            Returns:
                ``None`` for a string URL whose scheme is ``lq`` or ``bdl``
                (the data is written via ``instance.api.write``); otherwise
                whatever ``wrapped(*args, **kwargs)`` returns.
            """
            from urllib.parse import urlparse

            # Accept the dataframe and path positionally or by keyword.
            # NOTE(review): keyword names assume the pandas signature
            # write(df, path, compression, ...) - confirm.
            df = args[0] if args else kwargs.get('df')
            path = args[1] if len(args) > 1 else kwargs.get('path')

            # Validation runs for every call, before the scheme is inspected.
            instance.validate_dataframe(df)

            # urlparse() raises on non-string paths; let pandas deal with those.
            if isinstance(path, str) and urlparse(path).scheme in ('lq', 'bdl'):
                # df and path are passed explicitly below, so strip them from
                # the forwarded keywords.
                write_kwargs = {
                    key: value for key, value in kwargs.items()
                    if key not in ('df', 'path')
                }
                if len(args) > 2:
                    # A positional compression must not be lost. Positional
                    # arguments beyond it are not forwarded on this path.
                    write_kwargs['compression'] = args[2]

                # Warnings raised while writing are recorded, not shown.
                with catch_warnings(record=True):
                    instance.api.write(path, df, open_with=lq.open_file, **write_kwargs)
                return None

            return wrapped(*args, **kwargs)

except ImportError:
  #silently ignoring import error to allow our client to be used on systems without pandas
  pass