Geospatial Analysis with Python on Ceres

Rowan Gaffney (rowan.gaffney@ars.usda.gov)

1. Data on Ceres

Transferring Data

  1. Internet2 (I2) connection (available at select locations)
    • Secure Copy (SCP) to transfer data
    • Additional documentation on SCINet Basecamp
  2. Globus
    • Specifically designed for HPC data transfer, with an intuitive web-based GUI. Globus Link
    • Additional documentation on SCINet Basecamp

Reducing Data Size

Data Type

Scaling the data and changing its data type is an effective way to reduce overall size. A common data type is 64-bit floating point (float64), which carries a level of precision often far greater than the precision of the data itself. Consider reflectance data ranging from 0.0 to 1.0 in floating point representation. Scaling by 10,000 and converting to int16 (16 bits) preserves four decimal places of precision and reduces the size by a factor of 4.

Consider the example below:

In [1]:
import numpy as np

array1 = np.random.random((1000,10000)).astype(np.float64)
print('This array, in '+str(array1.dtype)+', is '+str(array1.nbytes*1e-6)+' MB')

array2 = (np.random.random((1000,10000))*10000.).astype(np.int16)
print('This array, in '+str(array2.dtype)+', is '+str(array2.nbytes*1e-6)+' MB')
This array, in float64, is 80.0 MB
This array, in int16, is 20.0 MB
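As a quick sanity check (not part of the original example), we can round-trip the scaled values back to floating point and confirm the worst-case error is bounded by half a scaled unit (5e-5), well within four decimal places:

```python
import numpy as np

# Simulated reflectance values in [0, 1]
refl = np.random.random((1000, 1000))

# Scale to int16 and back; int16 holds -32768..32767, so a 10,000x
# scale factor fits and keeps four decimal places of precision
packed = np.round(refl * 10000).astype(np.int16)
unpacked = packed.astype(np.float64) / 10000.0

# Worst-case round-trip error is bounded by 0.5 / 10000 = 5e-5
print('max abs error:', np.abs(refl - unpacked).max())
```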

Data Compression

Most raster formats have internal lossless compression options. Depending on the nature of your data, this can reduce the size substantially. For detailed specifications of raster formats see the GDAL specifications.
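As a rough illustration of lossless compression (using NumPy's zlib-backed .npz format here rather than a raster driver, so it runs without GDAL), compare on-disk sizes for a low-entropy array. Savings on real rasters will vary with the data and the driver's compression options:

```python
import os
import tempfile
import numpy as np

# A low-entropy array (repeated rows) compresses well; noisy data
# compresses far less - results depend entirely on your data
a = np.tile(np.arange(1000, dtype=np.int16), (1000, 1))

with tempfile.TemporaryDirectory() as d:
    raw = os.path.join(d, 'raw.npy')
    packed = os.path.join(d, 'packed.npz')
    np.save(raw, a)                    # uncompressed
    np.savez_compressed(packed, a=a)   # zlib (DEFLATE) compressed
    raw_mb = os.path.getsize(raw) * 1e-6
    packed_mb = os.path.getsize(packed) * 1e-6
    print(f'uncompressed: {raw_mb:.2f} MB, compressed: {packed_mb:.2f} MB')
```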

Virtual Rasters

A common issue in geospatial analysis is working with data in different projections. A typical workflow reprojects all data into a common projection, which duplicates the data on disk. An alternative is to use Virtual Raster Files (.vrt), which are simple XML files that describe how the data should be transformed when opened.

In addition to reprojecting, Virtual Raster Files can be used to mosaic, alter resolutions, resample, etc. An efficient tool for building VRT files is gdalbuildvrt.
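A couple of illustrative invocations (file names are hypothetical; note that gdalbuildvrt mosaics and resamples, while reprojection into a VRT goes through gdalwarp with the VRT output driver):

```shell
# Mosaic a directory of GeoTIFF tiles into a single virtual raster
gdalbuildvrt mosaic.vrt tiles/*.tif

# Resample to a coarser 30 m resolution while building the VRT
gdalbuildvrt -r bilinear -tr 30 30 mosaic_30m.vrt tiles/*.tif

# Reproject virtually (no new pixels written) via the VRT driver
gdalwarp -of VRT -t_srs EPSG:5070 scene.tif scene_5070.vrt
```

Because the .vrt only describes the transformation, the commands run in seconds and no duplicated raster data is written to disk.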

2. Python Setup

Overall Setup - Background

Packages / Resources

  • Pangeo: NSF-funded project for Big Data Geoscience. Implements a system very similar to the container used on Ceres (titled: data_science_im_rs)
  • JupyterLab: Web-based user interface (IDE for Python, R, IDL, etc.)
  • Dask: Parallel Computing Library
  • Xarray: Labelled multi-dimensional array package
  • Numpy: Fundamental package for scientific computing with Python

Cluster

  • Uses a directed acyclic graph (DAG) to distribute data and processing across the cluster (very different from an MPI-style cluster).
    • Client
    • Scheduler
    • Workers

Container

Sbatch Script

  • Located at: /project/geospatial_tutorials/data_science_nb_dask.sbatch
  • Submit job to Ceres Resource management system (SLURM)
  • This job will run the JupyterLab server using the container mentioned above (data_science_im_rs.simg)
  • Command: sbatch data_science_nb_dask.sbatch /Dir/To/Parent/Folder/

Step by Step Instructions

Below are the steps/commands to set up the Python/JupyterLab environment. Below the instructions is a video of the process on Ceres.

  1. Login to Scinet

    • Depends on your host system (Windows, Linux, Mac). For me (on linux), I execute the following command on my local machine:

      ssh user.name@login.scinet.science

    • The SCINet Basecamp site has a thorough set of resources for getting started.

  2. Submit Sbatch Job

    • This "Job" will run the JupyterLab server on Ceres.
    • sbatch + sbatch_file + python_analysis_dir. For the tutorial:

      • sbatch_file is at: /project/geospatial_tutorials/data_science_nb_dask.sbatch
      • the python_analysis_dir can be anywhere you have read/write permissions. For this example, let's use our home folder: /home/user.name

      For me, the command looks like:

      sbatch /project/geospatial_tutorials/data_science_nb_dask.sbatch /home/rowan.gaffney/

    • This will produce a file in the directory the command was run titled DataScienceLab_stdout.XXXXXX, where the XXXXXX is the job ID.
    • To view all your current jobs:

      squeue --user=USER.NAME

  3. Port Forward JupyterLab

    • Open the DataScienceLab_stdout.XXXXXX file like:

      nano DataScienceLab_stdout.XXXXXX

    • The top of the file should look like:


Copy/Paste this in your local terminal to ssh tunnel with remote
-----------------------------------------------------------------
ssh -N -L 9999:10.1.4.65:38141 USER.NAME@login.scinet.science
-----------------------------------------------------------------

Password/Token for this session is:
-----------------------------------------------------------------
password=XXXXXXXXXXXXXX

Then open a browser on your local machine to the following address
------------------------------------------------------------------
localhost:9999/?token=XXXXXXXXXXXXXX  (prefix with:https:// if using password)
------------------------------------------------------------------
  • This file will have the information to port forward JupyterLab to your local browser. How you port forward will depend on your local system (Windows, Mac, or Linux). For me (on linux), I execute the following command on my local machine:

    ssh -N -L 9999:XX.X.X.XX:XXXXX USER.NAME@login.scinet.science

  4. Open JupyterLab in your web browser
    • Go to your web browser and type: http://localhost:9999 in the address bar. It may ask for a password, which is located at the top of the DataScienceLab_stdout.XXXXXX file (see step 3).

3. Cluster Setup

Overall Setup - Background

  • Uses the Dask Jobqueue library to submit jobs to SLURM. Each SLURM job hosts a fixed number of Python workers.

  • Scales across nodes and partitions.

  • Number of workers can be scaled up or down dynamically.

  • Subject to SLURM resource allocation.

  • JupyterLab has a Dask add-on to monitor the cluster (need to port forward).

  • Dask includes DataFrame (i.e. Pandas) and Array (i.e. NumPy) equivalents.

  • Dask is used by Xarray, a labelled multi-dimensional array package well suited to geospatial data.

Step by Step Instructions

Below are the steps/commands to setup the cluster. Below these steps is a video of the process on Ceres.

  1. Load Relevant Libraries
In [1]:
import os
import time
import dask_jobqueue as jq
from dask.distributed import Client,wait
import dask.array as da
  2. Set up the Client

Need to specify:

  • Partition: You may want to change the partition (short, mem, brief-low, etc.) to whatever is available.
  • Location of Singularity Image/Container
  • SLURM job and python worker structure. In this example, for each SLURM JOB there are:
    • 3 Python workers (i.e. processes)
    • 4 cores per Python worker
    • 3.2 GB per core
    • The SLURM job will last 10 minutes (wall time)
    • The SLURM job will be run on the short and brief-low partitions
In [2]:
partition='short,brief-low'
num_processes = 3
num_threads_per_processes = 4
mem = 3.2*num_processes*num_threads_per_processes
n_cores_per_job = num_processes*num_threads_per_processes
container = '/project/geospatial_tutorials/data_science_im_rs_latest.sif'

clust = jq.SLURMCluster(queue=partition,
                        processes=num_processes,
                        cores=n_cores_per_job,
                        memory=str(mem)+'GB',
                        interface='ib0',
                        local_directory='$TMPDIR',
                        death_timeout=60,
                        python='singularity exec --bind /usr/lib64 --bind /scinet01 --bind /software/7/apps/envi/bin/ '+container+' python',
                        walltime='00:10:00')
cl=Client(clust)

dash_board = cl.scheduler.address.split('//')[1].split(':')[0]+":"+str(cl.scheduler_info()['services']['dashboard'])
ssh_command = 'ssh -N -L 8787:'+dash_board+' '+os.environ["USER"]+'@login.scinet.science'
print('To port forward diagnostics use:\n'+ssh_command)
cl
To port forward diagnostics use:
ssh -N -L 8787:10.1.8.24:8787 rowan.gaffney@login.scinet.science
Out[2]:

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B
  3. Port Forward the Diagnostics Dashboard

Now we want to visualize activity on the cluster using the Dask Diagnostic Dashboard. To do so, we need to port forward the Diagnostics application. The information to do this should be displayed from the output of the above code. For me (on linux), I execute the following command on my local machine:

ssh -N -L 8787:XX.X.X.XX:8787 rowan.gaffney@login.scinet.science

Once port 8787 is forwarded to your local machine, you can launch the Diagnostic Tools in JupyterLab using the following icon:
I like to view the following graphs:

  • Workers
  • Processing Tasks
  • Progress
  • Memory
  • Task Stream graphs.
  4. Launch the Workers

Now we can launch the workers (and wait 15 seconds for them to load).

In [3]:
clust.scale(n=5*num_processes)
time.sleep(15)

The cluster can be summarized at any time with the following command. Depending on how slow SLURM and the Dask cluster are, you may need to wait for the workers to load.

In [4]:
cl
Out[4]:

Client

Cluster

  • Workers: 15
  • Cores: 60
  • Memory: 192.00 GB

A few quick examples:

  1. 60 GB data: Calculate the mean without holding the data in memory
  2. 600 GB data: Calculate the mean without holding the data in memory
  3. 60 GB data: Persist the data to memory and calculate the mean
In [5]:
t = da.random.random((10000,7500,100),chunks=(400,400,-1))
print('This array is '+str(t.nbytes*1e-9)+' GB')
This array is 60.00000000000001 GB
In [6]:
t
Out[6]:
Array Chunk
Bytes 60.00 GB 128.00 MB
Shape (10000, 7500, 100) (400, 400, 100)
Count 475 Tasks 475 Chunks
Type float64 numpy.ndarray
In [7]:
t2 = t.mean()

Now we will dynamically load the data, compute the results, and drop the data.

In [9]:
t2.compute()
Out[9]:
0.5000085710328511

Let's try working with data larger than memory.

In [10]:
t = da.random.random((100000,7500,100),chunks=(400,400,-1))
#wait(t)
print('This array is '+str(t.nbytes*1e-9)+' GB')
t
This array is 600.0 GB
Out[10]:
Array Chunk
Bytes 600.00 GB 128.00 MB
Shape (100000, 7500, 100) (400, 400, 100)
Count 4750 Tasks 4750 Chunks
Type float64 numpy.ndarray
In [11]:
t2 = t.mean()
In [12]:
t2.compute()
Out[12]:
0.49999990579006276

Alternatively, we can load the data onto the cluster with the persist method.

In [13]:
t = da.random.random((10000,7500,100),chunks=(400,400,-1)).persist()
wait(t)
print('This array is '+str(t.nbytes*1e-9)+' GB')
t
This array is 60.00000000000001 GB
Out[13]:
Array Chunk
Bytes 60.00 GB 128.00 MB
Shape (10000, 7500, 100) (400, 400, 100)
Count 475 Tasks 475 Chunks
Type float64 numpy.ndarray