Monday, March 6, 2017

Raspberry Pi Experiments: Running Python3, Jupyter Notebooks and Dask Cluster - Part 2

It's been a while since my last post; I had planned this follow-up a while back but completely missed posting it. In this part of the blog I will cover more about the Dask distributed scheduler, applications of Dask and where it shines over Excel or Python pandas, and also issues that you may encounter while using Dask.

If you have not read my previous post, I suggest you refer to it first, as it will give you a fair bit of an idea about the setup and the background for this post.

For this post I will use the land registry file from http://data.gov.uk. It has details of land sales in the UK, going back several decades, and is 3.5 GB as of August 2016 (this applies only to the "complete" file, "pp-complete.csv"). No registration is required.

-- Download file "pp-complete.csv", which has all records.
-- If the schema changes or a field is added, consult: https://www.gov.uk/guidance/about-the-price-paid-data

The file was placed at the path below:
/mnt/nwdrive/Backup/datasets/pp-complete.txt

The Dask scheduler was started with the below command:

jns@minibian:~$ nohup /usr/local/bin/dask-scheduler3 >> /tmp/dask.log &

I then started Dask workers on 2 Raspberry Pi nodes with the below command:
jns@minibian:~$ nohup /usr/local/bin/dask-worker3 192.168.0.7:8786 >> /tmp/dask.log &

1st Node - Scheduler & Worker are both running
distributed.nanny - INFO -         Start Nanny at:          192.168.0.7:39087
distributed.worker - INFO -       Start worker at:          192.168.0.7:36579
distributed.worker - INFO -              nanny at:          192.168.0.7:39087
distributed.worker - INFO -               http at:          192.168.0.7:52884
distributed.worker - INFO - Waiting to connect to:          192.168.0.7:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    0.61 GB
distributed.worker - INFO -       Local Directory:        /tmp/nanny-h60j2lh3
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register 192.168.0.7:36579
distributed.worker - INFO -         Registered to:          192.168.0.7:8786
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Starting worker compute stream, 192.168.0.7:36579
distributed.nanny - INFO - Nanny 192.168.0.7:39087 starts worker process 192.168.0.7:36579

2nd Node - One Worker is running
pi@raspberrypi:~ $ cat /tmp/dask.log
distributed.nanny - INFO -         Start Nanny at:          192.168.0.4:39911
distributed.worker - INFO -       Start worker at:          192.168.0.4:45033
distributed.worker - INFO -              nanny at:          192.168.0.4:39911
distributed.worker - INFO -               http at:          192.168.0.4:41493
distributed.worker - INFO - Waiting to connect to:          192.168.0.7:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                    0.58 GB
distributed.worker - INFO -       Local Directory:        /tmp/nanny-d3ye93s4
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:          192.168.0.7:8786
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - INFO - Nanny 192.168.0.4:39911 starts worker process 192.168.0.4:45033

Now the code that I tested via a Jupyter notebook:

The first thing I tested was initialising the pandas and Dask objects:
import pandas as pd
import dask.dataframe as dd
from distributed import Client
client = Client('192.168.0.7:8786')
strcnames = """transaction
price
transfer_date
postcode
property_type
newly_built
duration
paon
saon
street
locality
city
district
county
ppd_category_type
record_status"""
cnames = [col.strip() for col in strcnames.split("\n")]
cnames
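
Before loading any data it is worth checking that the notebook can actually see both workers. A quick sanity check, just using the client object created above:

# threads available on each connected worker
print(client.ncores())

# more detail about the scheduler and workers, if needed
print(client.scheduler_info())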
Then I used the below code to load the 3.5 GB data file:

import time
start_time = time.time()
df = None
count = 0
# read the CSV in 10,000-row chunks with pandas and build up one Dask dataframe
for chunk in pd.read_csv("/mnt/nwdrive/Backup/datasets/pp-complete.txt", names=cnames, chunksize=10000):
    if df is None:
        # first chunk: create the Dask dataframe and persist it on the cluster
        df = dd.from_pandas(chunk, npartitions=1)
        df = client.persist(df)
    else:
        # append returns a new dataframe, so reassign it
        df = df.append(chunk)
    count = count + 1
    print(count)
elapsed_time = time.time() - start_time
print(str(elapsed_time))
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
print("{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds))

The reason I tried this instead of dask's read_csv was that, for the workers to read the file, the CSV would have to be accessible on all nodes, but I had the file on only one node, and whenever I tried dask.dataframe.read_csv it kept failing with a "file not found" error. Hence, as a workaround, I loaded the CSV file in chunks with pandas and appended the chunks to the Dask dataframe.
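
For reference, if the file had been visible at the same path on every node (for example via the same NFS mount on each Pi), the whole chunked loop above could in principle be replaced by a single dask read_csv call. A rough sketch of what that would look like, reusing the cnames list from the earlier cell:

# assumes every worker can read this exact path, e.g. via a shared NFS mount
df = dd.read_csv("/mnt/nwdrive/Backup/datasets/pp-complete.txt", names=cnames, blocksize=64000000)
# spread the data across the cluster and keep it in memory
df = client.persist(df)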

With dask-worker3 running on 2 Nodes I got the following timing to load the 3.5 GB file.
00:48:14.91

With dask-worker3 running on 3 Nodes I got the following timing to load the 3.5 GB file.
00:46:17.24

Also, when I ran a compute on the Dask dataframe using the command below, the response time was very good:

%timeit df.groupby(df.county).price.mean().compute()
1 loop, best of 3: 227 ms per loop
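
The same persisted dataframe can be queried further in the same way; a couple of illustrative calls (the column names come from the cnames list above, and the exact timings will of course differ):

# average price by property type, computed across the cluster
print(df.groupby(df.property_type).price.mean().compute())

# the five counties with the highest average price
print(df.groupby(df.county).price.mean().nlargest(5).compute())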

In comparison, with plain pandas the file did not even load and the notebook became non-responsive:

start_time = time.time()
#pdf = pd.read_csv("http://192.168.0.2:8001/pp-monthly-update-new-version.csv",names=cnames)
pdf = pd.read_csv("/mnt/nwdrive/Backup/datasets/pp-complete.txt", names=cnames)
elapsed_time = time.time() - start_time
print(str(elapsed_time))
hours, rem = divmod(elapsed_time, 3600)
minutes, seconds = divmod(rem, 60)
print("{:0>2}:{:0>2}:{:05.2f}".format(int(hours), int(minutes), seconds))

Pandas non-responsiveness while processing 3.5GB file

Hence, in conclusion, I can say that if you are looking at running your data analytics on a Raspberry Pi using Python, then Dask is a great contender for large datasets and gives very good response times. Hope this was helpful. In the future I will be posting more blogs about how we can leverage the Raspberry Pi for data analytics.



