From bb621f10404b4d757c96ab05b97b85bc8b057404 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Wed, 18 Jun 2025 08:10:53 -0700 Subject: [PATCH 1/5] Initial skeleton for spilling to disk --- dataframe.cabal | 4 ++- src/DataFrame/Internal/Types.hs | 2 +- src/DataFrame/Lazy/Internal/Column.hs | 35 ++++++++++++++++++++++++ src/DataFrame/Lazy/Internal/DataFrame.hs | 19 +++++++++++++ 4 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 src/DataFrame/Lazy/Internal/Column.hs create mode 100644 src/DataFrame/Lazy/Internal/DataFrame.hs diff --git a/dataframe.cabal b/dataframe.cabal index 7b0f8aa7..9d1cbda0 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -40,13 +40,15 @@ library DataFrame.Operations.Typing, DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, - DataFrame.IO.CSV + DataFrame.IO.CSV, + DataFrame.Lazy.Internal.Column build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, attoparsec >= 0.12 && <= 0.14.4, bytestring >= 0.11 && <= 0.12.2.0, containers >= 0.6.7 && < 0.8, directory >= 1.3.0.0 && <= 1.3.9.0, + filepath >= 1.0.0.0 && <= 1.5.4.0, hashable >= 1.2 && <= 1.5.0.0, statistics >= 0.16.2.1 && <= 0.16.3.0, text >= 2.0 && <= 2.1.2, diff --git a/src/DataFrame/Internal/Types.hs b/src/DataFrame/Internal/Types.hs index 2a72e0a7..3a640786 100644 --- a/src/DataFrame/Internal/Types.hs +++ b/src/DataFrame/Internal/Types.hs @@ -19,4 +19,4 @@ import Data.Word ( Word8, Word16, Word32, Word64 ) import Type.Reflection (TypeRep, typeOf, typeRep) import Data.Type.Equality (TestEquality(..)) -type Columnable' a = (Typeable a, Show a, Ord a, Eq a) +type Columnable' a = (Typeable a, Show a, Ord a, Eq a, Read a) diff --git a/src/DataFrame/Lazy/Internal/Column.hs b/src/DataFrame/Lazy/Internal/Column.hs new file mode 100644 index 00000000..3c7224d7 --- /dev/null +++ b/src/DataFrame/Lazy/Internal/Column.hs @@ -0,0 +1,35 @@ +module DataFrame.Lazy.Internal.Column where + +import qualified Data.ByteString.Char8 as C +import Data.IORef +import Data.Sequence +import qualified Data.Vector as V +import Data.Word +import qualified DataFrame.Internal.Column as InMemory +import System.FilePath + +data Block + = InMem !InMemory.Column + | OnDisk !FilePath !Word64 !Word64 -- file, offset, length + deriving (Eq, Show) + +data Column = Column + { blocks :: IORef (Seq Block) + , totalElems :: IORef Int + } + +appendValue :: C.Columnable a => Column -> a -> IO () +appendValue col value = do + buf <- readIORef (blocks col) + case viewr buf of + EmptyR -> newBlockAndInsert + init :> lastBlk -> case lastBlk of + InMem inmemcol | InMemory.columnLength vec < chunkRowTarget DF -> do + writeIORef (blocks col) (init :> InMem (vec |> bs)) + _ -> newBlockAndInsert + bumpBytes (BS.length bs) + where + newBlockAndInsert = do + writeIORef (blocks col) . (:>) =<< pure EmptyR <*> pure (InMem (V.singleton bs)) + + diff --git a/src/DataFrame/Lazy/Internal/DataFrame.hs b/src/DataFrame/Lazy/Internal/DataFrame.hs new file mode 100644 index 00000000..e9fe2ee0 --- /dev/null +++ b/src/DataFrame/Lazy/Internal/DataFrame.hs @@ -0,0 +1,19 @@ +module DataFrame.Lazy.Internal.DataFrame where + +import Data.IORef +import qualified Data.Map as M +import qualified Data.Text as T +import qualified Data.Vector as V +import qualified DataFrame.Lazy.Internal.Column as C +import System.FilePath + +data DataFrame = DataFrame + { columns :: V.Vector (Maybe C.Column) + , columnIndices :: !(M.Map T.Text Int) + , freeIndices :: !(IORef [Int]) + , dataframeDims :: !(IORef (Int, Int)) -- (rows , cols) + , memBudgetBytes :: !Int -- e.g. 512 * 1024 * 1024 + , liveMemBytes :: !(IORef Int) -- updated atomically + , chunkRowTarget :: !Int -- e.g. 100_000 + , spillDir :: !FilePath + } \ No newline at end of file From 8fc3708bd9e2e31212a0bb0325ea119698a683ac Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Sun, 22 Jun 2025 00:02:02 -0700 Subject: [PATCH 2/5] Initial broken but somewhat working version of scan API. --- dataframe.cabal | 3 +- src/DataFrame/IO.hs | 3 + src/DataFrame/IO/CSV.hs | 20 ++++-- src/DataFrame/Internal/Column.hs | 25 +++++++ src/DataFrame/Internal/Expression.hs | 2 +- src/DataFrame/Lazy/Internal/Column.hs | 35 ---------- src/DataFrame/Lazy/Internal/DataFrame.hs | 84 ++++++++++++++++++++---- src/DataFrame/Operations/Merge.hs | 29 ++++++++ 8 files changed, 147 insertions(+), 54 deletions(-) create mode 100644 src/DataFrame/IO.hs delete mode 100644 src/DataFrame/Lazy/Internal/Column.hs create mode 100644 src/DataFrame/Operations/Merge.hs diff --git a/dataframe.cabal b/dataframe.cabal index 9d1cbda0..992817d6 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -33,6 +33,7 @@ library DataFrame.Internal.Row, DataFrame.Errors, DataFrame.Operations.Core, + DataFrame.Operations.Merge, DataFrame.Operations.Subset, DataFrame.Operations.Sorting, DataFrame.Operations.Statistics, @@ -41,7 +42,7 @@ library DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, DataFrame.IO.CSV, - DataFrame.Lazy.Internal.Column + DataFrame.Lazy.Internal.DataFrame build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, attoparsec >= 0.12 && <= 0.14.4, diff --git a/src/DataFrame/IO.hs b/src/DataFrame/IO.hs new file mode 100644 index 00000000..50134000 --- /dev/null +++ b/src/DataFrame/IO.hs @@ -0,0 +1,3 @@ +module DataFrame.IO where + +data InputTypes = CSV deriving Show \ No newline at end of file diff --git a/src/DataFrame/IO/CSV.hs b/src/DataFrame/IO/CSV.hs index 661f519c..3fdeed1d 100644 --- a/src/DataFrame/IO/CSV.hs +++ b/src/DataFrame/IO/CSV.hs @@ -23,7 +23,7 @@ import qualified Data.Vector.Mutable as VM import qualified Data.Vector.Unboxed.Mutable as VUM import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many) -import Control.Monad (forM_, zipWithM_, unless, void) +import Control.Monad (forM_, zipWithM_, unless, void, replicateM_) import Data.Attoparsec.Text import Data.Char import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength) @@ -48,13 +48,15 @@ import Type.Reflection data ReadOptions = ReadOptions { hasHeader :: Bool, inferTypes :: Bool, - safeRead :: Bool + safeRead :: Bool, + rowRange :: Maybe (Int, Int), -- (start, length) + seekPos :: Maybe Integer } -- | By default we assume the file has a header, we infer the types on read -- and we convert any rows with nullish objects into Maybe (safeRead). defaultOptions :: ReadOptions -defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True } +defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing } -- | Reads a CSV file from the given path. -- Note this file stores intermediate temporary files @@ -71,7 +73,9 @@ readTsv = readSeparated '\t' defaultOptions -- | Reads a character separated file into a dataframe using mutable vectors. readSeparated :: Char -> ReadOptions -> String -> IO DataFrame readSeparated c opts path = do - totalRows <- countRows c path + (begin, len) <- case rowRange opts of + Nothing -> countRows c path >>= \totalRows -> return (0, if hasHeader opts then totalRows - 1 else totalRows) + Just (start, len) -> return (start, len) withFile path ReadMode $ \handle -> do firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle let columnNames = if hasHeader opts @@ -80,9 +84,12 @@ readSeparated c opts path = do -- If there was no header rewind the file cursor. unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0 + -- skip columns till `begin` + _ <- replicateM_ begin (TIO.hGetLine handle >> return () ) + -- Initialize mutable vectors for each column let numColumns = length columnNames - let numRows = if hasHeader opts then totalRows - 1 else totalRows + let numRows = len -- Use this row to infer the types of the rest of the column. -- TODO: this isn't robust but in so far as this is a guess anyway -- it's probably fine. But we should probably sample n rows and pick @@ -102,6 +109,7 @@ readSeparated c opts path = do -- Freeze the mutable vectors into immutable ones nulls' <- V.unsafeFreeze nullIndices cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id) + return $ DataFrame { columns = cols, freeIndices = [], @@ -134,7 +142,7 @@ inferValueType s = let fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO () fillColumns n c mutableCols nullIndices handle = do input <- newIORef (mempty :: T.Text) - forM_ [1..n] $ \i -> do + forM_ [1..(n - 1)] $ \i -> do isEOF <- hIsEOF handle input' <- readIORef input unless (isEOF && input' == mempty) $ do diff --git a/src/DataFrame/Internal/Column.hs b/src/DataFrame/Internal/Column.hs index b92ae1d0..f58e9190 100644 --- a/src/DataFrame/Internal/Column.hs +++ b/src/DataFrame/Internal/Column.hs @@ -521,6 +521,31 @@ expandColumn n (UnboxedColumn col) = OptionalColumn $ VB.map Just (VU.convert co expandColumn n (GroupedBoxedColumn col) = GroupedBoxedColumn $ col <> VB.replicate n VB.empty expandColumn n (GroupedUnboxedColumn col) = GroupedUnboxedColumn $ col <> VB.replicate n VU.empty +leftExpandColumn :: Int -> Column -> Column +leftExpandColumn n (OptionalColumn col) = OptionalColumn $ VB.replicate n Nothing <> col +leftExpandColumn n (BoxedColumn col) = OptionalColumn $ VB.replicate n Nothing <> VB.map Just col +leftExpandColumn n (UnboxedColumn col) = OptionalColumn $ VB.replicate n Nothing <> VB.map Just (VU.convert col) +leftExpandColumn n (GroupedBoxedColumn col) = GroupedBoxedColumn $ VB.replicate n VB.empty <> col +leftExpandColumn n (GroupedUnboxedColumn col) = GroupedUnboxedColumn $ VB.replicate n VU.empty <> col + +concatColumns :: Column -> Column -> Maybe Column +concatColumns (OptionalColumn left) (OptionalColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (OptionalColumn $ left <> right) +concatColumns (BoxedColumn left) (BoxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (BoxedColumn $ left <> right) +concatColumns (UnboxedColumn left) (UnboxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (UnboxedColumn $ left <> right) +concatColumns (GroupedBoxedColumn left) (GroupedBoxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (GroupedBoxedColumn $ left <> right) +concatColumns (GroupedUnboxedColumn left) (GroupedUnboxedColumn right) = case testEquality (typeOf left) (typeOf right) of + Nothing -> Nothing + Just Refl -> Just (GroupedUnboxedColumn $ left <> right) +concatColumns _ _ = Nothing + toVector :: forall a . Columnable a => Column -> VB.Vector a toVector column@(OptionalColumn (col :: VB.Vector b)) = case testEquality (typeRep @a) (typeRep @b) of diff --git a/src/DataFrame/Internal/Expression.hs b/src/DataFrame/Internal/Expression.hs index 4772b078..ee9d1bea 100644 --- a/src/DataFrame/Internal/Expression.hs +++ b/src/DataFrame/Internal/Expression.hs @@ -32,7 +32,7 @@ data Expr a where Apply :: (Columnable a, Columnable b) => T.Text -> (b -> a) -> Expr b -> Expr a BinOp :: (Columnable c, Columnable b, Columnable a) => T.Text -> (c -> b -> a) -> Expr c -> Expr b -> Expr a -interpret :: forall a b . (Columnable a) => DataFrame -> Expr a -> TypedColumn a +interpret :: forall a . (Columnable a) => DataFrame -> Expr a -> TypedColumn a interpret df (Lit value) = TColumn $ toColumn' $ V.replicate (fst $ dataframeDimensions df) value interpret df (Col name) = case getColumn name df of Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df) diff --git a/src/DataFrame/Lazy/Internal/Column.hs b/src/DataFrame/Lazy/Internal/Column.hs deleted file mode 100644 index 3c7224d7..00000000 --- a/src/DataFrame/Lazy/Internal/Column.hs +++ /dev/null @@ -1,35 +0,0 @@ -module DataFrame.Lazy.Internal.Column where - -import qualified Data.ByteString.Char8 as C -import Data.IORef -import Data.Sequence -import qualified Data.Vector as V -import Data.Word -import qualified DataFrame.Internal.Column as InMemory -import System.FilePath - -data Block - = InMem !InMemory.Column - | OnDisk !FilePath !Word64 !Word64 -- file, offset, length - deriving (Eq, Show) - -data Column = Column - { blocks :: IORef (Seq Block) - , totalElems :: IORef Int - } - -appendValue :: C.Columnable a => Column -> a -> IO () -appendValue col value = do - buf <- readIORef (blocks col) - case viewr buf of - EmptyR -> newBlockAndInsert - init :> lastBlk -> case lastBlk of - InMem inmemcol | InMemory.columnLength vec < chunkRowTarget DF -> do - writeIORef (blocks col) (init :> InMem (vec |> bs)) - _ -> newBlockAndInsert - bumpBytes (BS.length bs) - where - newBlockAndInsert = do - writeIORef (blocks col) . (:>) =<< pure EmptyR <*> pure (InMem (V.singleton bs)) - - diff --git a/src/DataFrame/Lazy/Internal/DataFrame.hs b/src/DataFrame/Lazy/Internal/DataFrame.hs index e9fe2ee0..c66c6c1b 100644 --- a/src/DataFrame/Lazy/Internal/DataFrame.hs +++ b/src/DataFrame/Lazy/Internal/DataFrame.hs @@ -1,19 +1,81 @@ +{-# LANGUAGE GADTs #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE NumericUnderscores #-} module DataFrame.Lazy.Internal.DataFrame where +import Control.Monad (forM_) import Data.IORef +import Data.Kind import qualified Data.Map as M import qualified Data.Text as T import qualified Data.Vector as V -import qualified DataFrame.Lazy.Internal.Column as C +import qualified DataFrame.Internal.DataFrame as D +import qualified DataFrame.Internal.Column as C +import qualified DataFrame.Internal.Expression as E +import qualified DataFrame.Operations.Core as D +import qualified DataFrame.Operations.Subset as D +import qualified DataFrame.Operations.Transformations as D +import qualified DataFrame.IO.CSV as D import System.FilePath -data DataFrame = DataFrame - { columns :: V.Vector (Maybe C.Column) - , columnIndices :: !(M.Map T.Text Int) - , freeIndices :: !(IORef [Int]) - , dataframeDims :: !(IORef (Int, Int)) -- (rows , cols) - , memBudgetBytes :: !Int -- e.g. 512 * 1024 * 1024 - , liveMemBytes :: !(IORef Int) -- updated atomically - , chunkRowTarget :: !Int -- e.g. 100_000 - , spillDir :: !FilePath - } \ No newline at end of file +data LazyOperation where + Derive :: C.Columnable a => T.Text -> E.Expr a -> LazyOperation + Select :: [T.Text] -> LazyOperation + Filter :: E.Expr Bool -> LazyOperation + +instance Show LazyOperation where + show :: LazyOperation -> String + show (Derive name expr) = T.unpack name ++ " := " ++ show expr + show (Select columns) = "select(" ++ show columns ++ ")" + show (Filter expr) = "filter(" ++ show expr ++ ")" + +data InputType = ICSV deriving Show + +data LazyDataFrame = LazyDataFrame + { inputPath :: FilePath + , inputType :: InputType + , operations :: [LazyOperation] + , batchSize :: Int + } deriving Show + +eval :: LazyOperation -> D.DataFrame -> D.DataFrame +eval (Derive name expr) = D.derive name expr +eval (Select columns) = D.select columns +eval (Filter expr) = D.filterWhere expr + +runDataFrame :: forall a . (C.Columnable a) => LazyDataFrame -> IO D.DataFrame +runDataFrame df = do + let path = inputPath df + -- totalRows <- D.countRows ',' path + let batches = batchRanges 1000000 (batchSize df) + _ <- forM_ batches $ \ (start, end) -> do + -- TODO: implement specific read operations for batching that returns a seek instead of re-reading everything. + sdf <- D.readSeparated ',' (D.defaultOptions { D.rowRange = Just (start, (batchSize df)) }) path + let rdf = foldl' (\d op -> eval op d) sdf (operations df) + if fst (D.dimensions rdf) == 0 then return () else print rdf + return (D.empty) + +batchRanges :: Int -> Int -> [(Int, Int)] +batchRanges n inc = go n [0,inc..n] + where + go _ [] = [] + go n [x] = [(x, n)] + go n (f:s:rest) =(f, s) : go n (s:rest) + +scanCsv :: T.Text -> LazyDataFrame +scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 1024 + +addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame +addOperation op df = df { operations = (operations df) ++ [op] } + +derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame +derive name expr = addOperation (Derive name expr) + +select :: C.Columnable a => [T.Text] -> LazyDataFrame -> LazyDataFrame +select columns = addOperation (Select columns) + +filter :: C.Columnable a => E.Expr Bool -> LazyDataFrame -> LazyDataFrame +filter cond = addOperation (Filter cond) diff --git a/src/DataFrame/Operations/Merge.hs b/src/DataFrame/Operations/Merge.hs new file mode 100644 index 00000000..ab241983 --- /dev/null +++ b/src/DataFrame/Operations/Merge.hs @@ -0,0 +1,29 @@ +{-# LANGUAGE InstanceSigs #-} +module DataFrame.Operations.Merge where + +import qualified Data.List as L +import qualified Data.Text as T +import qualified Data.Vector as V +import qualified DataFrame.Internal.Column as D +import qualified DataFrame.Internal.DataFrame as D +import qualified DataFrame.Operations.Core as D + +instance Semigroup D.DataFrame where + (<>) :: D.DataFrame -> D.DataFrame -> D.DataFrame + (<>) a b = let + columnsInBOnly = filter (\c -> not (c `elem` (D.columnNames b))) (D.columnNames b) + columnsInA = D.columnNames a + addColumns a' b' df name = let + numColumnsA = (fst $ D.dimensions a') + numColumnsB = (fst $ D.dimensions b') + numColumns = max numColumnsA numColumnsB + optA = D.getColumn name a' + optB = D.getColumn name b' + in case optB of + Nothing -> case optA of + Nothing -> D.insertColumn' name (Just (D.toColumn ([] :: [T.Text]))) df + Just a'' -> D.insertColumn' name (Just (D.expandColumn numColumnsB a'')) df + Just b'' -> case optA of + Nothing -> D.insertColumn' name (Just (D.leftExpandColumn numColumnsA b'')) df + Just a'' -> D.insertColumn' name (D.concatColumns a'' b'') df + in foldl' (addColumns a b) D.empty (L.union (D.columnNames a) (D.columnNames b)) From 3a395b02c638669cabd9adc02b157c45a9714cb3 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Mon, 23 Jun 2025 12:23:14 -0700 Subject: [PATCH 3/5] Track more state when reading file to avoid unnecessary IO. --- src/DataFrame/IO/CSV.hs | 64 ++++++++++++++------- src/DataFrame/Internal/Column.hs | 2 +- src/DataFrame/Internal/DataFrame.hs | 2 +- src/DataFrame/Lazy/Internal/DataFrame.hs | 37 +++++++----- src/DataFrame/Operations/Core.hs | 1 + src/DataFrame/Operations/Merge.hs | 16 +++++- src/DataFrame/Operations/Subset.hs | 6 +- src/DataFrame/Operations/Transformations.hs | 2 + 8 files changed, 89 insertions(+), 41 deletions(-) diff --git a/src/DataFrame/IO/CSV.hs b/src/DataFrame/IO/CSV.hs index 3fdeed1d..fe7e71fe 100644 --- a/src/DataFrame/IO/CSV.hs +++ b/src/DataFrame/IO/CSV.hs @@ -23,7 +23,7 @@ import qualified Data.Vector.Mutable as VM import qualified Data.Vector.Unboxed.Mutable as VUM import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many) -import Control.Monad (forM_, zipWithM_, unless, void, replicateM_) +import Control.Monad (forM_, zipWithM_, unless, when, void, replicateM_) import Data.Attoparsec.Text import Data.Char import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength) @@ -49,33 +49,39 @@ data ReadOptions = ReadOptions { hasHeader :: Bool, inferTypes :: Bool, safeRead :: Bool, - rowRange :: Maybe (Int, Int), -- (start, length) - seekPos :: Maybe Integer + rowRange :: !(Maybe (Int, Int)), -- (start, length) + seekPos :: !(Maybe Integer), + totalRows :: !(Maybe Int), + leftOver :: !T.Text, + rowsRead :: !Int } -- | By default we assume the file has a header, we infer the types on read -- and we convert any rows with nullish objects into Maybe (safeRead). defaultOptions :: ReadOptions -defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing } +defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing, totalRows = Nothing, leftOver = "", rowsRead = 0 } -- | Reads a CSV file from the given path. -- Note this file stores intermediate temporary files -- while converting the CSV from a row to a columnar format. readCsv :: String -> IO DataFrame -readCsv = readSeparated ',' defaultOptions +readCsv path = fst <$> readSeparated ',' defaultOptions path -- | Reads a tab separated file from the given path. -- Note this file stores intermediate temporary files -- while converting the CSV from a row to a columnar format. readTsv :: String -> IO DataFrame -readTsv = readSeparated '\t' defaultOptions +readTsv path = fst <$> readSeparated '\t' defaultOptions path -- | Reads a character separated file into a dataframe using mutable vectors. -readSeparated :: Char -> ReadOptions -> String -> IO DataFrame +readSeparated :: Char -> ReadOptions -> String -> IO (DataFrame, (Integer, T.Text, Int)) readSeparated c opts path = do - (begin, len) <- case rowRange opts of - Nothing -> countRows c path >>= \totalRows -> return (0, if hasHeader opts then totalRows - 1 else totalRows) - Just (start, len) -> return (start, len) + totalRows <- case totalRows opts of + Nothing -> countRows c path >>= \total -> if hasHeader opts then return (total - 1) else return total + Just n -> if hasHeader opts then return (n - 1) else return n + let (begin, len) = case rowRange opts of + Nothing -> (0, totalRows) + Just (start, len) -> (start, min len (totalRows - rowsRead opts)) withFile path ReadMode $ \handle -> do firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle let columnNames = if hasHeader opts @@ -84,17 +90,18 @@ readSeparated c opts path = do -- If there was no header rewind the file cursor. unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0 - -- skip columns till `begin` - _ <- replicateM_ begin (TIO.hGetLine handle >> return () ) + currPos <- hTell handle + when (isJust $ seekPos opts) $ hSeek handle AbsoluteSeek (fromMaybe currPos (seekPos opts)) -- Initialize mutable vectors for each column let numColumns = length columnNames - let numRows = len + let numRows = len -- Use this row to infer the types of the rest of the column. -- TODO: this isn't robust but in so far as this is a guess anyway -- it's probably fine. But we should probably sample n rows and pick -- the most likely type from the sample. - dataRow <- map T.strip . parseSep c <$> TIO.hGetLine handle + -- dataRow <- map T.strip . parseSep c . (<>) (leftOver opts) <$> TIO.hGetLine handle + (!dataRow, !remainder) <- readSingleLine c (leftOver opts) handle -- This array will track the indices of all null values for each column. -- If any exist then the column will be an optional type. @@ -104,18 +111,19 @@ readSeparated c opts path = do getInitialDataVectors numRows mutableCols dataRow -- Read rows into the mutable vectors - fillColumns numRows c mutableCols nullIndices handle + (!unconsumed, !r) <- fillColumns numRows c mutableCols nullIndices remainder handle -- Freeze the mutable vectors into immutable ones nulls' <- V.unsafeFreeze nullIndices cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id) + pos <- hTell handle - return $ DataFrame { + return (DataFrame { columns = cols, freeIndices = [], columnIndices = M.fromList (zip columnNames [0..]), dataframeDimensions = (maybe 0 columnLength (cols V.! 0), V.length cols) - } + }, (pos, unconsumed, r + 1)) {-# INLINE readSeparated #-} getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO () @@ -138,10 +146,22 @@ inferValueType s = let Nothing -> "Other" {-# INLINE inferValueType #-} +readSingleLine :: Char -> T.Text -> Handle -> IO ([T.Text], T.Text) +readSingleLine c unused handle = parseWith (TIO.hGetChunk handle) (parseRow c) unused >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell handle + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx + Partial c -> do + fail "Partial handler is called" + Done (unconsumed :: T.Text) (row :: [T.Text]) -> do + return (row, unconsumed) + -- | Reads rows from the handle and stores values in mutable vectors. -fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO () -fillColumns n c mutableCols nullIndices handle = do - input <- newIORef (mempty :: T.Text) +fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> T.Text -> Handle -> IO (T.Text, Int) +fillColumns n c mutableCols nullIndices unused handle = do + input <- newIORef unused + rowsRead <- newIORef (0 :: Int) forM_ [1..(n - 1)] $ \i -> do isEOF <- hIsEOF handle input' <- readIORef input @@ -155,7 +175,11 @@ fillColumns n c mutableCols nullIndices handle = do fail "Partial handler is called" Done (unconsumed :: T.Text) (row :: [T.Text]) -> do writeIORef input unconsumed + modifyIORef rowsRead (+1) zipWithM_ (writeValue mutableCols nullIndices i) [0..] row + l <- readIORef input + r <- readIORef rowsRead + pure (l, r) {-# INLINE fillColumns #-} -- | Writes a value into the appropriate column, resizing the vector if necessary. diff --git a/src/DataFrame/Internal/Column.hs b/src/DataFrame/Internal/Column.hs index f58e9190..4f6609ed 100644 --- a/src/DataFrame/Internal/Column.hs +++ b/src/DataFrame/Internal/Column.hs @@ -4,7 +4,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE StrictData #-} +{-# LANGUAGE Strict #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} diff --git a/src/DataFrame/Internal/DataFrame.hs b/src/DataFrame/Internal/DataFrame.hs index cadd2113..4406badb 100644 --- a/src/DataFrame/Internal/DataFrame.hs +++ b/src/DataFrame/Internal/DataFrame.hs @@ -4,7 +4,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE GADTs #-} -{-# LANGUAGE StrictData #-} +{-# LANGUAGE Strict #-} {-# LANGUAGE FlexibleContexts #-} module DataFrame.Internal.DataFrame where diff --git a/src/DataFrame/Lazy/Internal/DataFrame.hs b/src/DataFrame/Lazy/Internal/DataFrame.hs index c66c6c1b..64417c9a 100644 --- a/src/DataFrame/Lazy/Internal/DataFrame.hs +++ b/src/DataFrame/Lazy/Internal/DataFrame.hs @@ -3,12 +3,16 @@ {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE Strict #-} +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NumericUnderscores #-} module DataFrame.Lazy.Internal.DataFrame where -import Control.Monad (forM_) +import Control.Monad (forM, foldM) import Data.IORef import Data.Kind +import qualified Data.List as L import qualified Data.Map as M import qualified Data.Text as T import qualified Data.Vector as V @@ -16,6 +20,7 @@ import qualified DataFrame.Internal.DataFrame as D import qualified DataFrame.Internal.Column as C import qualified DataFrame.Internal.Expression as E import qualified DataFrame.Operations.Core as D +import DataFrame.Operations.Merge import qualified DataFrame.Operations.Subset as D import qualified DataFrame.Operations.Transformations as D import qualified DataFrame.IO.CSV as D @@ -37,7 +42,7 @@ data InputType = ICSV deriving Show data LazyDataFrame = LazyDataFrame { inputPath :: FilePath , inputType :: InputType - , operations :: [LazyOperation] + , operations :: [LazyOperation] , batchSize :: Int } deriving Show @@ -49,27 +54,33 @@ eval (Filter expr) = D.filterWhere expr runDataFrame :: forall a . (C.Columnable a) => LazyDataFrame -> IO D.DataFrame runDataFrame df = do let path = inputPath df - -- totalRows <- D.countRows ',' path - let batches = batchRanges 1000000 (batchSize df) - _ <- forM_ batches $ \ (start, end) -> do - -- TODO: implement specific read operations for batching that returns a seek instead of re-reading everything. - sdf <- D.readSeparated ',' (D.defaultOptions { D.rowRange = Just (start, (batchSize df)) }) path - let rdf = foldl' (\d op -> eval op d) sdf (operations df) - if fst (D.dimensions rdf) == 0 then return () else print rdf - return (D.empty) + totalRows <- D.countRows ',' path + let batches = batchRanges totalRows (batchSize df) + (df', _) <- foldM (\(!accDf, (!pos, !unused, !r)) (!start, !end) -> do + mapM_ putStr ["Scanning: ", show start, " to ", show end, " rows out of ", show totalRows, "\n"] + + (!sdf, (!pos', !unconsumed, !rowsRead)) <- D.readSeparated ',' ( + D.defaultOptions { D.rowRange = Just (start, batchSize df) + , D.totalRows = Just totalRows + , D.seekPos = pos + , D.rowsRead = r + , D.leftOver = unused}) path + let !rdf = L.foldl' (flip eval) sdf (operations df) + return (accDf <> rdf, (Just pos', unconsumed, rowsRead + r)) ) (D.empty, (Nothing, "", 0)) batches + return df' batchRanges :: Int -> Int -> [(Int, Int)] batchRanges n inc = go n [0,inc..n] - where + where go _ [] = [] go n [x] = [(x, n)] go n (f:s:rest) =(f, s) : go n (s:rest) scanCsv :: T.Text -> LazyDataFrame -scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 1024 +scanCsv path = LazyDataFrame (T.unpack path) ICSV [] 512_000 addOperation :: LazyOperation -> LazyDataFrame -> LazyDataFrame -addOperation op df = df { operations = (operations df) ++ [op] } +addOperation op df = df { operations = operations df ++ [op] } derive :: C.Columnable a => T.Text -> E.Expr a -> LazyDataFrame -> LazyDataFrame derive name expr = addOperation (Derive name expr) diff --git a/src/DataFrame/Operations/Core.hs b/src/DataFrame/Operations/Core.hs index cf08b602..94a4692e 100644 --- a/src/DataFrame/Operations/Core.hs +++ b/src/DataFrame/Operations/Core.hs @@ -6,6 +6,7 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE BangPatterns #-} +{-# LANGUAGE Strict #-} module DataFrame.Operations.Core where import qualified Data.List as L diff --git a/src/DataFrame/Operations/Merge.hs b/src/DataFrame/Operations/Merge.hs index ab241983..121e094f 100644 --- a/src/DataFrame/Operations/Merge.hs +++ b/src/DataFrame/Operations/Merge.hs @@ -1,4 +1,5 @@ {-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE Strict #-} module DataFrame.Operations.Merge where import qualified Data.List as L @@ -11,9 +12,13 @@ import qualified DataFrame.Operations.Core as D instance Semigroup D.DataFrame where (<>) :: D.DataFrame -> D.DataFrame -> D.DataFrame (<>) a b = let - columnsInBOnly = filter (\c -> not (c `elem` (D.columnNames b))) (D.columnNames b) + columnsInBOnly = filter (\c -> c `notElem` D.columnNames b) (D.columnNames b) columnsInA = D.columnNames a - addColumns a' b' df name = let + addColumns a' b' df name + | fst (D.dimensions a') == 0 && fst (D.dimensions b') == 0 = df + | fst (D.dimensions a') == 0 = D.insertColumn' name (D.getColumn name b') df + | fst (D.dimensions b') == 0 = D.insertColumn' name (D.getColumn name a') df + | otherwise = let numColumnsA = (fst $ D.dimensions a') numColumnsB = (fst $ D.dimensions b') numColumns = max numColumnsA numColumnsB @@ -26,4 +31,9 @@ instance Semigroup D.DataFrame where Just b'' -> case optA of Nothing -> D.insertColumn' name (Just (D.leftExpandColumn numColumnsA b'')) df Just a'' -> D.insertColumn' name (D.concatColumns a'' b'') df - in foldl' (addColumns a b) D.empty (L.union (D.columnNames a) (D.columnNames b)) + in L.foldl' (addColumns a b) D.empty (D.columnNames a `L.union` D.columnNames b) + +instance Monoid D.DataFrame where + mempty = D.empty + + diff --git a/src/DataFrame/Operations/Subset.hs b/src/DataFrame/Operations/Subset.hs index 3087120a..b62f5e5d 100644 --- a/src/DataFrame/Operations/Subset.hs +++ b/src/DataFrame/Operations/Subset.hs @@ -99,10 +99,10 @@ filterBy = flip filter filterWhere :: Expr Bool -> DataFrame -> DataFrame filterWhere expr df = let (TColumn col) = interpret @Bool df expr - (Just indexes) = ifoldlColumn (\s i satisfied -> if satisfied then S.insert i s else s) S.empty col + (Just indexes) = VU.convert . V.map (fromMaybe 0) . V.filter isJust . toVector @(Maybe Int) <$> itransform (\i satisfied -> if satisfied then Just i else Nothing) col c' = snd $ dataframeDimensions df - pick idxs col = atIndices idxs <$> col - in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (S.size indexes, c')} + pick idxs col = atIndicesStable idxs <$> col + in df {columns = V.map (pick indexes) (columns df), dataframeDimensions = (VU.length indexes, c')} -- | O(k) removes all rows with `Nothing` in a given column from the dataframe. diff --git a/src/DataFrame/Operations/Transformations.hs b/src/DataFrame/Operations/Transformations.hs index 9b34efc7..cb4d1ad1 100644 --- a/src/DataFrame/Operations/Transformations.hs +++ b/src/DataFrame/Operations/Transformations.hs @@ -3,6 +3,8 @@ {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE Strict #-} +{-# LANGUAGE StrictData #-} module DataFrame.Operations.Transformations where import qualified Data.List as L From 4014745a38c3e3032759ab6335153a0dc3cb9529 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Mon, 23 Jun 2025 12:26:11 -0700 Subject: [PATCH 4/5] Expose "Lazy" module as part of public API --- dataframe.cabal | 3 ++- src/DataFrame/Lazy.hs | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 src/DataFrame/Lazy.hs diff --git a/dataframe.cabal b/dataframe.cabal index 992817d6..f7924f3d 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -22,7 +22,8 @@ source-repository head location: https://github.com/mchav/dataframe library - exposed-modules: DataFrame + exposed-modules: DataFrame, + DataFrame.Lazy other-modules: DataFrame.Internal.Types, DataFrame.Internal.Expression, DataFrame.Internal.Parsing, diff --git a/src/DataFrame/Lazy.hs b/src/DataFrame/Lazy.hs new file mode 100644 index 00000000..d5aa813f --- /dev/null +++ b/src/DataFrame/Lazy.hs @@ -0,0 +1,3 @@ +module DataFrame.Lazy (module DataFrame.Lazy.Internal.DataFrame) where + + From 09c5cf743e175deb0239b7154f84084e857a2f74 Mon Sep 17 00:00:00 2001 From: Michael Chavinda Date: Mon, 23 Jun 2025 13:07:41 -0700 Subject: [PATCH 5/5] Separate Lazy and Eager CSV IO. --- dataframe.cabal | 4 +- src/DataFrame/IO/CSV.hs | 64 ++--- src/DataFrame/Lazy.hs | 2 +- src/DataFrame/Lazy/IO/CSV.hs | 327 +++++++++++++++++++++++ src/DataFrame/Lazy/Internal/DataFrame.hs | 2 +- 5 files changed, 348 insertions(+), 51 deletions(-) create mode 100644 src/DataFrame/Lazy/IO/CSV.hs diff --git a/dataframe.cabal b/dataframe.cabal index f7924f3d..542c6448 100644 --- a/dataframe.cabal +++ b/dataframe.cabal @@ -43,6 +43,7 @@ library DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, DataFrame.IO.CSV, + DataFrame.Lazy.IO.CSV, DataFrame.Lazy.Internal.DataFrame build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, @@ -80,7 +81,8 @@ executable dataframe DataFrame.Operations.Typing, DataFrame.Operations.Aggregation, DataFrame.Display.Terminal.Plot, - DataFrame.IO.CSV + DataFrame.IO.CSV, + DataFrame.Lazy.IO.CSV build-depends: base >= 4.17.2.0 && < 4.22, array ^>= 0.5, attoparsec >= 0.12 && <= 0.14.4, diff --git a/src/DataFrame/IO/CSV.hs b/src/DataFrame/IO/CSV.hs index fe7e71fe..661f519c 100644 --- a/src/DataFrame/IO/CSV.hs +++ b/src/DataFrame/IO/CSV.hs @@ -23,7 +23,7 @@ import qualified Data.Vector.Mutable as VM import qualified Data.Vector.Unboxed.Mutable as VUM import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many) -import Control.Monad (forM_, zipWithM_, unless, when, void, replicateM_) +import Control.Monad (forM_, zipWithM_, unless, void) import Data.Attoparsec.Text import Data.Char import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength) @@ -48,40 +48,30 @@ import Type.Reflection data ReadOptions = ReadOptions { hasHeader :: Bool, inferTypes :: Bool, - safeRead :: Bool, - rowRange :: !(Maybe (Int, Int)), -- (start, length) - seekPos :: !(Maybe Integer), - totalRows :: !(Maybe Int), - leftOver :: !T.Text, - rowsRead :: !Int + safeRead :: Bool } -- | By default we assume the file has a header, we infer the types on read -- and we convert any rows with nullish objects into Maybe (safeRead). defaultOptions :: ReadOptions -defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing, totalRows = Nothing, leftOver = "", rowsRead = 0 } +defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True } -- | Reads a CSV file from the given path. -- Note this file stores intermediate temporary files -- while converting the CSV from a row to a columnar format. readCsv :: String -> IO DataFrame -readCsv path = fst <$> readSeparated ',' defaultOptions path +readCsv = readSeparated ',' defaultOptions -- | Reads a tab separated file from the given path. -- Note this file stores intermediate temporary files -- while converting the CSV from a row to a columnar format. readTsv :: String -> IO DataFrame -readTsv path = fst <$> readSeparated '\t' defaultOptions path +readTsv = readSeparated '\t' defaultOptions -- | Reads a character separated file into a dataframe using mutable vectors. -readSeparated :: Char -> ReadOptions -> String -> IO (DataFrame, (Integer, T.Text, Int)) +readSeparated :: Char -> ReadOptions -> String -> IO DataFrame readSeparated c opts path = do - totalRows <- case totalRows opts of - Nothing -> countRows c path >>= \total -> if hasHeader opts then return (total - 1) else return total - Just n -> if hasHeader opts then return (n - 1) else return n - let (begin, len) = case rowRange opts of - Nothing -> (0, totalRows) - Just (start, len) -> (start, min len (totalRows - rowsRead opts)) + totalRows <- countRows c path withFile path ReadMode $ \handle -> do firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle let columnNames = if hasHeader opts @@ -90,18 +80,14 @@ readSeparated c opts path = do -- If there was no header rewind the file cursor. unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0 - currPos <- hTell handle - when (isJust $ seekPos opts) $ hSeek handle AbsoluteSeek (fromMaybe currPos (seekPos opts)) - -- Initialize mutable vectors for each column let numColumns = length columnNames - let numRows = len + let numRows = if hasHeader opts then totalRows - 1 else totalRows -- Use this row to infer the types of the rest of the column. -- TODO: this isn't robust but in so far as this is a guess anyway -- it's probably fine. But we should probably sample n rows and pick -- the most likely type from the sample. - -- dataRow <- map T.strip . parseSep c . (<>) (leftOver opts) <$> TIO.hGetLine handle - (!dataRow, !remainder) <- readSingleLine c (leftOver opts) handle + dataRow <- map T.strip . parseSep c <$> TIO.hGetLine handle -- This array will track the indices of all null values for each column. -- If any exist then the column will be an optional type. @@ -111,19 +97,17 @@ readSeparated c opts path = do getInitialDataVectors numRows mutableCols dataRow -- Read rows into the mutable vectors - (!unconsumed, !r) <- fillColumns numRows c mutableCols nullIndices remainder handle + fillColumns numRows c mutableCols nullIndices handle -- Freeze the mutable vectors into immutable ones nulls' <- V.unsafeFreeze nullIndices cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id) - pos <- hTell handle - - return (DataFrame { + return $ DataFrame { columns = cols, freeIndices = [], columnIndices = M.fromList (zip columnNames [0..]), dataframeDimensions = (maybe 0 columnLength (cols V.! 0), V.length cols) - }, (pos, unconsumed, r + 1)) + } {-# INLINE readSeparated #-} getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO () @@ -146,23 +130,11 @@ inferValueType s = let Nothing -> "Other" {-# INLINE inferValueType #-} -readSingleLine :: Char -> T.Text -> Handle -> IO ([T.Text], T.Text) -readSingleLine c unused handle = parseWith (TIO.hGetChunk handle) (parseRow c) unused >>= \case - Fail unconsumed ctx er -> do - erpos <- hTell handle - fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " - <> show er <> "; context: " <> show ctx - Partial c -> do - fail "Partial handler is called" - Done (unconsumed :: T.Text) (row :: [T.Text]) -> do - return (row, unconsumed) - -- | Reads rows from the handle and stores values in mutable vectors. -fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> T.Text -> Handle -> IO (T.Text, Int) -fillColumns n c mutableCols nullIndices unused handle = do - input <- newIORef unused - rowsRead <- newIORef (0 :: Int) - forM_ [1..(n - 1)] $ \i -> do +fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Handle -> IO () +fillColumns n c mutableCols nullIndices handle = do + input <- newIORef (mempty :: T.Text) + forM_ [1..n] $ \i -> do isEOF <- hIsEOF handle input' <- readIORef input unless (isEOF && input' == mempty) $ do @@ -175,11 +147,7 @@ fillColumns n c mutableCols nullIndices unused handle = do fail "Partial handler is called" Done (unconsumed :: T.Text) (row :: [T.Text]) -> do writeIORef input unconsumed - modifyIORef rowsRead (+1) zipWithM_ (writeValue mutableCols nullIndices i) [0..] row - l <- readIORef input - r <- readIORef rowsRead - pure (l, r) {-# INLINE fillColumns #-} -- | Writes a value into the appropriate column, resizing the vector if necessary. diff --git a/src/DataFrame/Lazy.hs b/src/DataFrame/Lazy.hs index d5aa813f..9d69f7f2 100644 --- a/src/DataFrame/Lazy.hs +++ b/src/DataFrame/Lazy.hs @@ -1,3 +1,3 @@ module DataFrame.Lazy (module DataFrame.Lazy.Internal.DataFrame) where - +import DataFrame.Lazy.Internal.DataFrame diff --git a/src/DataFrame/Lazy/IO/CSV.hs b/src/DataFrame/Lazy/IO/CSV.hs new file mode 100644 index 00000000..ce373c52 --- /dev/null +++ b/src/DataFrame/Lazy/IO/CSV.hs @@ -0,0 +1,327 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE ExplicitNamespaces #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE Strict #-} +module DataFrame.Lazy.IO.CSV where + +import qualified Data.ByteString.Char8 as C +import qualified Data.List as L +import qualified Data.Map as M +import qualified Data.Set as S +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import qualified Data.Text.Lazy.IO as TLIO +import qualified Data.Text.IO as TIO +import qualified Data.Vector as V +import qualified Data.Vector.Unboxed as VU +import qualified Data.Vector.Mutable as VM +import qualified Data.Vector.Unboxed.Mutable as VUM + +import Control.Applicative ((<$>), (<|>), (<*>), (<*), (*>), many) +import Control.Monad (forM_, zipWithM_, unless, when, void, replicateM_) +import Data.Attoparsec.Text +import Data.Char +import DataFrame.Internal.Column (Column(..), freezeColumn', writeColumn, columnLength) +import DataFrame.Internal.DataFrame (DataFrame(..)) +import DataFrame.Internal.Parsing +import DataFrame.Operations.Typing +import Data.Foldable (fold) +import Data.Function (on) +import Data.IORef +import Data.Maybe +import Data.Text.Encoding (decodeUtf8Lenient) +import Data.Type.Equality + ( TestEquality (testEquality), + type (:~:) (Refl) + ) +import GHC.IO.Handle (Handle) +import Prelude hiding (concat, takeWhile) +import System.IO +import Type.Reflection + +-- | Record for CSV read options. +data ReadOptions = ReadOptions { + hasHeader :: Bool, + inferTypes :: Bool, + safeRead :: Bool, + rowRange :: !(Maybe (Int, Int)), -- (start, length) + seekPos :: !(Maybe Integer), + totalRows :: !(Maybe Int), + leftOver :: !T.Text, + rowsRead :: !Int +} + +-- | By default we assume the file has a header, we infer the types on read +-- and we convert any rows with nullish objects into Maybe (safeRead). +defaultOptions :: ReadOptions +defaultOptions = ReadOptions { hasHeader = True, inferTypes = True, safeRead = True, rowRange = Nothing, seekPos = Nothing, totalRows = Nothing, leftOver = "", rowsRead = 0 } + +-- | Reads a CSV file from the given path. +-- Note this file stores intermediate temporary files +-- while converting the CSV from a row to a columnar format. +readCsv :: String -> IO DataFrame +readCsv path = fst <$> readSeparated ',' defaultOptions path + +-- | Reads a tab separated file from the given path. +-- Note this file stores intermediate temporary files +-- while converting the CSV from a row to a columnar format. +readTsv :: String -> IO DataFrame +readTsv path = fst <$> readSeparated '\t' defaultOptions path + +-- | Reads a character separated file into a dataframe using mutable vectors. +readSeparated :: Char -> ReadOptions -> String -> IO (DataFrame, (Integer, T.Text, Int)) +readSeparated c opts path = do + totalRows <- case totalRows opts of + Nothing -> countRows c path >>= \total -> if hasHeader opts then return (total - 1) else return total + Just n -> if hasHeader opts then return (n - 1) else return n + let (begin, len) = case rowRange opts of + Nothing -> (0, totalRows) + Just (start, len) -> (start, min len (totalRows - rowsRead opts)) + withFile path ReadMode $ \handle -> do + firstRow <- map T.strip . parseSep c <$> TIO.hGetLine handle + let columnNames = if hasHeader opts + then map (T.filter (/= '\"')) firstRow + else map (T.singleton . intToDigit) [0..(length firstRow - 1)] + -- If there was no header rewind the file cursor. + unless (hasHeader opts) $ hSeek handle AbsoluteSeek 0 + + currPos <- hTell handle + when (isJust $ seekPos opts) $ hSeek handle AbsoluteSeek (fromMaybe currPos (seekPos opts)) + + -- Initialize mutable vectors for each column + let numColumns = length columnNames + let numRows = len + -- Use this row to infer the types of the rest of the column. + -- TODO: this isn't robust but in so far as this is a guess anyway + -- it's probably fine. But we should probably sample n rows and pick + -- the most likely type from the sample. + -- dataRow <- map T.strip . parseSep c . (<>) (leftOver opts) <$> TIO.hGetLine handle + (!dataRow, !remainder) <- readSingleLine c (leftOver opts) handle + + -- This array will track the indices of all null values for each column. + -- If any exist then the column will be an optional type. + nullIndices <- VM.unsafeNew numColumns + VM.set nullIndices [] + mutableCols <- VM.unsafeNew numColumns + getInitialDataVectors numRows mutableCols dataRow + + -- Read rows into the mutable vectors + (!unconsumed, !r) <- fillColumns numRows c mutableCols nullIndices remainder handle + + -- Freeze the mutable vectors into immutable ones + nulls' <- V.unsafeFreeze nullIndices + cols <- V.mapM (freezeColumn mutableCols nulls' opts) (V.generate numColumns id) + pos <- hTell handle + + return (DataFrame { + columns = cols, + freeIndices = [], + columnIndices = M.fromList (zip columnNames [0..]), + dataframeDimensions = (maybe 0 columnLength (cols V.! 0), V.length cols) + }, (pos, unconsumed, r + 1)) +{-# INLINE readSeparated #-} + +getInitialDataVectors :: Int -> VM.IOVector Column -> [T.Text] -> IO () +getInitialDataVectors n mCol xs = do + forM_ (zip [0..] xs) $ \(i, x) -> do + col <- case inferValueType x of + "Int" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Int)) >>= \c -> VUM.unsafeWrite c 0 (fromMaybe 0 $ readInt x) >> return c) + "Double" -> MutableUnboxedColumn <$> ((VUM.unsafeNew n :: IO (VUM.IOVector Double)) >>= \c -> VUM.unsafeWrite c 0 (fromMaybe 0 $ readDouble x) >> return c) + _ -> MutableBoxedColumn <$> ((VM.unsafeNew n :: IO (VM.IOVector T.Text)) >>= \c -> VM.unsafeWrite c 0 x >> return c) + VM.unsafeWrite mCol i col +{-# INLINE getInitialDataVectors #-} + +inferValueType :: T.Text -> T.Text +inferValueType s = let + example = s + in case readInt example of + Just _ -> "Int" + Nothing -> case readDouble example of + Just _ -> "Double" + Nothing -> "Other" +{-# INLINE inferValueType #-} + +readSingleLine :: Char -> T.Text -> Handle -> IO ([T.Text], T.Text) +readSingleLine c unused handle = parseWith (TIO.hGetChunk handle) (parseRow c) unused >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell handle + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx + Partial c -> do + fail "Partial handler is called" + Done (unconsumed :: T.Text) (row :: [T.Text]) -> do + return (row, unconsumed) + +-- | Reads rows from the handle and stores values in mutable vectors. +fillColumns :: Int -> Char -> VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> T.Text -> Handle -> IO (T.Text, Int) +fillColumns n c mutableCols nullIndices unused handle = do + input <- newIORef unused + rowsRead <- newIORef (0 :: Int) + forM_ [1..(n - 1)] $ \i -> do + isEOF <- hIsEOF handle + input' <- readIORef input + unless (isEOF && input' == mempty) $ do + parseWith (TIO.hGetChunk handle) (parseRow c) input' >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell handle + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx + Partial c -> do + fail "Partial handler is called" + Done (unconsumed :: T.Text) (row :: [T.Text]) -> do + writeIORef input unconsumed + modifyIORef rowsRead (+1) + zipWithM_ (writeValue mutableCols nullIndices i) [0..] row + l <- readIORef input + r <- readIORef rowsRead + pure (l, r) +{-# INLINE fillColumns #-} + +-- | Writes a value into the appropriate column, resizing the vector if necessary. +writeValue :: VM.IOVector Column -> VM.IOVector [(Int, T.Text)] -> Int -> Int -> T.Text -> IO () +writeValue mutableCols nullIndices count colIndex value = do + col <- VM.unsafeRead mutableCols colIndex + res <- writeColumn count value col + let modify value = VM.unsafeModify nullIndices ((count, value) :) colIndex + either modify (const (return ())) res +{-# INLINE writeValue #-} + +-- | Freezes a mutable vector into an immutable one, trimming it to the actual row count. +freezeColumn :: VM.IOVector Column -> V.Vector [(Int, T.Text)] -> ReadOptions -> Int -> IO (Maybe Column) +freezeColumn mutableCols nulls opts colIndex = do + col <- VM.unsafeRead mutableCols colIndex + Just <$> freezeColumn' (nulls V.! colIndex) col +{-# INLINE freezeColumn #-} + +parseSep :: Char -> T.Text -> [T.Text] +parseSep c s = either error id (parseOnly (record c) s) +{-# INLINE parseSep #-} + +record :: Char -> Parser [T.Text] +record c = + field c `sepBy1` char c + "record" +{-# INLINE record #-} + +parseRow :: Char -> Parser [T.Text] +parseRow c = (record c <* lineEnd) "record-new-line" + +field :: Char -> Parser T.Text +field c = + quotedField <|> unquotedField c + "field" +{-# INLINE field #-} + +unquotedTerminators :: Char -> S.Set Char +unquotedTerminators sep = S.fromList [sep, '\n', '\r', '"'] + +unquotedField :: Char -> Parser T.Text +unquotedField sep = + takeWhile (not . (`S.member` terminators)) "unquoted field" + where terminators = unquotedTerminators sep +{-# INLINE unquotedField #-} + +quotedField :: Parser T.Text +quotedField = char '"' *> contents <* char '"' "quoted field" + where + contents = fold <$> many (unquote <|> unescape) + where + unquote = takeWhile1 (notInClass "\"\\") + unescape = char '\\' *> do + T.singleton <$> do + char '\\' <|> char '"' +{-# INLINE quotedField #-} + +lineEnd :: Parser () +lineEnd = + (endOfLine <|> endOfInput) + "end of line" +{-# INLINE lineEnd #-} + +-- | First pass to count rows for exact allocation +countRows :: Char -> FilePath -> IO Int +countRows c path = withFile path ReadMode $! go 0 "" + where + go !n !input h = do + isEOF <- hIsEOF h + if isEOF && input == mempty + then pure n + else + parseWith (TIO.hGetChunk h) (parseRow c) input >>= \case + Fail unconsumed ctx er -> do + erpos <- hTell h + fail $ "Failed to parse CSV file around " <> show erpos <> " byte; due: " + <> show er <> "; context: " <> show ctx <> " " <> show unconsumed + Partial c -> do + fail $ "Partial handler is called; n = " <> show n + Done (unconsumed :: T.Text) _ -> + go (n + 1) unconsumed h +{-# INLINE countRows #-} + +writeCsv :: String -> DataFrame -> IO () +writeCsv = writeSeparated ',' + +writeSeparated :: Char -- ^ Separator + -> String -- ^ Path to write to + -> DataFrame + -> IO () +writeSeparated c filepath df = withFile filepath WriteMode $ \handle ->do + let (rows, columns) = dataframeDimensions df + let headers = map fst (L.sortBy (compare `on` snd) (M.toList (columnIndices df))) + TIO.hPutStrLn handle (T.intercalate ", " headers) + forM_ [0..(rows - 1)] $ \i -> do + let row = getRowAsText df i + TIO.hPutStrLn handle (T.intercalate ", " row) + +getRowAsText :: DataFrame -> Int -> [T.Text] +getRowAsText df i = V.ifoldr go [] (columns df) + where + indexMap = M.fromList (map (\(a, b) -> (b, a)) $ M.toList (columnIndices df)) + go k Nothing acc = acc + go k (Just (BoxedColumn (c :: V.Vector a))) acc = case c V.!? i of + Just e -> textRep : acc + where textRep = case testEquality (typeRep @a) (typeRep @T.Text) of + Just Refl -> e + Nothing -> case typeRep @a of + App t1 t2 -> case eqTypeRep t1 (typeRep @Maybe) of + Just HRefl -> case testEquality t2 (typeRep @T.Text) of + Just Refl -> fromMaybe "null" e + Nothing -> (fromOptional . (T.pack . show)) e + where fromOptional s + | T.isPrefixOf "Just " s = T.drop (T.length "Just ") s + | otherwise = "null" + Nothing -> (T.pack . show) e + _ -> (T.pack . show) e + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i + go k (Just (UnboxedColumn c)) acc = case c VU.!? i of + Just e -> T.pack (show e) : acc + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i + go k (Just (OptionalColumn (c :: V.Vector (Maybe a)))) acc = case c V.!? i of + Just e -> textRep : acc + where textRep = case testEquality (typeRep @a) (typeRep @T.Text) of + Just Refl -> fromMaybe "Nothing" e + Nothing -> (T.pack . show) e + Nothing -> + error $ + "Column " + ++ T.unpack (indexMap M.! k) + ++ " has less items than " + ++ "the other columns at index " + ++ show i diff --git a/src/DataFrame/Lazy/Internal/DataFrame.hs b/src/DataFrame/Lazy/Internal/DataFrame.hs index 64417c9a..9f1b1cb8 100644 --- a/src/DataFrame/Lazy/Internal/DataFrame.hs +++ b/src/DataFrame/Lazy/Internal/DataFrame.hs @@ -23,7 +23,7 @@ import qualified DataFrame.Operations.Core as D import DataFrame.Operations.Merge import qualified DataFrame.Operations.Subset as D import qualified DataFrame.Operations.Transformations as D -import qualified DataFrame.IO.CSV as D +import qualified DataFrame.Lazy.IO.CSV as D import System.FilePath data LazyOperation where