Objective:
- Explore NEON AOP Hyperspectral Data
- Calculate/Visualize Spectral Indices
- Unserpervised Machine Learning (Clustering)
Data: NEON AOP hyperspectral data from LTAR CPER
Strategy:
Load data and visualize at specific points.
Calculate and visualize spectral indices.
Subset and scale data to develop kmeans model.
Fit a Kmeans model with 8 clusters to data (we will impliment two different methods).
Predict and visualize entire dataset.
Parallel Algorithm Detail
For parts 1 - 3 we will rely on Dask to extract, transform, and load (ETL) the hyperspectral data. Parts 4 - 5 we will use Dask-ML wrappers on top of scikit-learn (a well-known machine learning library) classifiers to parallelize computations across the cluster.
%pylab inline
import os
os.environ['GDAL_DATA'] = '/opt/conda/share/gdal'
import dask_jobqueue as jq
#import time
from dask.distributed import progress,wait,Client
import dask.array as da
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr
from sklearn.cluster import MiniBatchKMeans
from dask_ml.wrappers import Incremental
from dask_ml.wrappers import ParallelPostFit
from dask_ml.preprocessing import StandardScaler
from dask_ml.cluster import KMeans
partition='short,debug,brief-low,mem'
num_processes = 3
num_threads_per_processes = 20
mem = 3.2*num_processes*num_threads_per_processes
n_cores_per_job = num_processes*num_threads_per_processes
container = '/project/geospatial_tutorials/data_science_im_rs_latest.sif'
clust = jq.SLURMCluster(queue=partition,
processes=num_processes,
cores=n_cores_per_job,
memory=str(mem)+'GB',
interface='ib0',
local_directory='$TMPDIR',
death_timeout=60,
python='singularity exec --bind /usr/lib64 --bind /scinet01 --bind /software/7/apps/envi/bin/ '+container+' python',
walltime='00:30:00')
cl=Client(clust)
dash_board = cl.scheduler.address.split('//')[1].split(':')[0]+":"+str(cl.scheduler_info()['services']['dashboard'])
ssh_command = 'ssh -N -L 8787:'+dash_board+' '+os.environ["USER"]+'@login.scinet.science'
print('To port forward diagnostics use:\n'+ssh_command)
cl
clust.scale(n=18)
cl
#Function to exclude bands with H2O or CO2 absorption
def band_list():
good_bands1 = np.linspace(0,188,189).astype(int)
good_bands3 = np.linspace(211,269,269-211+1).astype(int)
good_bands5 = np.linspace(316,425,425-316+1).astype(int)
good_bands = np.hstack([good_bands1,good_bands3,good_bands5])
bad_bands2 = np.linspace(189,210,210-189+1).astype(int)
bad_bands4 = np.linspace(270,315,315-270+1).astype(int)
bad_bands = np.hstack([bad_bands2,bad_bands4])
return(good_bands,bad_bands)
#Persist the data to memory
d_all = xr.open_zarr('/project/geospatial_tutorials/tutorial_1/data/neon_2017_mosaic_brdf_corr.zarr',group='reflectance').isel(wl=band_list()[0]).chunk({'x':'auto','y':'auto','wl':-1}).persist()
d_all
#Plot the spectral signature at point
plt.figure(figsize=(8,6))
d_all.reflectance.interp(x=520002.5554,y=4520000.78758,method='linear').plot.line('b.')
plt.ylabel('Reflectance * 10000')
plt.xlabel('Wavelength (nanometers)')
plt.grid()
#Calculate the spectral indices and add to the "d_all" variable
d_all['indices'] = xr.concat([((d_all.reflectance.sel(wl=858.6,method='nearest')-d_all.reflectance.sel(wl=648.2,method='nearest'))/(d_all.reflectance.sel(wl=858.6,method='nearest')+d_all.reflectance.sel(wl=648.2,method='nearest'))).assign_coords(index='ndvi').expand_dims('index'),
((0.5*(d_all.reflectance.sel(wl=2000,method='nearest')/10000.+d_all.reflectance.sel(wl=2200,method='nearest')/10000.))-d_all.reflectance.sel(wl=2100.,method='nearest')/10000.).drop('wl').assign_coords(index='cai').expand_dims('index'),
((xr.ufuncs.log(1./(d_all.reflectance.sel(wl=1754.,method='nearest')/10000.))-xr.ufuncs.log(1./(d_all.reflectance.sel(wl=1680.,method='nearest')/10000.)))/(xr.ufuncs.log(d_all.reflectance.sel(wl=1754.,method='nearest')/10000.)+xr.ufuncs.log(d_all.reflectance.sel(wl=1680,method='nearest')/10000.))).assign_coords(index='ndli').expand_dims('index'),
((d_all.reflectance.sel(wl=750.,method='nearest')-d_all.reflectance.sel(wl=705.,method='nearest'))/(d_all.reflectance.sel(wl=750.,method='nearest')+d_all.reflectance.sel(wl=705.,method='nearest')-(2.*d_all.reflectance.sel(wl=445.,method='nearest')))).drop('wl').assign_coords(index='mrendvi').expand_dims('index'),
((d_all.reflectance.sel(wl=800.,method='nearest')-d_all.reflectance.sel(wl=445.,method='nearest'))/(d_all.reflectance.sel(wl=800.,method='nearest')-d_all.reflectance.sel(wl=680.,method='nearest'))).assign_coords(index='sipi').expand_dims('index'),
((xr.ufuncs.log(10000./d_all.reflectance.sel(wl=1510.,method='nearest'))-xr.ufuncs.log(10000./d_all.reflectance.sel(wl=1680.,method='nearest')))/(xr.ufuncs.log(10000./d_all.reflectance.sel(wl=1510.,method='nearest'))+xr.ufuncs.log(10000./d_all.reflectance.sel(wl=1680.,method='nearest')))).assign_coords(index='ndni').expand_dims('index'),
((1./(d_all.reflectance.sel(wl=510.,method='nearest')/10000.))-(1./(d_all.reflectance.sel(wl=550.,method='nearest')/10000.))).assign_coords(index='cri1').expand_dims('index'),
((1./(d_all.reflectance.sel(wl=510.,method='nearest')/10000.))-(1./(d_all.reflectance.sel(wl=700.,method='nearest')/10000.))).assign_coords(index='cri2').expand_dims('index')],dim='index').chunk((1.,d_all.reflectance.data.chunksize[0],d_all.reflectance.data.chunksize[1])).transpose('y','x','index').chunk(('auto','auto',1))
#persist the data to memory
d_all['indices'].data = d_all['indices'].data.persist()
#wait for the computation to complete
wait(d_all)
#show the xarray
d_all
#Plot the Indices
fig, axes = plt.subplots(nrows=4,ncols=2,figsize=(24,30))
ax = axes.flatten()
#Loop through each indices a plot at a 1/(50*50) resolution
i=-1
for ind in d_all.coords['index'].values:
print('Plotting '+ind+' ...')
i=i+1
d_all.indices.sel(index=ind).where(d_all.reflectance.isel(wl=22)>-10.).isel(x=slice(None,None,50),y=slice(None,None,50)).plot(ax=ax[i])