diff --git a/dataframe.cabal b/dataframe.cabal index 7b0f8aa7..542c6448 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -22,7 +22,8 @@ source-repository head location: https://github.com/mchav/dataframe library - exposed-modules: DataFrame + exposed-modules: DataFrame, + DataFrame.Lazy other-modules: DataFrame.Internal.Types, DataFrame.Internal.Expression, DataFrame.Internal.Parsing, @@ -33,6 +34,7 @@ library DataFrame.Internal.Row, DataFrame.Errors, DataFrame.Operations.Core, + DataFrame.Operations.Merge, DataFrame.Operations.Subset, DataFrame.Operations.Sorting, DataFrame.Operations.Statistics, @@ -40,13 +42,16 @@ library DataFrame.Operations.Typing, DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, - DataFrame.IO.CSV + DataFrame.IO.CSV, + DataFrame.Lazy.IO.CSV, + DataFrame.Lazy.Internal.DataFrame build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, attoparsec >= 0.12 && <= 0.14.4, bytestring >= 0.11 && <= 0.12.2.0, containers >= 0.6.7 && < 0.8, directory >= 1.3.0.0 && <= 1.3.9.0, + filepath >= 1.0.0.0 && <= 1.5.4.0, hashable >= 1.2 && <= 1.5.0.0, statistics >= 0.16.2.1 && <= 0.16.3.0, text >= 2.0 && <= 2.1.2, @@ -76,7 +81,8 @@ executable dataframe DataFrame.Operations.Typing, DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, - DataFrame.IO.CSV + DataFrame.IO.CSV, + DataFrame.Lazy.IO.CSV build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, attoparsec >= 0.12 && <= 0.14.4, diff --git a/src/DataFrame/IO.hs b/src/DataFrame/IO.hs new file mode 100644 index 00000000..50134000 --- /dev/null +++ b/src/DataFrame/IO.hs @@ -0,0 +1,3 @@ +module DataFrame.IO where + +data InputTypes = CSV deriving Show -- Input formats; CSV is the only constructor. NOTE(review): no module shown in this patch imports DataFrame.IO - confirm it is still needed. \ No newline at end of file diff --git a/src/DataFrame/Internal/Column.hs b/src/DataFrame/Internal/Column.hs index b92ae1d0..4f6609ed 100644 --- a/src/DataFrame/Internal/Column.hs +++ b/src/DataFrame/Internal/Column.hs @@ -4,7 +4,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE 
ScopedTypeVariables #-} -{-# LANGUAGE StrictData #-} +{-# LANGUAGE Strict #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} @@ -521,6 +521,31 @@ expandColumn n (UnboxedColumn col) = OptionalColumn $ VB.map Just (VU.convert co expandColumn n (GroupedBoxedColumn col) = GroupedBoxedColumn $ col <> VB.replicate n VB.empty expandColumn n (GroupedUnboxedColumn col) = GroupedUnboxedColumn $ col <> VB.replicate n VU.empty +leftExpandColumn :: Int -> Column -> Column -- Prepends n empty slots: Nothing for value columns (plain columns become OptionalColumn), empty groups for grouped columns. +leftExpandColumn n (OptionalColumn col) = OptionalColumn $ VB.replicate n Nothing <> col +leftExpandColumn n (BoxedColumn col) = OptionalColumn $ VB.replicate n Nothing <> VB.map Just col +leftExpandColumn n (UnboxedColumn col) = OptionalColumn $ VB.replicate n Nothing <> VB.map Just (VU.convert col) +leftExpandColumn n (GroupedBoxedColumn col) = GroupedBoxedColumn $ VB.replicate n VB.empty <> col +leftExpandColumn n (GroupedUnboxedColumn col) = GroupedUnboxedColumn $ VB.replicate n VU.empty <> col + +concatColumns :: Column -> Column -> Maybe Column -- Appends the right column after the left one; Nothing when the constructors or the element types differ. +concatColumns (OptionalColumn left) (OptionalColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (OptionalColumn $ left <> right) +concatColumns (BoxedColumn left) (BoxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (BoxedColumn $ left <> right) +concatColumns (UnboxedColumn left) (UnboxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (UnboxedColumn $ left <> right) +concatColumns (GroupedBoxedColumn left) (GroupedBoxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (GroupedBoxedColumn $ left <> right) +concatColumns (GroupedUnboxedColumn left) (GroupedUnboxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just 
(GroupedUnboxedColumn $ left <> right) +concatColumns _ _ = Nothing -- Any pairing of two different constructors (e.g. Optional with Boxed) is rejected. + toVector :: forall a . Columnable a => Column -> VB.Vector a toVector column@(OptionalColumn (col :: VB.Vector b)) = case testEquality (typeRep @a) (typeRep @b) of diff --git a/src/DataFrame/Internal/DataFrame.hs b/src/DataFrame/Internal/DataFrame.hs index cadd2113..4406badb 100644 --- a/src/DataFrame/Internal/DataFrame.hs +++ b/src/DataFrame/Internal/DataFrame.hs @@ -4,7 +4,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE GADTs #-} -{-# LANGUAGE StrictData #-} +{-# LANGUAGE Strict #-} {-# LANGUAGE FlexibleContexts #-} module DataFrame.Internal.DataFrame where diff --git a/src/DataFrame/Internal/Expression.hs b/src/DataFrame/Internal/Expression.hs index 4772b078..ee9d1bea 100644 --- a/src/DataFrame/Internal/Expression.hs +++ b/src/DataFrame/Internal/Expression.hs @@ -32,7 +32,7 @@ data Expr a where Apply :: (Columnable a, Columnable b) => T.Text -> (b -> a) -> Expr b -> Expr a BinOp :: (Columnable c, Columnable b, Columnable a) => T.Text -> (c -> b -> a) -> Expr c -> Expr b -> Expr a -interpret :: forall a b . (Columnable a) => DataFrame -> Expr a -> TypedColumn a +interpret :: forall a . 
(Columnable a) => DataFrame -> Expr a -> TypedColumn a interpret df (Lit value) = TColumn $ toColumn' $ V.replicate (fst $ dataframeDimensions df) value interpret df (Col name) = case getColumn name df of Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df) diff --git a/src/DataFrame/Internal/Types.hs b/src/DataFrame/Internal/Types.hs index 2a72e0a7..3a640786 100644 --- a/src/DataFrame/Internal/Types.hs +++ b/src/DataFrame/Internal/Types.hs @@ -19,4 +19,4 @@ import Data.Word ( Word8, Word16, Word32, Word64 ) import Type.Reflection (TypeRep, typeOf, typeRep) import Data.Type.Equality (TestEquality(..)) -type Columnable' a = (Typeable a, Show a, Ord a, Eq a) +type Columnable' a = (Typeable a, Show a, Ord a, Eq a, Read a) -- Read is now required of every column element type. diff --git a/src/DataFrame/Lazy.hs b/src/DataFrame/Lazy.hs new file mode 100644 index 00000000..9d69f7f2 --- /dev/null +++ b/src/DataFrame/Lazy.hs @@ -0,0 +1,3 @@ +module DataFrame.Lazy (module DataFrame.Lazy.Internal.DataFrame) where + +import DataFrame.Lazy.Internal.DataFrame diff --git a/src/DataFrame/Lazy/IO/CSV.hs b/src/DataFrame/Lazy/IO/CSV.hs new file mode 100644 index 00000000..ce373c52 --- /dev/null +++ b/src/DataFrame/Lazy/IO/CSV.hs @@ -0,0 +1,327 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ExplicitNamespaces #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE Strict #-} +module DataFrame.Lazy.IO.CSV where + +import qualified Data.ByteString.Char8 as C +import qualified Data.List as L +import qualified Data.Map as M +import qualified Data.Set as S +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import qualified Data.Text.Lazy.IO as TLIO +import qualified Data.Text.IO as TIO +import qualified Data.Vector as V +import qualified Data.Vector.Unboxed as VU +import qualified Data.Vector.Mutable as VM +import qualified 
Data.Vector.Unboxed.Mutable as VUM + +import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many) +import Control.Monad (forM_, zipWithM_, unless, when, void, replicateM_) +import Data.Attoparsec.Text +import Data.Char +import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength) +import DataFrame.Internal.DataFrame (DataFrame(..)) +import DataFrame.Internal.Parsing +import DataFrame.Operations.Typing +import Data.Foldable (fold) +import Data.Function (on) +import Data.IORef +import Data.Maybe +import Data.Text.Encoding (decodeUtf8Lenient) +import Data.Type.Equality + ( TestEquality (testEquality), + type (:~:) (Refl) + ) +import GHC.IO.Handle (Handle) +import Prelude hiding (concat, takeWhile) +import System.IO +import Type.Reflection + +-- | Record for CSV read options; the last five fields carry the state needed to read a file in successive batches. +data ReadOptions = ReadOptions { + hasHeader :: Bool, + inferTypes :: Bool, + safeRead :: Bool, + rowRange :: !(Maybe (Int, Int)), -- (start, length) + seekPos :: !(Maybe Integer), -- absolute handle offset to seek to before the data rows are read + totalRows :: !(Maybe Int), -- row count of the file, header line included; counted with countRows when Nothing + leftOver :: !T.Text, -- input already pulled from the handle but not yet parsed + rowsRead :: !Int -- data rows consumed by earlier reads of the same file +} + +-- | By default we assume the file has a header, we infer the types on read +-- and we convert any rows with nullish objects into Maybe (safeRead). +defaultOptions :: ReadOptions +defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing, totalRows = Nothing, leftOver = "", rowsRead = 0 } + +-- | Reads a comma separated file from the given path with 'defaultOptions', +-- keeping only the dataframe part of the 'readSeparated' result. +-- NOTE(review): this comment used to mention intermediate temporary files; no such files are written by the code shown in this module - confirm. +readCsv :: String -> IO DataFrame +readCsv path = fst <$> readSeparated ',' defaultOptions path + +-- | Reads a tab separated file from the given path with 'defaultOptions', +-- keeping only the dataframe part of the 'readSeparated' result. +-- NOTE(review): this comment used to mention intermediate temporary files; no such files are written by the code shown in this module - confirm. 
+readTsv :: String -> IO DataFrame +readTsv path = fst <$> readSeparated '\t' defaultOptions path + +-- | Reads a character separated file into a dataframe using mutable vectors. Also returns (handle position, unparsed leftover text, rows read) so that a later call can resume where this one stopped. +readSeparated :: Char -> ReadOptions -> String -> IO (DataFrame, (Integer, T.Text, Int)) +readSeparated c opts path = do + totalRows <- case totalRows opts of + Nothing -> countRows c path >>= \total -> if hasHeader opts then return (total - 1) else return total + Just n -> if hasHeader opts then return (n - 1) else return n + let (begin, len) = case rowRange opts of + Nothing -> (0, totalRows) + Just (start, len) -> (start, min len (totalRows - rowsRead opts)) + withFile path ReadMode $ \handle -> do + firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle + let columnNames = if hasHeader opts + then map (T.filter (/= '\"')) firstRow + else map (T.pack . show) [0..(length firstRow - 1)] -- decimal index names; intToDigit is only defined for 0..15 + -- If there was no header rewind the file cursor. + unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0 + + currPos <- hTell handle + when (isJust $ seekPos opts) $ hSeek handle AbsoluteSeek (fromMaybe currPos (seekPos opts)) + + -- Initialize mutable vectors for each column + let numColumns = length columnNames + let numRows = len + -- Use this row to infer the types of the rest of the column. + -- TODO: this isn't robust but in so far as this is a guess anyway + -- it's probably fine. But we should probably sample n rows and pick + -- the most likely type from the sample. + -- dataRow <- map T.strip . parseSep c . (<>) (leftOver opts) <$> TIO.hGetLine handle + (!dataRow, !remainder) <- readSingleLine c (leftOver opts) handle + + -- This array will track the indices of all null values for each column. + -- If any exist then the column will be an optional type. 
+ nullIndices <- VM.unsafeNew numColumns + VM.set nullIndices [] + mutableCols <- VM.unsafeNew numColumns + getInitialDataVectors numRows mutableCols dataRow + + -- Read rows into the mutable vectors + (!unconsumed, !r) <- fillColumns numRows c mutableCols nullIndices remainder handle + + -- Freeze the mutable vectors into immutable ones + nulls' <- V.unsafeFreeze nullIndices + cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id) + pos <- hTell handle + + return (DataFrame { + columns = cols, + freeIndices = [], + columnIndices = M.fromList (zip columnNames [0..]), + dataframeDimensions = (maybe 0 columnLength (cols V.! 0), V.length cols) + }, (pos, unconsumed, r + 1)) -- r counts the rows parsed by fillColumns; +1 for the row read by readSingleLine +{-# INLINE readSeparated #-} + +getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO () -- Allocates one n-row mutable column per field of the first data row (typed by inferValueType) and stores that field at row 0. +getInitialDataVectors n mCol xs = do + forM_ (zip [0..(VM.length mCol - 1)] xs) $ \(i, x) -> do -- bounded by the column count so an over-long first row cannot drive unsafeWrite past mCol + col <- case inferValueType x of + "Int" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Int)) >>= \c -> VUM.unsafeWrite c 0 (fromMaybe 0 $ readInt x) >> return c) + "Double" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Double)) >>= \c -> VUM.unsafeWrite c 0 (fromMaybe 0 $ readDouble x) >> return c) + _ -> MutableBoxedColumn <$> ((VM.unsafeNew n :: IO (VM.IOVector T.Text)) >>= \c -> VM.unsafeWrite c 0 x >> return c) + VM.unsafeWrite mCol i col +{-# INLINE getInitialDataVectors #-} + +inferValueType :: T.Text -> T.Text -- "Int" when readInt succeeds, else "Double" when readDouble succeeds, else "Other". +inferValueType s = let + example = s + in case readInt example of + Just _ -> "Int" + Nothing -> case readDouble example of + Just _ -> "Double" + Nothing -> "Other" +{-# INLINE inferValueType #-} + +readSingleLine :: Char -> T.Text -> Handle -> IO ([T.Text], T.Text) +readSingleLine c unused handle = parseWith (TIO.hGetChunk handle) (parseRow c) unused >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell handle + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx 
+ Partial c -> do + fail "Partial handler is called" + Done (unconsumed :: T.Text) (row :: [T.Text]) -> do + return (row, unconsumed) + +-- | Reads rows from the handle and stores values in mutable vectors. Rows 1..n-1 are parsed here (the loop starts at index 1); returns the unparsed leftover text and the number of rows parsed. +fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> T.Text -> Handle -> IO (T.Text, Int) +fillColumns n c mutableCols nullIndices unused handle = do + input <- newIORef unused + rowsRead <- newIORef (0 :: Int) + forM_ [1..(n - 1)] $ \i -> do + isEOF <- hIsEOF handle + input' <- readIORef input + unless (isEOF && input' == mempty) $ do + parseWith (TIO.hGetChunk handle) (parseRow c) input' >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell handle + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx + Partial c -> do + fail "Partial handler is called" + Done (unconsumed :: T.Text) (row :: [T.Text]) -> do + writeIORef input unconsumed + modifyIORef' rowsRead (+1) -- strict update: the lazy modifyIORef stores one unevaluated (+1) per row + zipWithM_ (writeValue mutableCols nullIndices i) [0..(VM.length mutableCols - 1)] row -- fields beyond the last column are dropped instead of indexing past mutableCols + l <- readIORef input + r <- readIORef rowsRead + pure (l, r) +{-# INLINE fillColumns #-} + +-- | Writes a value into the column at colIndex via 'writeColumn'; a Left result is recorded in nullIndices together with the row index. +writeValue :: VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Int -> Int -> T.Text -> IO () +writeValue mutableCols nullIndices count colIndex value = do + col <- VM.unsafeRead mutableCols colIndex + res <- writeColumn count value col + let modify value = VM.unsafeModify nullIndices ((count, value) :) colIndex + either modify (const (return ())) res +{-# INLINE writeValue #-} + +-- | Freezes the mutable column at colIndex with freezeColumn', passing the null entries recorded for it; opts is not used by this function. +freezeColumn :: VM.IOVector Column -> V.Vector [(Int, T.Text)] -> ReadOptions -> Int -> IO (Maybe Column) +freezeColumn mutableCols nulls opts colIndex = do + col <- VM.unsafeRead mutableCols colIndex + Just <$> freezeColumn' (nulls V.! 
colIndex) col +{-# INLINE freezeColumn #-} + +parseSep :: Char -> T.Text -> [T.Text] -- Parses one complete record from s; calls error with the parser message on failure. +parseSep c s = either error id (parseOnly (record c) s) +{-# INLINE parseSep #-} + +record :: Char -> Parser [T.Text] -- One or more fields separated by c. +record c = + field c `sepBy1` char c + <?> "record" +{-# INLINE record #-} + +parseRow :: Char -> Parser [T.Text] -- A record followed by a line ending or the end of input. +parseRow c = (record c <* lineEnd) <?> "record-new-line" + +field :: Char -> Parser T.Text -- A quoted field is tried first, then an unquoted one. +field c = + quotedField <|> unquotedField c + <?> "field" +{-# INLINE field #-} + +unquotedTerminators :: Char -> S.Set Char +unquotedTerminators sep = S.fromList [sep, '\n', '\r', '"'] + +unquotedField :: Char -> Parser T.Text -- Takes characters up to the next terminator; may match the empty string. +unquotedField sep = + takeWhile (not . (`S.member` terminators)) <?> "unquoted field" + where terminators = unquotedTerminators sep +{-# INLINE unquotedField #-} + +quotedField :: Parser T.Text -- Text between double quotes; backslash escapes a backslash or a double quote. +quotedField = char '"' *> contents <* char '"' <?> "quoted field" + where + contents = fold <$> many (unquote <|> unescape) + where + unquote = takeWhile1 (notInClass "\"\\") + unescape = char '\\' *> do + T.singleton <$> do + char '\\' <|> char '"' +{-# INLINE quotedField #-} + +lineEnd :: Parser () +lineEnd = + (endOfLine <|> endOfInput) + <?> "end of line" +{-# INLINE lineEnd #-} + +-- | First pass to count rows for exact allocation +countRows :: Char -> FilePath -> IO Int +countRows c path = withFile path ReadMode $! 
go 0 "" + where + go !n !input h = do + isEOF <- hIsEOF h + if isEOF && input == mempty + then pure n + else + parseWith (TIO.hGetChunk h) (parseRow c) input >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell h + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx <> " " <> show unconsumed + Partial c -> do + fail $ "Partial handler is called; n = " <> show n + Done (unconsumed :: T.Text) _ -> + go (n + 1) unconsumed h +{-# INLINE countRows #-} + +writeCsv :: String -> DataFrame -> IO () -- writeSeparated with a comma separator. +writeCsv = writeSeparated ',' + +writeSeparated :: Char -- ^ Separator + -> String -- ^ Path to write to + -> DataFrame + -> IO () +writeSeparated c filepath df = withFile filepath WriteMode $ \handle ->do + let (rows, columns) = dataframeDimensions df + let headers = map fst (L.sortBy (compare `on` snd) (M.toList (columnIndices df))) -- column names ordered by column index + TIO.hPutStrLn handle (T.intercalate (T.singleton c) headers) -- join with the requested separator (this used to be a hard-coded ", ") + forM_ [0..(rows - 1)] $ \i -> do + let row = getRowAsText df i + TIO.hPutStrLn handle (T.intercalate (T.singleton c) row) + +getRowAsText :: DataFrame -> Int -> [T.Text] -- Renders row i of every present column as text, in column order; calls error when a column has no element at index i. +getRowAsText df i = V.ifoldr go [] (columns df) + where + indexMap = M.fromList (map (\(a, b) -> (b, a)) $ M.toList (columnIndices df)) + go k Nothing acc = acc + go k (Just (BoxedColumn (c :: V.Vector a))) acc = case c V.!? i of + Just e -> textRep : acc + where textRep = case testEquality (typeRep @a) (typeRep @T.Text) of + Just Refl -> e + Nothing -> case typeRep @a of + App t1 t2 -> case eqTypeRep t1 (typeRep @Maybe) of + Just HRefl -> case testEquality t2 (typeRep @T.Text) of + Just Refl -> fromMaybe "null" e + Nothing -> (fromOptional . (T.pack . show)) e + where fromOptional s + | T.isPrefixOf "Just " s = T.drop (T.length "Just ") s + | otherwise = "null" + Nothing -> (T.pack . show) e + _ -> (T.pack . show) e + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! 
k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i + go k (Just (UnboxedColumn c)) acc = case c VU.!? i of + Just e -> T.pack (show e) : acc + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i + go k (Just (OptionalColumn (c :: V.Vector (Maybe a)))) acc = case c V.!? i of + Just e -> textRep : acc + where textRep = case testEquality (typeRep @a) (typeRep @T.Text) of + Just Refl -> fromMaybe "Nothing" e + Nothing -> (T.pack . show) e + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i diff --git a/src/DataFrame/Lazy/Internal/DataFrame.hs b/src/DataFrame/Lazy/Internal/DataFrame.hs new file mode 100644 index 00000000..9f1b1cb8 --- /dev/null +++ b/src/DataFrame/Lazy/Internal/DataFrame.hs @@ -0,0 +1,92 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE Strict #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE NumericUnderscores #-} +module DataFrame.Lazy.Internal.DataFrame where + +import Control.Monad (forM, foldM) +import Data.IORef +import Data.Kind +import qualified Data.List as L +import qualified Data.Map as M +import qualified Data.Text as T +import qualified Data.Vector as V +import qualified DataFrame.Internal.DataFrame as D +import qualified DataFrame.Internal.Column as C +import qualified DataFrame.Internal.Expression as E +import qualified DataFrame.Operations.Core as D +import DataFrame.Operations.Merge +import qualified DataFrame.Operations.Subset as D +import qualified DataFrame.Operations.Transformations as D +import qualified DataFrame.Lazy.IO.CSV as D +import System.FilePath + +data LazyOperation where + Derive :: C.Columnable a => T.Text -> E.Expr a -> LazyOperation + Select :: 
[T.Text] -> LazyOperation + Filter :: E.Expr Bool -> LazyOperation + +instance Show LazyOperation where + show :: LazyOperation -> String + show (Derive name expr) = T.unpack name ++ " := " ++ show expr + show (Select columns) = "select(" ++ show columns ++ ")" + show (Filter expr) = "filter(" ++ show expr ++ ")" + +data InputType = ICSV deriving Show + +data LazyDataFrame = LazyDataFrame + { inputPath :: FilePath + , inputType :: InputType + , operations :: [LazyOperation] + , batchSize :: Int + } deriving Show + +eval :: LazyOperation -> D.DataFrame -> D.DataFrame -- Applies one recorded operation to a materialised batch. +eval (Derive name expr) = D.derive name expr +eval (Select columns) = D.select columns +eval (Filter expr) = D.filterWhere expr + +runDataFrame :: forall a . (C.Columnable a) => LazyDataFrame -> IO D.DataFrame -- Reads the file batch by batch, applies the recorded operations to each batch and appends the results. NOTE(review): the type variable a occurs only in the constraint - confirm it is needed. +runDataFrame df = do + let path = inputPath df + totalRows <- D.countRows ',' path + let batches = batchRanges (totalRows - 1) (batchSize df) -- batch over data rows only: countRows also counts the header line that defaultOptions assumes + (df', _) <- foldM (\(!accDf, (!pos, !unused, !r)) (!start, !end) -> do + mapM_ putStr ["Scanning: ", show start, " to ", show end, " rows out of ", show (totalRows - 1), "\n"] + + (!sdf, (!pos', !unconsumed, !rowsRead)) <- D.readSeparated ',' ( + D.defaultOptions { D.rowRange = Just (start, batchSize df) + , D.totalRows = Just totalRows + , D.seekPos = pos + , D.rowsRead = r + , D.leftOver = unused}) path + let !rdf = L.foldl' (flip eval) sdf (operations df) + return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r)) ) (D.empty, (Nothing, "", 0)) batches + return df' + +batchRanges :: Int -> Int -> [(Int, Int)] -- Splits [0, n) into consecutive (start, end) ranges of width inc; the last range ends at n. +batchRanges n inc = go n [0,inc..n] + where + go _ [] = [] + go n [x] = [(x, n) | x < n] -- no trailing empty range when n is a multiple of inc + go n (f:s:rest) =(f, s) : go n (s:rest) + +scanCsv :: T.Text -> LazyDataFrame -- A lazy frame over the CSV at path with no operations and a batch size of 512_000 rows. +scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 512_000 + +addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame -- Appends op after the operations already recorded. +addOperation op df = df { operations = operations df ++ [op] } + +derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame +derive name expr = 
addOperation (Derive name expr) + +select :: C.Columnable a => [T.Text] -> LazyDataFrame -> LazyDataFrame -- NOTE(review): a occurs only in the constraint, so callers cannot infer it - confirm the constraint is intended. +select columns = addOperation (Select columns) + +filter :: C.Columnable a => E.Expr Bool -> LazyDataFrame -> LazyDataFrame -- NOTE(review): a occurs only in the constraint, so callers cannot infer it - confirm the constraint is intended. +filter cond = addOperation (Filter cond) diff --git a/src/DataFrame/Operations/Core.hs b/src/DataFrame/Operations/Core.hs index cf08b602..94a4692e 100644 --- a/src/DataFrame/Operations/Core.hs +++ b/src/DataFrame/Operations/Core.hs @@ -6,6 +6,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE Strict #-} module DataFrame.Operations.Core where import qualified Data.List as L diff --git a/src/DataFrame/Operations/Merge.hs b/src/DataFrame/Operations/Merge.hs new file mode 100644 index 00000000..121e094f --- /dev/null +++ b/src/DataFrame/Operations/Merge.hs @@ -0,0 +1,39 @@ +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE Strict #-} +module DataFrame.Operations.Merge where + +import qualified Data.List as L +import qualified Data.Text as T +import qualified Data.Vector as V +import qualified DataFrame.Internal.Column as D +import qualified DataFrame.Internal.DataFrame as D +import qualified DataFrame.Operations.Core as D + +instance Semigroup D.DataFrame where -- Row-wise append: b's rows follow a's rows; a column missing on one side is padded with empty slots for that side. + (<>) :: D.DataFrame -> D.DataFrame -> D.DataFrame + (<>) a b = let + columnsInBOnly = filter (\c -> c `notElem` D.columnNames a) (D.columnNames b) -- names present in b but not in a (this used to test b against itself) + columnsInA = D.columnNames a + addColumns a' b' df name + | fst (D.dimensions a') == 0 && fst (D.dimensions b') == 0 = df + | fst (D.dimensions a') == 0 = D.insertColumn' name (D.getColumn name b') df + | fst (D.dimensions b') == 0 = D.insertColumn' name (D.getColumn name a') df + | otherwise = let + numRowsA = (fst $ D.dimensions a') -- used below as the number of slots to pad, i.e. a row count + numRowsB = (fst $ D.dimensions b') + numRows = max numRowsA numRowsB + optA = D.getColumn name a' + optB = D.getColumn name b' + in case optB of + Nothing -> case optA of + Nothing -> D.insertColumn' name (Just (D.toColumn ([] 
:: [T.Text]))) df + Just a'' -> D.insertColumn' name (Just (D.expandColumn numRowsB a'')) df + Just b'' -> case optA of + Nothing -> D.insertColumn' name (Just (D.leftExpandColumn numRowsA b'')) df + Just a'' -> D.insertColumn' name (D.concatColumns a'' b'') df + in L.foldl' (addColumns a b) D.empty (D.columnNames a `L.union` D.columnNames b) + +instance Monoid D.DataFrame where + mempty = D.empty + + diff --git a/src/DataFrame/Operations/Subset.hs b/src/DataFrame/Operations/Subset.hs index 3087120a..b62f5e5d 100644 --- a/src/DataFrame/Operations/Subset.hs +++ b/src/DataFrame/Operations/Subset.hs @@ -99,10 +99,10 @@ filterBy = flip filter filterWhere :: Expr Bool -> DataFrame -> DataFrame filterWhere expr df = let (TColumn col) = interpret @Bool df expr - (Just indexes) = ifoldlColumn (\s i satisfied -> if satisfied then S.insert i s else s) S.empty col + (Just indexes) = VU.convert . V.map (fromMaybe 0) . V.filter isJust . toVector @(Maybe Int) <$> itransform (\i satisfied -> if satisfied then Just i else Nothing) col c' = snd $ dataframeDimensions df - pick idxs col = atIndices idxs <$> col - in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (S.size indexes, c')} + pick idxs col = atIndicesStable idxs <$> col + in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (VU.length indexes, c')} -- | O(k) removes all rows with `Nothing` in a given column from the dataframe. diff --git a/src/DataFrame/Operations/Transformations.hs b/src/DataFrame/Operations/Transformations.hs index 9b34efc7..cb4d1ad1 100644 --- a/src/DataFrame/Operations/Transformations.hs +++ b/src/DataFrame/Operations/Transformations.hs @@ -3,6 +3,8 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE Strict #-} +{-# LANGUAGE StrictData #-} module DataFrame.Operations.Transformations where import qualified Data.List as L