Source code for ravenpy.utilities.mk_test

"""
Created on Wed Jul 29 09:16:06 2015
@author: Michael Schramm
"""

from typing import Optional, Tuple

import numpy as np
from scipy.stats import norm


def mk_test_calc(x: np.ndarray, alpha: float = 0.05) -> Tuple[str, bool, float, float]:
    """Make test calculation.

    This function is derived from code originally posted by Sat Kumar Tomer
    (satkumartomer@gmail.com).
    See also: http://vsp.pnnl.gov/help/Vsample/Design_Trend_Mann_Kendall.htm

    The purpose of the Mann-Kendall (MK) test (Mann 1945, Kendall 1975, Gilbert
    1987) is to statistically assess if there is a monotonic upward or downward
    trend of the variable of interest over time. A monotonic upward (downward)
    trend means that the variable consistently increases (decreases) through
    time, but the trend may or may not be linear. The MK test can be used in
    place of a parametric linear regression analysis, which tests whether the
    slope of the estimated linear regression line differs from zero. The
    regression analysis requires that the residuals from the fitted regression
    line be normally distributed; the MK test does not require that assumption,
    i.e., it is a non-parametric (distribution-free) test.
    Hirsch, Slack and Smith (1982, page 107) indicate that the MK test is best
    viewed as an exploratory analysis and is most appropriately used to
    identify stations where changes are significant or of large magnitude and
    to quantify these findings.

    Parameters
    ----------
    x : np.ndarray
        A vector of data.
    alpha : float
        Significance level (0.05 default).

    Returns
    -------
    trend : str
        The trend ("increasing", "decreasing" or "no trend").
    h : bool
        True if a trend is present, False if it is absent.
    p : float
        The p-value of the significance test.
    z : float
        The normalized test statistic.

    Examples
    --------
    >>> x = np.random.rand(100)
    >>> trend, h, p, z = mk_test_calc(x, 0.05)
    """
    n = len(x)

    # Calculate the S statistic: the sum of the signs of all pairwise
    # differences x[j] - x[k] for j > k.
    s = 0
    for k in range(n - 1):
        for j in range(k + 1, n):
            s += np.sign(x[j] - x[k])

    # Identify the unique values in order to detect ties.
    unique_x = np.unique(x)
    g = len(unique_x)

    # Calculate var(S), with a tie correction when needed.
    if n == g:  # there are no ties
        var_s = (n * (n - 1) * (2 * n + 5)) / 18
    else:  # there are some ties in the data
        tp = np.zeros(unique_x.shape)
        for i in range(len(unique_x)):
            tp[i] = sum(x == unique_x[i])
        var_s = (n * (n - 1) * (2 * n + 5) - np.sum(tp * (tp - 1) * (2 * tp + 5))) / 18

    # Continuity-corrected, standardized test statistic.
    if s > 0:
        z = (s - 1) / np.sqrt(var_s)
    elif s < 0:
        z = (s + 1) / np.sqrt(var_s)
    else:  # s == 0
        z = 0

    # Calculate the p-value (two-tailed test).
    p = 2 * (1 - norm.cdf(abs(z)))
    h = abs(z) > norm.ppf(1 - alpha / 2)

    if (z < 0) and h:
        trend = "decreasing"
    elif (z > 0) and h:
        trend = "increasing"
    else:
        trend = "no trend"

    return trend, h, p, z
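
# A minimal, illustrative usage sketch (not part of the original module): run
# the MK test on a synthetic series with a known upward drift. The seed, the
# slope of 0.05 per step and the unit noise scale are arbitrary choices for
# the demonstration; with a total drift of 10 units against unit-variance
# noise, the test is expected to report an increasing trend.
#
#     rng = np.random.default_rng(42)
#     y = 0.05 * np.arange(200) + rng.normal(0.0, 1.0, size=200)
#     trend, h, p, z = mk_test_calc(y, alpha=0.05)
#     # trend == "increasing", h is True, p is near 0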

def check_num_samples(
    beta: float,
    delta: float,
    std_dev: float,
    alpha: float = 0.05,
    n: int = 4,
    num_iter: int = 1000,
    tol: float = 1e-6,
    num_cycles: int = 10000,
    m: int = 5,
) -> Optional[int]:
    """Check number of samples.

    This function is an implementation of the "Calculation of Number of
    Samples Required to Detect a Trend" section written by Sat Kumar Tomer
    (satkumartomer@gmail.com), which can be found at:
    http://vsp.pnnl.gov/help/Vsample/Design_Trend_Mann_Kendall.htm

    As stated on the webpage in the URL above, the method uses a Monte-Carlo
    simulation to determine the required number of points in time, n, to take
    a measurement in order to detect a linear trend for specified small
    probabilities that the MK test will make decision errors. If a non-linear
    trend is actually present, then the value of n computed by VSP is only an
    approximation to the correct n. If non-detects are expected in the
    resulting data, then the value of n computed by VSP is only an
    approximation to the correct n, and this approximation will tend to be
    less accurate as the number of non-detects increases.

    Parameters
    ----------
    beta : float
        Probability of falsely accepting the null hypothesis.
    delta : float
        Change per sample period, i.e., the change that occurs between two
        adjacent sampling times.
    std_dev : float
        Standard deviation of the sample points.
    alpha : float
        Significance level (0.05 default).
    n : int
        Initial number of sample points (4 default).
    num_iter : int
        Number of iterations of the Monte-Carlo simulation (1000 default).
    tol : float
        Tolerance level to decide if the predicted probability is close enough
        to the required statistical power value (1e-6 default).
    num_cycles : int
        Total number of cycles of the simulation. This is to ensure that the
        simulation does finish, regardless of convergence (10000 default).
    m : int
        If the tolerance is too small, the simulation could cycle through the
        same sample numbers over and over. This parameter determines how many
        cycles to look back; if the same number of samples was determined m
        cycles ago, the simulation stops.

    Returns
    -------
    int, optional
        The estimated number of samples required, or None if the simulation
        exhausts num_cycles without converging.

    Examples
    --------
    >>> num_samples = check_num_samples(0.2, 1, 0.1)
    """
    # Initialize the parameters.
    power = 1.0 - beta
    p_d = 0.0
    cycle_num = 0
    min_diff_P_d_and_power = abs(p_d - power)
    best_P_d = p_d
    max_n = n
    min_n = n
    max_n_cycle = 1
    min_n_cycle = 1

    # Print information for the user.
    print(f"Delta (gradient): {delta}")
    print(f"Standard deviation: {std_dev}")
    print(f"Statistical power: {power}")

    # Keep re-estimating the probability of detecting a trend while the
    # estimate is not close enough to the specified statistical power value
    # and the number of cycles has not been exhausted.
    while abs(p_d - power) > tol and cycle_num < num_cycles:
        cycle_num += 1
        print(f"Cycle Number: {cycle_num}")
        count_of_trend_detections = 0

        # Perform the MK test on random samples carrying the prescribed trend.
        for i in range(num_iter):
            r = np.random.normal(loc=0.0, scale=std_dev, size=n)
            x = r + delta * np.arange(n)
            trend, h, p, z = mk_test_calc(x, alpha)
            if h:
                count_of_trend_detections += 1
        p_d = float(count_of_trend_detections) / num_iter

        # Determine if p_d is close enough to the power value.
        if abs(p_d - power) < tol:
            print(f"P_d: {p_d}")
            print(f"{n} samples are required")
            return n

        # Track the probability that comes closest to the statistical power.
        if min_diff_P_d_and_power > abs(p_d - power):
            min_diff_P_d_and_power = abs(p_d - power)
            best_P_d = p_d

        # Update max or min n.
        if n > max_n and abs(best_P_d - p_d) < tol:
            max_n = n
            max_n_cycle = cycle_num
        elif n < min_n and abs(best_P_d - p_d) < tol:
            min_n = n
            min_n_cycle = cycle_num
        # In case the tolerance is too small, stop cycling when the number of
        # samples, n, keeps bouncing between the same values.
        elif (abs(max_n - n) == 0 and cycle_num - max_n_cycle >= m) or (
            abs(min_n - n) == 0 and cycle_num - min_n_cycle >= m
        ):
            print("Number of samples required has converged.")
            print(f"P_d: {p_d}")
            print(f"Approximately {n} samples are required")
            return n

        # Determine whether to increase or decrease the number of samples.
        if p_d < power:
            n += 1
            print(f"P_d: {p_d}")
            print(f"Increasing n to {n}")
            print("")
        else:
            n -= 1
            print(f"P_d: {p_d}")
            print(f"Decreasing n to {n}")
            print("")

        if n == 0:
            raise ValueError("Number of samples = 0. This should not happen.")
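
# A hedged usage sketch (assumed invocation, not part of the original module):
# estimate how many evenly spaced measurements are needed to detect a change
# of 1.0 per sampling period against noise with standard deviation 0.1, while
# accepting a 20% chance (beta) of missing a real trend. The guard keeps the
# Monte-Carlo simulation (and its progress printing) from running on import.
if __name__ == "__main__":
    required_n = check_num_samples(beta=0.2, delta=1.0, std_dev=0.1)
    print(f"Estimated sample size: {required_n}")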