Posts Tagged Python

Efficient Calculations on Named Columns of Realworld Data

In data analysis frameworks, it’s common to represent data as a two-dimensional rectangle, where each column is given a name.  Every named column contains values of the same type and every row corresponds to related datums.  I say “rectangle” instead of matrix because a matrix typically implies that every value has the same type.  Database tables have named and typed columns, but they typically only support a limited set of operations.  Of course there are special-purpose databases, like Kdb+, but they’re prohibitively expensive for most users.

Stata is one commercial program that supports a variety of operations on data rectangles.  In fact, Stata’s language is entirely built around the idea of having a single rectangle in memory, upon which you sort, filter, generate columns, or merge with rectangles that you previously saved to disk.  Each column has a name and a data type.  The rectangles also support missing values, which is essential when working with real world data.

Unfortunately, although Stata is a decent tool for playing with data while doing research, it’s horrible as a production tool.  Also, its language is so simple that it makes operations like loops or fill_forward a nightmare.

I want to perform high level rectangular operations within a more flexible language like Python.  I’m already comfortable wrangling data in numpy, and I’m a huge fan of numpy’s MaskedArray package.  Numpy supports named, typed columns via recarray, but recarray is generally incomplete.  For example, there’s no built in way to merge (join) two recarrays.  More importantly, recarray does not support masked values!

I have built my own my own module that replicates and expands upon most of Stata’s rectangular operations.  My implementation includes a class that stores data in a dictionary mapping column names to numpy’s masked arrays.  I can’t share the code because I wrote it at work.

I’m happy with my implementation, but I wish it were faster.  Although I’m faster than Stata, two key operations that are slower than I’d like are: 1) merge (join) and 2) fill-forward.  Both of these do too much work outside of optimized space of numpy.  I’m thinking of rewriting these functions in C or Cython.

I’d love to hear comments about this general data structure.  How are other people dealing with rectangles of real world data?

Comments (5)

Automated Fantasy Pro Football Pickem Weights

I’m participating in the Yahoo! Sports Fantasy Pro Football Pickem with some family and friends.   I don’t regularly play fantasy sports, but I was coaxed into donating money to the pool during a family trip this summer.  I blame the beer and port.

For the uninitiated, each week you must pick a winner in all 16 games, assigning a confidence weight from 1 to 16 to each game.  If your pick wins, then you are awarded points equal to the confidence weight that you assigned to that game.

Now that my money’s in, I want to make a solid showing.  My strategy is to leverage the gambling lines, mapping the point spreads and over under lines to my picks and weights.

I threw together a little Python script to parse the Yahoo odds web page.  For each game, the script averages the point spreads and over under lines.  The favored team is selected, and the picks are ranked according to point spread, with wider spreads getting a larger weight.  Ties on point spread are broken by the over under line, with a larger over under line mapping to a larger weight.

Now I’m wondering how to improve this simple algorithm.  I might break ties by giving a smaller over under a higher weight.  The reasoning would be that smaller over under lines will have less variance.  Surely there are other ways to improve the rankings.

Here’s the table that my little script prints out.

weight | winner                         loser                     spread    o/u
================================================================================
    16 | Minnesota Vikings         over Detroit Lions              -9.83  45.17
    15 | Washington Redskins       over St. Louis Rams             -9.58  37.00
    14 | Green Bay Packers         over Cincinnati Bengals         -9.08  42.00
    13 | Tennessee Titans          over Houston Texans             -6.50  40.92
    12 | Atlanta Falcons           over Carolina Panthers          -6.08  42.50
    11 | Buffalo Bills             over Tampa Bay Buccaneers       -4.42  42.00
    10 | New England Patriots      over New York Jets              -3.42  46.17
     9 | Jacksonville Jaguars      over Arizona Cardinals          -3.08  42.50
     8 | Indianapolis Colts        over Miami Dolphins             -3.08  42.17
     7 | San Diego Chargers        over Baltimore Ravens           -3.00  40.33
     6 | Denver Broncos            over Cleveland Browns           -3.00  38.83
     5 | Kansas City Chiefs        over Oakland Raiders            -3.00  38.50
     4 | Pittsburgh Steelers       over Chicago Bears              -2.92  37.50
     3 | Dallas Cowboys            over New York Giants            -2.75  45.00
     2 | San Francisco 49ers       over Seattle Seahawks           -1.10  39.58
     1 | New Orleans Saints        over Philadelphia Eagles        -1.00  46.08

Leave a Comment

Pythonic Data Analysis with MaskedArray and Timeseries

