Source code for nsaph_utils.utils.pyfst

"""
A reader for `FST files <https://www.fstpackage.org/>`_
providing the same
interface as `CSV reader <https://docs.python.org/3/library/csv.html>`_.


"""


#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import datetime
from typing import Optional

try:
    import rpy2.robjects as robjects
    import rpy2.robjects.packages as rpackages
    from rpy2.rinterface import NULL
    from rpy2.robjects import DataFrame
    from rpy2.robjects.vectors import DateVector


    EPOCH = datetime.date(year=1970, month=1, day=1).toordinal()


    def choose_cran_mirror():
        utils = rpackages.importr('utils')
        #mirrors = utils.getCRANmirrors()
        utils.chooseCRANmirror(ind=1)
        return utils


    def ensure_packages():
        utils = choose_cran_mirror()
        packages = ["fst"]
        for p in packages:
            if not rpackages.isinstalled(p):
                utils.install_packages(p)
            rpackages.importr(p)
        return


    def read_fst(path: str, start=1, end=None) -> (DataFrame, bool):
        ensure_packages()
        f = robjects.r["read_fst"]
        if start != 1 or end is not None:
            df = f(path, NULL, start, end)
            if end is None:
                complete = True
            elif df.nrow <= (end - start):
                complete = True
            else:
                complete = False
        else:
            df = f(path)
            complete = True
        return df, complete


    def vector2list(v):
        if DateVector.isrinstance(v):
            return [datetime.date.fromordinal(EPOCH + int(d)) if d == d else None for d in v]
        return list(v)


    class FSTReader:
        """
        Class providing CSV-like interface for FST files
        """

        def __init__(self, path, buffer_size = 10000, returns_mapping = False):
            if not path.endswith(".fst"):
                raise Exception("Unknown format of file " + path)
            self.path = path
            self.buffer_size = buffer_size
            self.columns = None
            self.pointer = 1
            self.last = 0
            self.first = 0
            self.complete = False
            self.returns_mapping = returns_mapping

        def read_next(self):
            self.first = self.pointer
            end = self.first + self.buffer_size - 1
            df, self.complete = read_fst(self.path, self.first, end)
            self.last = self.pointer + df.nrow
            self.columns = {
                df.colnames[c]: vector2list(df[c]) for c in range(df.ncol)
            }

        def current(self) -> Optional[int]:
            if self.pointer >= self.last:
                if self.complete:
                    return None
                self.read_next()
                if self.pointer >= self.last:
                    return None
            return self.pointer - self.first

        def current_row(self) -> Optional[list]:
            r = self.current()
            if r is None:
                return None
            row = [
                self.columns[column][r] for column in self.columns
            ]
            return row

        def current_mapping(self) -> Optional[dict]:
            r = self.current()
            if r is None:
                return None
            row = {
                column: self.columns[column][r] for column in self.columns
            }
            return row

        def __next__(self):
            if self.returns_mapping:
                row = self.current_mapping()
            else:
                row = self.current_row()
            if row is None:
                raise StopIteration()
            self.pointer += 1
            return row

        def close(self):
            self.columns = None
            self.pointer = -1
            self.complete = True

        def rewind(self):
            if self.pointer == 1:
                return
            self.pointer = 1
            self.read_next()

        def open(self):
            self.rewind()
            self.read_next()

        def __enter__(self):
            self.open()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

        def __iter__(self):
            return self
except:
[docs] class FSTReader: def __int__(self): raise NotImplementedError("FSTReader requires R environment")
### END