Can I speed up this simple Dask script to get a total row count across multiple Feather dataframes?

I have data in C:\script\data\YYYYMM\data.feather

To understand Dask better, I am trying to optimize a simple script which gets the row count from each of those files and sums them up. There are almost 100 million rows across 200 files.

import dask.dataframe as dd
import feather
from dask.distributed import Client, LocalCluster
from dask import delayed

counts = []
with LocalCluster() as cluster, Client(cluster) as client:
    for f in dates:  # dates: one datetime per monthly file, built elsewhere
        # Lazily read each monthly file and record its (lazy) row count
        df = delayed(feather.read_feather)(
            f'data\\{f.year}{f.month:02}\\data.feather',
            columns=['colA', 'colB'])
        counts.append(df.shape[0])

    tot = sum(counts)
    print(dd.compute(tot))

I've included both colA and colB in the read_feather because I eventually want to move on to counting distinct values over different timespans. What I see in the task stream is read_feather firing off at about 2 seconds each, as well as a number of disk-write-read_feather and disk-read-getattr tasks taking 2-4 seconds each. If I run read_feather in its own script on a single file, it takes about 0.4 seconds to complete. Is the additional time for read_feather due to the parallelism? Also, do the disk-* tasks mean it is reading the file, then writing it to disk, and reading it again? I'd thought Dask would read in the file, count the rows, and store just that number for summing later.
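
For reference, the single-file check I mention is roughly this (the path is just an example month):

import time
import feather

start = time.perf_counter()
df = feather.read_feather('data\\201901\\data.feather', columns=['colA', 'colB'])
print(df.shape[0], 'rows in', time.perf_counter() - start, 'seconds')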



1 Reply


The appearance of disk-write-read_feather and disk-read-getattr (they should show up red on your dashboard view) suggests that the dataframes are large in memory and are indeed being spilled to disk - which is totally unhelpful in this case.

When you do

df = ...
df.shape[0]

this creates a chain of three delayed calls (i.e., tasks) for each file: one to load the dataframe, one to get its shape attribute, and one to index into that shape with [0]. It seems that these calls are not being fused into one, so the whole dataframe is kept around as a large intermediate result between the tasks.
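
You can see the three tasks directly on a toy delayed object (purely an illustration, not your data):

import pandas as pd
from dask import delayed

df = delayed(pd.DataFrame)({'colA': [1, 2, 3]})  # task 1: build the dataframe
count = df.shape[0]                              # task 2: getattr 'shape'; task 3: getitem 0
print(len(dict(count.__dask_graph__())))         # -> 3 tasks before any fusion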

Instead you could combine the calls into a single task, so that no big intermediate result ever needs to be held or moved:

import dask
import feather

@dask.delayed
def get_size(f):
    # Load and count inside one task, so only an integer leaves the worker
    df = feather.read_feather(f'data\\{f.year}{f.month:02}\\data.feather',
                              columns=['colA', 'colB'])
    return df.shape[0]

counts = [get_size(f) for f in dates]
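
A usage sketch for the above (same dates list as in your question): computing all the counts in one call means only small integers travel back to the client:

total = sum(dask.compute(*counts))  # dask.compute runs each task once and returns plain ints
print(total)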

However, I don't know whether loading feather files is expected to play nicely with threading, or whether you may simply be maxing out your IO. Your dashboard and system monitoring tools might tell you more.
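
If threading does turn out to be the limit, one cheap experiment is to trade threads for worker processes when building the cluster. The worker counts below are just a starting point to vary, not a recommendation:

import dask
from dask.distributed import Client, LocalCluster

with LocalCluster(n_workers=4, threads_per_worker=1) as cluster, Client(cluster) as client:
    # single-threaded workers avoid GIL contention between concurrent reads
    print(sum(dask.compute(*counts)))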