In the financial world, most quants analyze time series data using languages such as Matlab, R, SAS, or Stata.  I’ve used those tools, but I’m much happier working with a more general purpose language.  Most recently, I’ve been writing most of my code in Python.  Although people have implemented interfaces from Python to R and other numerical libraries, I prefer to avoid hopping in and out of Python.  Fortunately, Python, NumPy, and SciPy provide an expressive, flexible, and efficient platform for analyzing data.

Regardless of what programming platform you choose, the biggest challenge is wrangling real world data into the platform’s data structures so that you can take advantage of their high level operators.  When you’re wrangling, you’re typically confronted with a Procrustean bed that forces you to crudely fit your real world inputs to the pristine shape and view of your data structure.  This painful process unfortunately risks generating incorrect and misleading results.

In the case of python’s numpy, you need to fit your data into numpy arrays.  Recently I’ve needed to analyze a lot of real world time series data, so I’ve been exploring two important extensions to numpy: masked arrays and the scikits timeseries.  I want to share my experiences and show some code.

I’m coding in Python 2.5.2, using numpy 1.3.0.dev6370, scipy 0.7.0, and scikits.timeseries 0.67.0.dev-r1480.  I’ve run my code both on Windows and OS X 10.5.  A single file containing all of the example code is located on github here.  Everything that I’m writing related to this post is accessible via git://github.com/nodogbite/maskedarray_timeseries.git or by browsing here.

Real World Example

To ground this discussion, let’s consider a purely random dataset that approximates some simple real world daily financial data.  The function ‘generateFakeStockData’ pumps out roughly 8 years of daily return and trading volume data for a universe of 2000 tickers.  Using this data, we’ll calculate:

  • the volume-weighted returns for a given basket of stocks, and
  • the minimum, maximum, and average daily daily returns or trading volumes.

Starting at the top of the class hierarchy, consider the basic numpy array.  Fundamentally, a numpy array is a data structure that stores a multi-dimensional collection of homogeneous data.  Once data is in a numpy array, it’s easy to crunch it using either regular python code or optimized functions from the numpy and scipy libraries.

For our toy example, it’s tempting to tap into those capabilities by loading the data into numpy arrays.  We could put the daily returns into a two-dimensional array of floats, while putting the daily trading volume into an array of integers.  In both numpy arrays, the columns contain data for a particular ticker, and the rows contain data for a particular day.  Then, as the figure shows, we could compute a vector of total returns by writing ‘NP.prod(dailyReturns + 1, axis=0) – 1′.

numpyarray_stockdata1
Unfortunately, our data, and most real world data, aren’t perfectly aligned, and are interspersed with missing values.  These gaps aren’t easy to represent in the numpy array.  As a hack, we could insert a conspicuous flag value in place of missing values, and then derive a perfectly clean array or set of arrays every time we want to use a numpy operator.  This is both inefficient and error prone.  We won’t bother doing that with our example daily financial data.

Masked Arrays

The numpy.ma.MaskedArray, a subclass of the numpy array, was built for this situation.  Conceptually, a masked array is a numpy array coupled with a second array of booleans that has the same shape.  The first array is called the ‘data’, and the boolean array is called the ‘mask’.  The MaskedArray blanks out the data array value at every position where the mask array is ‘True’.

maskedarray_stockdata1
The MaskedArray redefines most of the numpy array’s functions so that it handles missing data just as you expect.  For example, sum and product simply ignore the masked entries.  Since MaskedArray is a subclass of ndarray, the switch is fairly seamless.  There are also a bunch of new capabilities, as well as some potential pitfalls, which I’ll point out.

Returning to our example, we write a function that loads data from ‘generateFakeStockData’ into a couple of masked arrays.  Let’s name the function ‘loadDataIntoMaskedArrays’.  This function is fairly representative of most functions that I’ve written to build masked arrays when I’m processing CSV files.  (Usually I’m a bit more sophisticated so that I can process any number of columns, instead of hard-coding information about the columns, as I’ve done here.)  As we read each datum, we record both its value and a boolean that will become its mask.  Finally, we pass both the data and the mask to the MaskedArray constructor.

Once the data is in the MaskedArray, we can use functions from its class and module to produce results that appropriately skip the missing data.  Similar to the numpy array, we write ‘MA.product(dailyReturns + 1, axis=0) – 1′ to generate a vector of total returns for every ticker.  Here we see a minor blemish in the API: the numpy package has both a ‘prod’ and ‘product’ function, but the numpy.ma package only has a ‘product’ function.

Timeseries

So far, we’ve only used the dates as a groupby key.  The ’scikits.timeseries’ package enables us to conveniently couple a list of dates to our masked array.  Doing so will help us align data for different tickers, as well as produce subsets for a given date range.  There’s some nice documentation for the timeseries module here, but let’s write some timeseries code for our example.

