Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,42 @@
-- Useful Haskell extensions.
{-# LANGUAGE OverloadedStrings #-} -- Allow string literal to be interpreted as any other string type.
{-# LANGUAGE TypeApplications #-} -- Convenience syntax for specifying the type: `sum a b :: Int` vs `sum @Int a b`.
{-# LANGUAGE NumericUnderscores #-}

import qualified DataFrame as D -- import for general functionality.
import qualified DataFrame.Functions as F -- import for column expressions.

import DataFrame ((|>)) -- import the chaining operator unqualified.

import qualified Data.Vector.Unboxed as VU
import Control.Monad (replicateM)
import Data.Time
import System.Random.Stateful

-- | Entry point. Prints a per-item quantity report from the Chipotle TSV, then
-- times data generation, summary statistics and filtering over a synthetic frame.
main :: IO ()
main = do
-- Report: sum, mean and maximum of "quantity" per "item_name", sorted by the
-- summed quantity in descending order and truncated to ten rows.
df <- D.readTsv "./data/chipotle.tsv"
let quantity = F.col "quantity" :: D.Expr Int -- A typed reference to a column.
print (df
|> D.select ["item_name", "quantity"]
|> D.groupBy ["item_name"]
|> D.aggregate [ (F.sum quantity) `F.as` "sum_quantity"
, (F.mean quantity) `F.as` "mean_quantity"
, (F.maximum quantity) `F.as` "maximum_quantity"
]
|> D.sortBy D.Descending ["sum_quantity"]
|> D.take 10)
-- Benchmark: element count used for each of the three generated vectors.
let n = 100_000_000
-- Mutable generator seeded from a fresh StdGen, shared by all three draws below.
g <- newIOGenM =<< newStdGen
let range = (-20.0 :: Double, 20.0 :: Double)
startGeneration <- getCurrentTime
-- Three unboxed vectors of n Doubles, each drawn with `uniformRM range`.
ns <- VU.replicateM n (uniformRM range g)
xs <- VU.replicateM n (uniformRM range g)
ys <- VU.replicateM n (uniformRM range g)
-- NOTE(review): this rebinds `df`; the Chipotle frame is not used past this point.
-- The columns are unnamed and are addressed as "0", "1" and "2" below.
-- NOTE(review): `df` is a lazy `let`; presumably the vectors are already
-- materialised by `VU.replicateM`, so the generation timing is meaningful. Confirm.
let df = D.fromUnnamedColumns (map D.fromUnboxedVector [ns, xs, ys])
endGeneration <- getCurrentTime
let generationTime = diffUTCTime endGeneration startGeneration
putStrLn $ "Data generation Time: " ++ (show generationTime)
-- Summary statistics; `print` forces each result inside the timed window.
startCalculation <- getCurrentTime
print $ D.mean "0" df
print $ D.variance "1" df
print $ D.correlation "1" "2" df
endCalculation <- getCurrentTime
let calculationTime = diffUTCTime endCalculation startCalculation
putStrLn $ "Calculation Time: " ++ (show calculationTime)
-- Filter column "0" to values >= 19.9 and print the first ten matching rows.
startFilter <- getCurrentTime
print $ D.filter "0" (>= (19.9 :: Double)) df D.|> D.take 10
endFilter <- getCurrentTime
let filterTime = diffUTCTime endFilter startFilter
putStrLn $ "Filter Time: " ++ (show filterTime)
-- Total spans from the start of generation to the end of the filter step.
let totalTime = diffUTCTime endFilter startGeneration
putStrLn $ "Total Time: " ++ (show totalTime)
1 change: 1 addition & 0 deletions dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ library
DataFrame.Display.Terminal.Colours,
DataFrame.Internal.DataFrame,
DataFrame.Internal.Row,
DataFrame.Internal.Schema,
DataFrame.Errors,
DataFrame.Operations.Core,
DataFrame.Operations.Join,
Expand Down
10 changes: 4 additions & 6 deletions src/DataFrame/Functions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import Language.Haskell.TH
import qualified Language.Haskell.TH.Syntax as TH
import qualified Data.Char as Char
import Debug.Trace (traceShow)
import Type.Reflection (typeRep)

-- | Build a typed reference to the column called @name@.
--
-- The element type @a@ is chosen by the caller, e.g.
-- @col "quantity" :: Expr Int@.
col :: Columnable a => T.Text -> Expr a
col name = Col name
Expand Down Expand Up @@ -64,16 +65,13 @@ count :: Columnable a => Expr a -> Expr Int
-- A bare column reference is recorded as a general aggregate named "count"
-- whose per-group function is 'VG.length'.
count (Col name) = GeneralAggregate name "count" VG.length
-- Every other expression form is rejected with a runtime 'error'.
count _ = error "Argument can only be a column reference not an unevaluated expression"

-- | Pick one representative value of the column for each group.
--
-- 'ReductionAggregate' carries a binary reducer that is folded over a group's
-- elements. 'const' always keeps the accumulator, so the result is the element
-- the fold was seeded with. NOTE(review): this assumes the interpreter seeds the
-- fold with the group's first element, as the 'ReductionAggregate' case of
-- @interpretAggregation@ does; confirm if that changes.
--
-- Only a bare column reference is supported; anything else is a runtime 'error'.
anyValue :: Columnable a => Expr a -> Expr a
anyValue (Col name) = ReductionAggregate name "anyValue" const
anyValue _ = error "Argument can only be a column reference not an unevaluated expression"

-- | Per-group minimum of a column, expressed as a reduction that combines
-- elements pairwise with 'min'.
--
-- Only a bare column reference is supported; anything else is a runtime 'error'
-- (same message as 'count').
minimum :: Columnable a => Expr a -> Expr a
minimum (Col name) = ReductionAggregate name "minimum" min
minimum _ = error "Argument can only be a column reference not an unevaluated expression"

-- | Per-group maximum of a column, expressed as a reduction that combines
-- elements pairwise with 'max'.
--
-- Only a bare column reference is supported; anything else is a runtime 'error'
-- (same message as 'count').
maximum :: Columnable a => Expr a -> Expr a
maximum (Col name) = ReductionAggregate name "maximum" max
maximum _ = error "Argument can only be a column reference not an unevaluated expression"

-- | Per-group sum of a numeric, unboxable column, computed with 'VG.sum'.
--
-- Only a bare column reference is supported; anything else is a runtime 'error'
-- (same message as 'count').
sum :: forall a . (Columnable a, Num a, VU.Unbox a) => Expr a -> Expr a
sum (Col name) = NumericAggregate name "sum" VG.sum
sum _ = error "Argument can only be a column reference not an unevaluated expression"

mean :: (Columnable a, Num a, VU.Unbox a) => Expr a -> Expr Double
Expand Down
122 changes: 88 additions & 34 deletions src/DataFrame/Internal/Expression.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE BangPatterns #-}
module DataFrame.Internal.Expression where

import qualified Data.Map as M
Expand Down Expand Up @@ -51,10 +52,12 @@ data Expr a where
ReductionAggregate :: (Columnable a)
=> T.Text -- Column name
-> T.Text -- Operation name
-> (forall v a. (VG.Vector v a, Columnable a) => v a -> a)
-> (forall a . Columnable a => a -> a -> a)
-> Expr a
NumericAggregate :: (Columnable a,
Columnable b,
VU.Unbox a,
VU.Unbox b,
Num a,
Num b)
=> T.Text -- Column name
Expand Down Expand Up @@ -84,7 +87,7 @@ interpretAggregation :: forall a . (Columnable a) => GroupedDataFrame -> Expr a
interpretAggregation gdf (Lit value) = TColumn $ fromVector $ V.replicate (VG.length (offsets gdf) - 1) value
interpretAggregation gdf@(Grouped df names indices os) (Col name) = case getColumn name df of
Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df)
Just col -> TColumn $ atIndicesStable (VG.map (indices VG.!) (VG.init os)) col
Just col -> TColumn $ atIndicesStable (VG.map (indices `VG.unsafeIndex`) (VG.init os)) col
interpretAggregation gdf (Apply _ (f :: c -> d) expr) = let
(TColumn value) = interpretAggregation @c gdf expr
in case mapColumn f value of
Expand All @@ -100,59 +103,110 @@ interpretAggregation gdf@(Grouped df names indices os) (GeneralAggregate name op
Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df)
Just (BoxedColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (V.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
)
)
Just (UnboxedColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (VU.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
(\i -> f (V.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
Just (UnboxedColumn col) -> case sUnbox @c of
SFalse -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
STrue -> TColumn $ fromUnboxedVector $
VU.generate (VG.length os - 1)
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
Just (OptionalColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (V.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
(\i -> f (V.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
interpretAggregation gdf@(Grouped df names indices os) (ReductionAggregate name op (f :: forall v a. (VG.Vector v a, Columnable a) => v a -> a)) = case getColumn name df of
interpretAggregation gdf@(Grouped df names indices os) (ReductionAggregate name op (f :: forall a . Columnable a => a -> a -> a)) = case getColumn name df of
Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df)
Just (BoxedColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (V.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
)
)
Just (UnboxedColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (VU.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
)
)
VG.generate (VG.length os - 1) $ \g ->
let !start = os `VG.unsafeIndex` g
!end = os `VG.unsafeIndex` (g+1)
in go (col `VG.unsafeIndex` (indices `VG.unsafeIndex` start)) (start + 1) end
where
{-# INLINE go #-}
go !acc j e
| j == e = acc
| otherwise =
let !x = col `VG.unsafeIndex` (indices `VG.unsafeIndex` j)
in go (f acc x) (j + 1) e
Just (UnboxedColumn col) -> case sUnbox @a of
SFalse -> TColumn $ fromVector $
VG.generate (VG.length os - 1) $ \g ->
let !start = os `VG.unsafeIndex` g
!end = os `VG.unsafeIndex` (g+1)
in go (col `VG.unsafeIndex` (indices `VG.unsafeIndex` start)) (start + 1) end
where
{-# INLINE go #-}
go !acc j e
| j == e = acc
| otherwise =
let !x = col `VG.unsafeIndex` (indices `VG.unsafeIndex` j)
in go (f acc x) (j + 1) e
STrue -> TColumn $ fromVector $
VG.generate (VG.length os - 1) $ \g ->
let !start = os `VG.unsafeIndex` g
!end = os `VG.unsafeIndex` (g+1)
in go (col `VG.unsafeIndex` (indices `VG.unsafeIndex` start)) (start + 1) end
where
{-# INLINE go #-}
go !acc j e
| j == e = acc
| otherwise =
let !x = col `VG.unsafeIndex` (indices `VG.unsafeIndex` j)
in go (f acc x) (j + 1) e
Just (OptionalColumn col) -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (V.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
)
)
VG.generate (VG.length os - 1) $ \g ->
let !start = os `VG.unsafeIndex` g
!end = os `VG.unsafeIndex` (g+1)
in go (col `VG.unsafeIndex` (indices `VG.unsafeIndex` start)) (start + 1) end
where
{-# INLINE go #-}
go !acc j e
| j == e = acc
| otherwise =
let !x = col `VG.unsafeIndex` (indices `VG.unsafeIndex` j)
in go (f acc x) (j + 1) e
interpretAggregation gdf@(Grouped df names indices os) (NumericAggregate name op (f :: VU.Vector b -> c)) = case getColumn name df of
Nothing -> throw $ ColumnNotFoundException name "" (map fst $ M.toList $ columnIndices df)
Just (UnboxedColumn (col :: VU.Vector d)) -> case testEquality (typeRep @b) (typeRep @d) of
Nothing -> error $ "Cannot apply numeric aggregation to non-numeric column: " ++ (T.unpack name)
Nothing -> case testEquality (typeRep @d) (typeRep @Int) of
Just Refl -> case sUnbox @c of
SFalse -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> fromIntegral (col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i)))))
)
)
STrue -> TColumn $ fromUnboxedVector $
VU.generate (VG.length os - 1)
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> fromIntegral (col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i)))))
)
)
Just Refl -> case sNumeric @d of
SFalse -> error $ "Cannot apply numeric aggregation to non-numeric column: " ++ (T.unpack name)
STrue -> case sUnbox @c of
SFalse -> TColumn $ fromVector $
V.generate (VG.length os - 1)
(\i -> f (VU.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
STrue -> TColumn $ fromUnboxedVector $
VU.generate (VG.length os - 1)
(\i -> f (VU.generate (os VG.! (i + 1) - (os VG.! i))
(\j -> col VG.! (indices VG.! (j + (os VG.! i))))
(\i -> f (VU.generate (os `VG.unsafeIndex` (i + 1) - (os `VG.unsafeIndex` i))
(\j -> col `VG.unsafeIndex` (indices `VG.unsafeIndex` (j + (os `VG.unsafeIndex` i))))
)
)
_ -> error $ "Cannot apply numeric aggregation to non-numeric column: " ++ (T.unpack name)
Expand Down
33 changes: 33 additions & 0 deletions src/DataFrame/Internal/Schema.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
module DataFrame.Internal.Schema where

import qualified Data.Map as M
import qualified Data.Proxy as P
import qualified Data.Text as T

import DataFrame.Internal.Column
import Data.Maybe
import Data.Type.Equality (type (:~:)(Refl), TestEquality (..))
import Type.Reflection (typeRep)

-- | The element type of a column, carried as a 'P.Proxy'.
--
-- The type parameter is hidden by the constructor; pattern matching on 'SType'
-- brings its 'Columnable' evidence back into scope.
data SchemaType where
    SType
        :: Columnable a
        => P.Proxy a
        -> SchemaType

-- | Shows the wrapped type as rendered by its 'typeRep'.
instance Show SchemaType where
    show schema = case schema of
        SType (_ :: P.Proxy a) -> show (typeRep @a)

-- | Two schema types are equal exactly when 'testEquality' on their type
-- representations succeeds.
instance Eq SchemaType where
    SType (_ :: P.Proxy a) == SType (_ :: P.Proxy b) =
        case testEquality (typeRep @a) (typeRep @b) of
            Just _ -> True
            Nothing -> False

-- | Build the 'SchemaType' for @a@; the type is supplied by type application,
-- e.g. @schemaType \@Int@.
schemaType :: forall a . Columnable a => SchemaType
schemaType = SType proxy
  where
    -- The annotation ties the proxy to the function's (otherwise ambiguous) @a@.
    proxy :: P.Proxy a
    proxy = P.Proxy

-- | A mapping from names (presumably column names; confirm against callers)
-- to the 'SchemaType' declared for each.
data Schema = Schema
    { elements :: M.Map T.Text SchemaType
    }
    deriving (Show, Eq)
11 changes: 8 additions & 3 deletions src/DataFrame/Operations/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,18 @@ insertVectorWithDefault defaultValue name xs d =
values = xs V.++ V.replicate (rows - V.length xs) defaultValue
in insertColumn name (fromVector values) d

-- TODO: Add existence check in rename.
-- | Rename column @orig@ to @new@.
--
-- Thin throwing wrapper around 'renameSafe': a 'Left' result is raised with
-- 'throw' (e.g. the 'ColumnNotFoundException' produced when @orig@ is absent),
-- a 'Right' result is returned unchanged.
rename :: T.Text -> T.Text -> DataFrame -> DataFrame
rename orig new df = either throw id (renameSafe orig new df)

-- | Apply a list of @(current name, new name)@ renames to a frame.
--
-- Each pair is handed to 'rename'; the order in which pairs are applied is
-- whatever the @fold@ helper in scope defines.
renameMany :: [(T.Text, T.Text)] -> DataFrame -> DataFrame
renameMany replacements df = fold renameOne replacements df
  where
    -- Projects with fst/snd so the pair is only forced when 'rename' needs it.
    renameOne pair = rename (fst pair) (snd pair)

-- | Rename column @orig@ to @new@ without throwing.
--
-- Returns 'Left' with a 'ColumnNotFoundException' (tagged @"rename"@ and listing
-- the existing column names) when @orig@ is not present. Otherwise returns
-- 'Right' with the frame whose 'columnIndices' map has the @orig@ key removed
-- and @new@ pointing at the same index.
--
-- NOTE(review): 'M.insert' replaces an existing key, so if @new@ already names
-- another column its index entry is overwritten. Confirm whether that should
-- be reported as an error instead.
renameSafe :: T.Text -> T.Text -> DataFrame -> Either DataFrameException DataFrame
renameSafe orig new df = case M.lookup orig indicesByName of
    Nothing ->
        Left $ ColumnNotFoundException orig "rename" (M.keys indicesByName)
    Just columnIndex ->
        let origRemoved = M.delete orig indicesByName
            newAdded = M.insert new columnIndex origRemoved
        in Right df { columnIndices = newAdded }
  where
    indicesByName = columnIndices df

-- | O(1) Get the number of elements in a given column.
columnSize :: T.Text -> DataFrame -> Maybe Int
Expand Down