
python - Finding Length of Dask dataframe

I am trying to find the length of a Dask dataframe using len(dataframe[column]), but every time I try to execute this I get an error:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Traceback (most recent call last):
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingqueues.py", line 238, in _feed
    send_bytes(obj)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingconnection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingconnection.py", line 280, in _send_bytes
    ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
BrokenPipeError: [WinError 232] The pipe is being closed
distributed.nanny - ERROR - Nanny failed to start process
Traceback (most recent call last):
  File "C:UsershaknehAppDataLocalContinuumanaconda3libsite-packagesdistributed
anny.py", line 575, in start
    await self.process.start()
  File "C:UsershaknehAppDataLocalContinuumanaconda3libsite-packagesdistributedprocess.py", line 34, in _call_and_set_future
    res = func(*args, **kwargs)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libsite-packagesdistributedprocess.py", line 202, in _start
    process.start()
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingprocess.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingcontext.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingcontext.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingpopen_spawn_win32.py", line 89, in __init__
    reduction.dump(process_obj, to_child)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessing
eduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingconnection.py", line 948, in reduce_pipe_connection
    dh = reduction.DupHandle(conn.fileno(), access)
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingconnection.py", line 170, in fileno
    self._check_closed()
  File "C:UsershaknehAppDataLocalContinuumanaconda3libmultiprocessingconnection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting

My Dask dataframe has 10 million rows. Is there any way I can get around this error?



1 Reply


I feel that finding the length of a column will not be so straightforward, because Dask might be building the dataframe from various sources - similar to why you can call .head() on a dataframe cheaply but need to do something extra for .tail().

Since you are working with such a big dataframe, I believe Python will load everything inside len() into memory.

I have two suggestions, but I'm not entirely sure they won't trigger the same exception.

Using pipe

Let's see if this works: you can try to use pipe on your column and pass len to it; perhaps that could work.

dataframe["column"].pipe(len)

For reference, here's the pipe documentation.
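Just to illustrate, here is a small sketch of that suggestion on a toy dataframe built with dask.dataframe.from_pandas; the dataframe and column names are only placeholders, so substitute your own:

import pandas as pd
import dask.dataframe as dd

# Toy data standing in for your real dataframe (placeholder names)
pdf = pd.DataFrame({"column": range(1_000)})
dataframe = dd.from_pandas(pdf, npartitions=4)

# pipe(len) simply calls len() on the column, which triggers the computation
n_rows = dataframe["column"].pipe(len)
print(n_rows)  # 1000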

Partitions

One thing that I think could help is to partition your column into chunks; that might keep your memory usage low. The only issue is that you have to do some guesswork about how big these partitions should be.

Another thing you will have to keep track of is the length of each partition; this could get a bit messy, and I feel there must be a better way to do it, to be honest.

length = 0

# count the rows in each slice of partitions separately
length += len(dataframe["column"].partitions[:10000])
length += len(dataframe["column"].partitions[10000:20000])

Of course, you could use a loop to make the code a bit cleaner, as in the sketch below.
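As a rough sketch (assuming dataframe is your existing Dask dataframe and "column" is a placeholder column name), the loop could sum the row count one partition at a time, so only one partition is computed at once:

total_length = 0
for i in range(dataframe.npartitions):
    # len() on a single-partition slice only computes that partition's rows
    total_length += len(dataframe["column"].partitions[i])

print(total_length)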

For reference, here's the documentation for dataframe.partitions.

Please let me know if any of these work, I hope I can help you.