Let’s call the function ‘loadDataIntoTimeseries’, which uses a helper function ‘makeTimeseriesGrid’.  This version of the loading function doesn’t have the bit of logic to fill in masked values for tickers that are missing from the initial dates.  Instead, we simply keep lists of observed dates, datums, and masks for every ticker.  Then we use functions from the timeseries package to properly align the rows and fill in missing values.

Creating a list of timeseries Date objects is the first step when constructing a timeseries object.  Every Date object has a frequency.  In this case we use the ‘B’ frequency, which stands for business days, or Monday through Friday.  Then, using that list of Date objects, you must create a DateArray object, which also has a frequency that’s equal to the frequency of the dates.  The time_series constructor also takes in a MaskedArray that has the same length as the DateArray.

The Date object has some useful functionality, such as when you add 1 to a Friday Date with business frequency, you get the subsequent Monday.  One feature that’s potentially harmful is that if you construct a business frequency Date using a weekend, you’ll get back a Date for the subsequent Monday.  I think a thrown exception would be more pythonic.

Let’s look at the function ‘makeTimeseriesGrid’.  Here we build up a timeseries for every ticker, align them to a common date list, and finally stack all of the data together into a single two-dimensional grid.  Note that I call ‘fill_missing_dates’ on each ticker timeseries.  This fills in all the missing dates in the DateArray and also inserts masked rows into the timeseries data at the corresponding locations.  I wish this were the default behavior because it leverages the principle point of MaskedArrays.  I think calling it is the best practice.

Each of the timeseries are not necessarily aligned on the same date list.  The timeseries package contains a function ‘aligned’, which behaves a bit like ‘fill_missing_dates’, to create a list of timeseries objects that contain the exact same dates.  Its signature is a bit awkward because it takes its input as a variable length argument list.

Finally, we use the numpy.ma.column_stack function to build a two-dimensional array.  Note that we access the ’series’ property of each timeseries object, which returns its masked array.  The timeseries object also has a ‘data’ property which returns a plain, un-masked numpy array.  I wish ‘data’ returned the masked array because ’series’ is a confusing name and I think a masked array should be the default.

To Be Continued

This post is pushing “too long didn’t read” length.  I will defer benchmarks and additional examples of using the constructed timeseries object to a subsequent post.

Comments (6)

Manage your Django Transactions

I’m building a Django application at work that performs several thousand inserts.  I was frustrated when it was taking several minutes to populate the database.  For reference, I’m running on a Windows XP box with a 2GHz Xeon, 4GB of RAM, and I’m using SQLite as the database.  In one straightforward piece of the program, it was taking over 3 minutes to insert 5000 rows.

Since I’m new to Django, I wasn’t immediately aware that, by default, every call to an ORM object’s save() results in a separate transaction.  When you’re doing thousands of inserts, the overhead is unbearable.  So after reading this, this, and this, I realized that the @transaction.commit_on_success decorator would allow me to lump a collection of saves into a single transaction.

from django.db import transaction

@transaction.commit_on_success
def myfunction(request):
    for i in range(5000):
        myobj = MyObject(mynumber=i)
        myobj.save()

After I made this change, the 5000 inserts took roughly 15 seconds instead of over 3 minutes.

Comments (2)

Hack for functools.partial and multiprocessing

Today I wrote some python code that mapped a function onto a sequence of financial time series data.  For the first version, I used itertools.imap to apply a partial function created with functools.partial to the sequence.  I needed a partial function with curried arguments because a couple of the inputs were created and fixed before the call to itertools.imap.

The program took 20 minutes to run because the dataset is rather large.  Since it’s an embarassingly parallel problem, I decided to give the multiprocessing package a whirl.  I’ve used multiprocessing’s parallel map functions before, so I naively created a worker Pool with 4 proceses and changed itertools.imap to pool.imap.  UnfortunatelyI was smacked with this error: “TypeError: type ‘partial’ takes at least one argument.”

It seems that the functools.partial object isn’t pickeled across the multiprocessing workers.  I worked around this by using itertools.izip, combined with itertools.repeat.  The itertools.repeat function mimics currying by repeating the fixed argument.  I’ve pasted a simple example of this below, and I’ve also put it here.  If anybody has a better way of doing this, please let me know.

import functools
import itertools
import multiprocessing

def uncurryDummyMultiply(t):
    return dummyMultiply(t[0], t[1])

def dummyMultiply(a, b):
    return a * b

def serialMap():
    mathFun = functools.partial(dummyMultiply, 10)
    total = 0
    for x in itertools.imap(mathFun, xrange(1000)):
        total += x
    return total

