Source code for esda.geary_local_mv

import numpy as np
import pandas as pd
from scipy import stats
from sklearn.base import BaseEstimator
from sklearn.utils import check_array


[docs]class Geary_Local_MV(BaseEstimator): """Local Geary - Multivariate"""
[docs] def __init__(self, connectivity=None, permutations=999): """ Initialize a Local_Geary_MV estimator Parameters ---------- connectivity : scipy.sparse matrix object the connectivity structure describing the relationships between observed units. Need not be row-standardized. permutations : int (default=999) number of random permutations for calculation of pseudo p_values Attributes ---------- localG : numpy array array containing the observed multivariate Local Geary values. p_sim : numpy array array containing the simulated p-values for each unit. """ self.connectivity = connectivity self.permutations = permutations
def fit(self, variables): """ Parameters ---------- variables : numpy.ndarray array containing continuous data Returns ------- the fitted estimator. Notes ----- Technical details and derivations can be found in :cite:`Anselin1995`. Examples -------- Guerry data replication GeoDa tutorial >>> import libpysal >>> import geopandas as gpd >>> guerry = lp.examples.load_example('Guerry') >>> guerry_ds = gpd.read_file(guerry.get_path('Guerry.shp')) >>> w = libpysal.weights.Queen.from_dataframe(guerry_ds) >>> import libpysal >>> import geopandas as gpd >>> guerry = lp.examples.load_example('Guerry') >>> guerry_ds = gpd.read_file(guerry.get_path('Guerry.shp')) >>> w = libpysal.weights.Queen.from_dataframe(guerry_ds) >>> x1 = guerry_ds['Donatns'] >>> x2 = guerry_ds['Suicids'] >>> lG_mv = Local_Geary(connectivity=w).fit([x1,x2]) >>> lG_mv.localG[0:5] >>> lG_mv.p_sim[0:5] """ self.variables = check_array( variables, accept_sparse=False, dtype="float", force_all_finite=True, estimator=self, ) w = self.connectivity w.transform = "r" self.n = len(variables[0]) self.w = w permutations = self.permutations # Caclulate z-scores for input variables # to be used in _statistic and _crand zvariables = stats.zscore(variables, axis=1) self.localG = self._statistic(variables, zvariables, w) if permutations: self._crand(zvariables) sim = np.transpose(self.Gs) above = sim >= self.localG larger = above.sum(0) low_extreme = (permutations - larger) < larger larger[low_extreme] = permutations - larger[low_extreme] self.p_sim = (larger + 1.0) / (permutations + 1.0) return self @staticmethod def _statistic(variables, zvariables, w): # Define denominator adjustment k = len(variables) # Create focal and neighbor values adj_list = w.to_adjlist(remove_symmetric=False) zseries = [pd.Series(i, index=w.id_order) for i in zvariables] focal = [zseries[i].loc[adj_list.focal].values for i in range(len(variables))] neighbor = [ zseries[i].loc[adj_list.neighbor].values for i in range(len(variables)) ] # Carry out local Geary calculation gs = adj_list.weight.values * (np.array(focal) - np.array(neighbor)) ** 2 # Reorganize data temp = pd.DataFrame(gs).T temp["ID"] = adj_list.focal.values adj_list_gs = temp.groupby(by="ID").sum() # Rearrange data based on w id order adj_list_gs["w_order"] = w.id_order adj_list_gs.sort_values(by="w_order", inplace=True) localG = np.array(adj_list_gs.sum(axis=1) / k) return localG def _crand(self, zvariables): """ conditional randomization for observation i with ni neighbors, the candidate set cannot include i (we don't want i being a neighbor of i). we have to sample without replacement from a set of ids that doesn't include i. numpy doesn't directly support sampling wo replacement and it is expensive to implement this. instead we omit i from the original ids, permute the ids and take the first ni elements of the permuted ids as the neighbors to i in each randomization. """ nvars = self.variables.shape[0] Gs = np.zeros((self.n, self.permutations)) prange = list(range(self.permutations)) k = self.w.max_neighbors + 1 nn = self.n - 1 rids = np.array([np.random.permutation(nn)[0:k] for i in prange]) ids = np.arange(self.w.n) ido = self.w.id_order w = [self.w.weights[ido[i]] for i in ids] wc = [self.w.cardinalities[ido[i]] for i in ids] for i in range(self.w.n): idsi = ids[ids != i] np.random.shuffle(idsi) vars_rand = [] for j in range(nvars): vars_rand.append(zvariables[j][idsi[rids[:, 0 : wc[i]]]]) # vars rand as tmp # Calculate diff diff = [] for z in range(nvars): diff.append( (np.array((zvariables[z][i] - vars_rand[z]) ** 2 * w[i])).sum(1) / nvars ) # add up differences temp = np.array([sum(x) for x in zip(*diff)]) # Assign to object to be returned Gs[i] = temp self.Gs = Gs