def parallelMap_Error():
    """Generates an Error!
    TypeError: type 'partial' takes at least one argument
    """
    mathFun = functools.partial(dummyMultiply, 10)
    pool    = multiprocessing.Pool(processes=4)
    total = 0
    for x in pool.imap(mathFun, xrange(1000)):
        total += x
    return total

def parallelMap_NoError():
    """Parallel version of serialMap that doesn't generate a TypeError
    """
    pool    = multiprocessing.Pool(processes=4)
    total = 0
    for x in pool.imap(uncurryDummyMultiply,
                       itertools.izip(itertools.repeat(10),
                                      xrange(1000)),
                       chunksize=50):
        total += x
    return total

if __name__ == "__main__":
    print "serialMap result:   %d" % serialMap()
    print "parallelMap result: %d" % parallelMap_NoError()

Leave a Comment

Simple Python code for Amazon EC2

Back in October I started playing with Amazon’s elastic compute cloud.  My basic goal was to do some distributed computing with Python.  Thanks to Amazon’s documentation and helpful writeups by some early adopters, I was able to get my environment setup and to get my head around the various command line incantations.  I quickly grew tired of repeatedly having to input the image and instance IDs on the command line.

I wanted a pythonic way of starting up instances, querying their status, and doing simple things like running a command on all of my instances.  I figured a pythonic EC2 framework would make it easier for me to interface with whatever distributed computing framework I chose.  Also, I like python.

I wrote a class, EC2Controller, that uses Amazon’s Ec2.py module to simplify several common operations.  I’ve posted the code to a Google Code project, ec2python, which can be found at http://code.google.com/p/ec2python/.  I suspect some or all of the functionality is implemented in the boto project, but I haven’t taken the time to investigate.

Here’s an example of how I use EC2Controller from ipython:

In [1]: import EC2Controller as EC2

In [2]: ec2 = EC2.EC2Controller(cluster_size=2, verbose=True)

In [3]: ec2.getInstances()
Out[3]: []

In [4]: ec2.startCluster()
startCluster.  Query for the current instances.
Starting 2 instances...

In [5]: ec2.getInstances()
Out[5]:
[
EC2Instance( instance ID: i-d73389be
             state:       pending
             image:       ami-ee2eca87
             public  DNS:
             private DNS:  ),

EC2Instance( instance ID: i-d63389bf
             state:       pending
             image:       ami-ee2eca87
             public  DNS:
             private DNS:  )]

In [6]: ec2.waitUntilAllAreRunning()
Out[6]: True

In [7]: ec2.getInstances()
Out[7]:
[
EC2Instance( instance ID: i-d73389be
             state:       running
             image:       ami-ee2eca87
             public  DNS: ec2-75-101-212-146.compute-1.amazonaws.com
             private DNS: domU-12-31-39-00-54-E1.compute-1.internal ),

EC2Instance( instance ID: i-d63389bf
             state:       running
             image:       ami-ee2eca87
             public  DNS: ec2-174-129-150-216.compute-1.amazonaws.com
             private DNS: domU-12-31-39-00-B0-67.compute-1.internal )]

In [8]: ec2.stopCluster()
stopCluster.  Query for the current instances.
Terminate these instances: '['i-d73389be', 'i-d63389bf']'

I hope somebody finds the code useful.

Comments (2)

Lessons Learned: Parsing CSV in Haskell and Python

One of my best friends read my CSV post and said “So, you wrote a program that was a little slow, and then you wrote another one that was faster. That’s great.” Hard to disagree.

Over the past couple days, thousands of people have read my last two posts, and tens of people have made comments and offered advice. Thanks to Slarba, I now have a Haskell program that runs in about 1 second (which happens to be about 0.7 seconds faster than my Python program, although I haven’t tried to optimize the Python program at all).

Here’s what I learned in this odyssey:

  1. If you’re reading in a large text file, use Haskell’s ByteString instead of String. (thanks to Don Stewart, aka dons, for writing that module).
  2. To clean up your code that uses sorts, make your custom data type an instance of Ord instead of using the sortBy function.
  3. I had never used Haskell’s foreign function interface before. It’s amazing how simple it is to pull in a C function. It seems like you just need to abide by a few idioms, such as the NOINLINE pragma and the unsafePerformIO function.
  4. Clearly this is all a lot of effort and optimization simply to read in some basic CSV lines in Haskell. Hopefully the ASCII string parsing libraries will improve in time.
  5. The common optimization compiler flag ‘-O2′ is not the default. Several people suggested adding that flag, although I never noticed a speedup from it for this program.
  6. The Parsec module is very neat and clever, but its performance is hampered by Haskell’s String type.

For completeness, here’s the final version of the code.

> {-# OPTIONS -fffi -funbox-strict-fields #-}
> import qualified Data.ByteString.Char8 as BS
> import Data.ByteString.Unsafe (unsafeUseAsCString)
> import System
> import Data.List
> import Maybe
> import CForeign
> import Foreign.Ptr
> import System.IO.Unsafe

> data Row = Row {
>       ticker :: !BS.ByteString,
>       bid    :: !Double,
>       ask    :: !Double,
>       day    :: !BS.ByteString,
>       time   :: !BS.ByteString
>       }
> instance Eq Row where
>     a == b  = ticker a == ticker b && time a == time b
> instance Ord Row where
>     compare a b = case compare (ticker a) (ticker b) of
>                     EQ -> compare (time a) (time b)
>                     e  -> e

since ByteString library doesn't seem to have strtod-like function, we can
peek into the ByteString using useAsCString and calling C strtod on it.

> foreign import ccall "stdlib.h strtod" cstrtod :: CString->CString->IO CDouble
> {-# NOINLINE strtod #-}
> strtod :: BS.ByteString -> Double
> strtod bs = realToFrac . unsafePerformIO .
>             unsafeUseAsCString bs $ \cs -> cstrtod cs nullPtr

> data Result = Result {
>       tickerR :: !BS.ByteString,
>       openR   :: !Double,
>       highR   :: !Double,
>       lowR    :: !Double,
>       closeR  :: !Double,
>       dayR    :: !BS.ByteString
>       }
> instance Show Result where
>     show (Result ticker open high low close day) =
>         intercalate "," [BS.unpack ticker, show open, show high,
>                          show low, show close, BS.unpack day] ++ "\n"

> main :: IO ()
> main = do
>   (csvFile:outputFile:_) <- getArgs
>   rows <- doReadData csvFile
>   writeFile outputFile . concatMap show . computeResults $ groupToDays rows

> computeResults :: [[Row]] -> [Result]
> computeResults = map makeResult
>  where makeResult d = let openRow  = head d
>                           closeRow = last d
>                           highRow  = maximumBy comparePrice d
>                           lowRow   = minimumBy comparePrice d
>                       in Result (ticker openRow) (price openRow) (price highRow)
>                              (price lowRow) (price closeRow) (day openRow)
>        price r = (bid r + ask r) / 2
>        comparePrice a b = compare (price a) (price b)

> groupToDays :: [Row] -> [[Row]]
> groupToDays = concatMap (groupBy tickerDay) . groupBy tickerName
>  where tickerName a b = ticker a == ticker b
>        tickerDay a b  = day a == day b

> doReadData :: FilePath -> IO [Row]
> doReadData file = do
>     contents <- BS.readFile file
>     return . sort . mapMaybe parseRow . BS.lines $ contents

> parseRow :: BS.ByteString -> Maybe Row
> parseRow row = case BS.split ',' row of
>                  [ticker_,bid_,ask_,_,day_,time_] ->
>                      let (bidD, askD) = (strtod bid_, strtod ask_)
>                      in if (bidD == 0 || askD == 0)
>                         then Nothing
>                         else Just $ Row ticker_ bidD askD day_ time_
>                  _ -> Nothing

Thanks again to all the people who helped out. It’s humbling to think about other hackers spending their time commenting and optimizing my code.

Comments (11)

Followup: CSV Parsing in Haskell and Python

Tonight I tried improving the Haskell version of my CSV data analysis program. None of the changes made the Haskell program perform as well as the Python program. Still, I thought I’d share the results.

I first tried using the Parsec code discussed in the Real World Haskell book. The authors boil the CSV parsing down to 5 lines, which I incorporated into my program as:

> doReadDataParsec file = do
>     chars <- readFile file
>     let charRows = case parse csvFile "(unknown)" chars of
>                      Right r   -> r
>                      otherwise -> [[]]
>     let sortByTicker = sortBy $ comparing head `mappend` comparing (!!5)
>     let strRows = sortByTicker $ filter (\r -> (length r) > 0) charRows
>     return $ catMaybes $ map makeRowFromStrs strRows
>   where csvFile = endBy line eol
>         line    = sepBy cell (char ',')
>         cell    = many (noneOf ",\n")
>         eol     = char '\n'

I use the standard readFile function here, which reads the entire file in as a String, instead of a ByteString. The String is then parsed using Parsec. Using this function, the program executes in about 10 seconds. That’s up from about 7.5 seconds with my first implementation.

Next I tried to avoid the String type. In this version, I only unpack the ByteString when I need to convert the bid and ask to a Double. I then pack the Double back to a ByteString when the result is written to file. Everything else is left as a ByteString (except for the data types that I create — I’m not sure if that’s a problem). This approach shaves off a little over 3 seconds, executing in about 4.3 seconds.

Here is the complete program:

> import qualified Data.ByteString.Char8   as BS
> import Data.Ord
> import Data.Maybe
> import Data.Monoid
> import List
> import System.Environment

TICKER1,33,35,NULL,2007-09-28 00:00:00,2007-09-28 16:32:00
TICKER2,29.5,31,NULL,2007-03-07 00:00:00,2007-03-07 07:33:00
TICKER3,29.5,31,NULL,2007-03-07 00:00:00,2007-03-07 07:33:00

> data RowBytes = RowBytes {tickerB :: BS.ByteString,
>                           bidB :: Double,
>                           askB :: Double,
>                           dayB :: BS.ByteString,
>                           timeB :: BS.ByteString} deriving (Read, Show)
> data Result = Result {tickerR :: BS.ByteString,
>                       openR :: Double,
>                       highR :: Double,
>                       lowR :: Double,
>                       closeR :: Double,
>                       dayR :: BS.ByteString} deriving (Read, Show)

> main :: IO ()
> main = do
>   (csvFile:outputFile:_) <- getArgs
>   rows <- doReadData csvFile
>   let tickerGroups = groupBy (\a b -> (tickerB a) == (tickerB b)) rows
>   let oneTicker rs = map oneDay $ groupBy (\a b -> (dayB a) == (dayB b)) rs
>   let results = concat $ map oneTicker tickerGroups
>   let bytes r = [tickerR r,
>                  BS.pack $ show $ openR r,
>                  BS.pack $ show $ highR r,
>                  BS.pack $ show $ lowR r,
>                  BS.pack $ show $ closeR r,
>                  dayR r]
>   let pprow r = BS.concat $ intersperse (BS.singleton ',') $ bytes r
>   let txt = BS.unlines $ map pprow results
>   BS.writeFile outputFile txt
>   putStrLn "done"

create a Result row that encapsulates the open/high/low/close for
a single ticker on a single day

> oneDay rows =
>   let price r  = ((bidB r) + (askB r)) / 2
>       openRow  = head rows
>       closeRow = last rows
>       highRow  = maximumBy (\a b -> compare (price a) (price b)) rows
>       lowRow   = minimumBy (\a b -> compare (price a) (price b)) rows
>   in (Result (tickerB openRow)
>              (price openRow)
>              (price highRow)
>              (price lowRow)
>              (price closeRow)
>              (dayB openRow))

> maybeRead s = case reads s of
>                 [ (x, "")] -> Just (x::Double)
>                 _          -> Nothing

Convert a list of strings into a [Row]
We need to convert two of the strings to Doubles.

> makeRow r =
>   case (maybeRead (BS.unpack $ r !! 1), maybeRead (BS.unpack $ r !! 2)) of
>     ((Just a), (Just b)) -> Just (RowBytes (r !! 0) a b (r !! 4) (r !! 5))
>     _                    -> Nothing

Read in the CSV file, returning a [Row], sorted by ticker and day

> doReadData file = do
>   bytes <- BS.readFile file
>   let byteRows = map (BS.split ',') $ BS.lines bytes
>   let sortByTicker   = sortBy $ comparing head `mappend` comparing (!!5)
>   let sortedByteRows = sortByTicker $ filter (\r -> (length r) > 0) byteRows
>   return $ catMaybes $ map makeRow sortedByteRows

Finally, I tried adding the -O2 ghc compile flag, but that did not yield a speedup.

I’m not sure what to try next. Thanks to everybody who commented to my last post, both directly on this blog and on the Reddit thread.

Note that in the comments below, I’ve linked to a version of the Haskell code that runs in 1 second.  Thanks to Slarba for posting the solution, which I only modified slightly.

Comments (14)

CSV Parsing: Haskell versus Python

I recently wrote a fairly simple program to parse a CSV file containing some daily intraday prices for a number of tickers. The CSV file had roughly 160,000 lines, with each line containing a ticker, bid, ask, and timestamp. The goal is to produce a daily open, high, low, and close for each ticker.

First I wrote the program in Haskell using the Text.CSV library. It performed horrendously (I didn’t even bother waiting for the program to finish executing as I heard the CPU fan spin up to max). After chatting with ‘dons’ and others on #haskell, I rewrote the program using the Data.ByteString library. Unfortunately, it took about 7.5 seconds to process the input and produce a result.

Curious, I wrote an equivalent program in python. The python program finishes in about 1.7 seconds on my MacBook. That’s 1.7 seconds for python and 7.5 seconds for Haskell — not so good.

None of the superstars on #haskell had any immediate suggestions for what’s making Haskell perform poorly, but most suggestions pointed to my unpacking of the ByteStrings to Strings. I’ll investigate further if I have the time, and post the findings here.

Here is my Haskell code (http://hpaste.org/8928):

> import qualified Data.ByteString.Char8   as BS
> import Data.Maybe
> import Data.List
> import List
> import System.Environment
> import System.IO

TICKER1,33,35,NULL,2007-09-28 00:00:00,2007-09-28 16:32:00
TICKER2,29.5,31,NULL,2007-03-07 00:00:00,2007-03-07 07:33:00
TICKER3,29.5,31,NULL,2007-03-07 00:00:00,2007-03-07 07:33:00

> data Row = Row {ticker :: String,
>                 bid :: Double,
>                 ask :: Double,
>                 day :: String,
>                 time :: String} deriving (Read, Show)

> data Result = Result {tickerR :: String,
>                       openR :: Double,
>                       highR :: Double,
>                       lowR :: Double,
>                       closeR :: Double,
>                       dayR :: String} deriving (Read, Show)

> main :: IO ()
> main = do
>   (csvFile:outputFile:_) <- getArgs
>   rows <- doReadData csvFile
>   let tickerGroups = groupBy (\a b -> (ticker a) == (ticker b)) rows
>   let oneTicker rs = map oneDay $ groupBy (\a b -> (day a) == (day b)) rs
>   let results = concat $ map oneTicker tickerGroups
>   let pprow r = concat $ intersperse "," $ [tickerR r,
>                                             show $ openR r,
>                                             show $ highR r,
>                                             show $ lowR r,
>                                             show $ closeR r,
>                                             dayR r]
>   let txt = unlines $ map pprow results
>   writeFile outputFile txt
>   putStrLn "done"

create a Result row that encapsulates the open/high/low/close for
a single ticker on a single day

> oneDay rows =
>   let price r  = ((bid r) + (ask r)) / 2
>       openRow  = head rows
>       closeRow = last rows
>       highRow  = maximumBy (\a b -> compare (price a) (price b)) rows
>       lowRow   = minimumBy (\a b -> compare (price a) (price b)) rows
>   in (Result (ticker openRow)
>              (price openRow)
>              (price highRow)
>              (price lowRow)
>              (price closeRow)
>              (day openRow))

> maybeRead s = case reads s of
>                 [ (x, "")] -> Just (x::Double)
>                 _          -> Nothing

Convert a list of strings into a [Row]
We need to convert two of the strings to Doubles.

> makeRow r =
>   case (maybeRead (r !! 1), maybeRead (r !! 2)) of
>     ((Just a), (Just b)) -> Just (Row (r !! 0) a b (r !! 4) (r !! 5))
>     _                    -> Nothing

Read in the CSV file, returning a [Row], sorted by ticker and day

> doReadData file = do
>   bytes <- BS.readFile file
>   let byteRows = map (BS.split ',') $ BS.lines bytes
>   let sortByTicker   = sortBy (\a b -> case compare (head a) (head b) of
>                                          EQ -> compare (a !! 5) (b !! 5)
>                                          o  -> o)
>   let sortedByteRows = sortByTicker $ filter (\r -> (length r) > 0) byteRows
>   let strRows = map (\r -> map BS.unpack r) sortedByteRows
>   return $ catMaybes $ map makeRow strRows

And here is the python code:

import itertools as IT

def rowCompare(x,y):
    """comparison used for sorting our rows.
    Sort by ticker and then by time."""
    if cmp(x[0], y[0]) == 0:
        return cmp(x[5], y[5])
    else:
        return cmp(x[0], y[0])

def price(r):
    """Take the average of the bid and ask prices."""
    return (float(r[1]) + float(r[2])) / 2.0

# read in the data from a CSV file
f = open('IG_US.csv', 'r')
lines = sorted([line.strip().split(',') for line in f], rowCompare)

# create daily open/high/low/close for each ticker
results = []
for ticker, tickerIter in IT.groupby(lines, lambda x: x[0]):
    for day, dayIter in IT.groupby(tickerIter, lambda x: x[4]):
        dayRows = [d for d in dayIter if d[1] != "NULL" and d[2] != "NULL"]
        if not dayRows:
            continue
        highRow = dayRows[0]
        lowRow = dayRows[0]
        for row in dayRows:
            highRow = row if price(row) > highRow else highRow
            lowRow = row if price(row) < lowRow else lowRow
        results.append([ticker,
                        "%0.2f" % price(dayRows[0]),
                        "%0.2f" % price(highRow),
                        "%0.2f" % price(lowRow),
                        "%0.2f" % price(dayRows[-1]),
                        day])

# write out the result to a file
fw = open("./IG_US_python_output.csv", 'w')
fw.write("\n".join([",".join([str(c) for c in l]) for l in results]))

Please note that I’ve posted a followup here.

Comments (17)

Improved Labix dateutil.rruleset

I’ve been using Labix’s python dateutil.rruleset object in a program to manage collections of recurring dates, represented by dateutil.rrule objects. I discovered that the rruleset does not work properly if you remove a date from the set and then subsequently add the date back in. Using the standard implementation (actually, I was using Brian Beck’s heap-based re-implementation) of dateutil.rrulset, once a date is marked as excluded (either with exrule or exdate), it cannot be included.

As a simple example involving a single date, you can see the following in ipython:

In [1]: import dateutil.rrule as R
In [2]: import datetime as DT
In [3]: s = R.rruleset()
In [4]: s.exdate(DT.datetime(2000,1,1))
In [5]: s.rdate(DT.datetime(2000,1,1))
In [6]: list(s)
Out[6]: []

The correct output in the above example, as I see it, is the single date that we added on the 5th line. To achieve this behavior I completely rewrote the rruleset class within the dateutil.rrule.py file of the distribution. I then reinstalled the modified dateutil.rrule package using setup.py.

The new code is below. To use it yourself, simple cut out the existing rruleset class code and paste in this code.

class rruleset(rrulebase):

    class _genrule:
        """keep a iterator that we step through, holding at each
        value (as the current 'min') until we advance."""
        def __init__(self, op, rule):
            self.op   = op
            self.rule = rule
            self.reset()

        def reset(self):
            """reset the iterator and clear out the current minimum."""
            self._min  = None
            self.iter = iter(self.rule)

        def copyAndReset(self):
            """so that multiple iterators can step through the rules concurrently."""
            c = rruleset._genrule(self.op, self.rule)
            c.reset()
            return c

        def advance(self):
            """mark the minimum as consumed so we can move to the next one."""
            self._min = None

        def _getMin(self):
            """Either get the cached min or produce a new min."""
            if self._min is not None:
                return self._min
            else:
                self._min = self.iter.next()
                return self._min

        def __repr__(self):
            return "genrule op: %s _min: %s" % (self.op, self._min)

        min = property(_getMin)

    class _allrules:
        """keep a list of genrule objects that we'll iterate through."""
        def __init__(self):
            self.rules = []

        def append(self, genrule):
            """add a new rules to our list of rules"""
            self.rules.append(genrule)
            self.reset()

        def reset(self):
            """reset every rule in the list"""
            for r in self.rules:
                r.reset()

        def copyAndReset(self):
            """create a copy of our list of rules and reset them all"""
            c = rruleset._allrules()
            c.rules = [r.copyAndReset() for r in self.rules]
            c.reset()
            return c

        def advanceAllWithMin(self, min):
            """advance the iterator for each rule that is currently at the given min."""
            for r in self.rules:
                try:
                    if r.min == min:
                        r.advance()
                except:
                    pass

        def popMinDate(self):
            """pop off a list of minimums from our list of rules.  some of the
            iterators in our rules list might overlap.  we want to abide by the
            last (most recently added) rule in the list.  So if there are two rules,
            and the first rule includes a date, but the second rule excludes that
            date, then we want to exclude the date."""
            mins = []
            foundAddRule = False
            for r in self.rules:
                try:
                    record = {'min': r.min, 'genrule': r}
                    if r.op == 'add':
                        foundAddRule = True
                    if (not mins) or (r.min < mins[0]['min']):
                        # we encounter a new minimum, so clear the list
                        mins = []
                        mins.append(record)
                    elif r.min == mins[0]['min']:
                        # current minimum matches minimum for this rule
                        mins.append(record)
                except:
                    pass
            if not foundAddRule:
                raise IndexError
            if mins:
                self.advanceAllWithMin(mins[-1]['min'])
                return mins[-1]
            else:
                # done iterating -- all rules exhausted.
                raise IndexError

    def __init__(self, cache=False):
        rrulebase.__init__(self, cache)
        self._allrules = rruleset._allrules()

    def rrule(self, rrule):
        self._allrules.append(rruleset._genrule('add', rrule))

    def rdate(self, rdate):
        self.rdates([rdate])

    def rdates(self, rdates):
        self._allrules.append(rruleset._genrule('add', sorted(rdates)))

    def exrule(self, exrule):
        self._allrules.append(rruleset._genrule('ex', exrule))

    def exdate(self, exdate):
        self.exdates([exdate])

    def exdates(self, exdates):
        self._allrules.append(rruleset._genrule('ex', sorted(exdates)))

    def _iter(self):
        allrules = self._allrules.copyAndReset()
        total = 0
        try:
            while True:
                # this loop will exit when popMinDate raises an exception
                minRecord = allrules.popMinDate()
                if minRecord['genrule'].op == "add":
                    total += 1
                    yield minRecord['min']
        except:
            self._len = total

Leave a Comment