# Source code for causalexplain.estimators.cam.updateScoreMat

"""
1. **Imports**: Imported the necessary modules and the `computeScoreMatParallel`
function.
2. **Function Definition**: Translated the R function to Python, maintaining the same
logic and structure.
3. **Matrix Operations**: Used NumPy for matrix operations.
4. **Parallel Processing**: Used Python's `multiprocessing.Pool` for parallel
processing, similar to R's `mcmapply`.

Make sure the `computeScoreMatParallel` function is defined in the sibling module
`computeScoreMatParallel.py`, from which it is imported below.
"""

# pylint: disable=E1101:no-member, W0201:attribute-defined-outside-init, W0511:fixme
# pylint: disable=C0103:invalid-name, W0221:arguments-differ
# pylint: disable=C0116:missing-function-docstring
# pylint: disable=R0913:too-many-arguments, E0401:import-error
# pylint: disable=R0914:too-many-locals, R0915:too-many-statements
# pylint: disable=W0106:expression-not-assigned, R1702:too-many-branches

from multiprocessing import Pool

import numpy as np

from .computeScoreMatParallel import computeScoreMatParallel


def _compute_score_from_kwargs(kwargs):
    """Call ``computeScoreMatParallel`` with the keyword arguments in *kwargs*.

    This lives at module level so ``multiprocessing.Pool`` can pickle it. It lets
    the parallel path pass arguments by name, exactly like the serial path,
    instead of relying on the positional order of the helper's signature.
    """
    return computeScoreMatParallel(**kwargs)


def updateScoreMat(
        score_mat, X, score_name, i, j, score_nodes, adj, verbose, num_cores,
        max_num_parents, pars_score, interv_mat, interv_data):
    """Refresh column ``j`` of the score matrix after an edge into ``j`` was added.

    For every node that is still a candidate parent of ``j`` (not ``j`` itself,
    not already a parent, not marked as forbidden), the score of ``j`` with its
    existing parents plus that candidate is recomputed through
    ``computeScoreMatParallel``; ``score_nodes[j]`` is then subtracted and the
    result is stored in ``score_mat[candidate, j]``. If ``j`` already has
    ``max_num_parents`` parents, the remaining candidates get ``-inf`` minus
    ``score_nodes[j]`` instead and nothing is recomputed.

    Args:
        score_mat: 2-D array indexed ``[candidate_parent, child]``. Entries equal
            to ``-inf`` in column ``j`` mark candidates that are not allowed.
            Modified in place.
        X: 2-D data array; only ``X.shape[1]`` (the number of nodes ``p``) is
            read here, the array itself is forwarded to the scoring helper.
        score_name: Forwarded unchanged to ``computeScoreMatParallel``.
        i: Source node of the newly added edge ``i -> j``. Not used by this
            function; kept for interface compatibility.
        j: Index of the node whose column is updated.
        score_nodes: Indexable of per-node scores; ``score_nodes[j]`` is
            subtracted from every refreshed entry.
        adj: 2-D adjacency array; ``adj[k, j] == 1`` marks ``k`` as an existing
            parent of ``j``.
        verbose: If true, print the adjacency matrix and the parent sets, and
            forward the flag to the scoring helper.
        num_cores: ``1`` runs the scoring serially; any other value creates a
            ``multiprocessing.Pool`` with that many workers.
        max_num_parents: Maximum number of parents allowed for a node.
        pars_score: Forwarded unchanged to ``computeScoreMatParallel``.
        interv_mat: Forwarded unchanged to ``computeScoreMatParallel``.
        interv_data: Forwarded unchanged to ``computeScoreMatParallel``.

    Returns:
        The same ``score_mat`` object that was passed in, with column ``j``
        updated where applicable.
    """
    # new edge: from i to j
    p = X.shape[1]

    if verbose:
        print(f"p: {p}")
        print("Adjacency matrix---")
        for r in range(adj.shape[0]):
            print("".join(f"{adj[r, c]} " for c in range(p)))

    existing_par_of_j = np.where(adj[:, j] == 1)[0]
    # Forbidden candidates are the -inf entries of column j, excluding j itself
    # and the nodes that are already parents.
    not_allowed_par_of_j = np.setdiff1d(
        np.where(score_mat[:, j] == -np.inf)[0],
        np.append(existing_par_of_j, j))

    if verbose:
        print(f"existing_par_of_j: {existing_par_of_j}")
        print(f"not_allowed_par_of_j: {not_allowed_par_of_j}")

    num_existing = len(existing_par_of_j)

    # Guard clause: every other node is either a parent already or forbidden,
    # so there is nothing left to update.
    if num_existing + len(not_allowed_par_of_j) >= p - 1:
        return score_mat

    # Row k of row_parents lists the existing parents of j followed by k, i.e.
    # the parent set to evaluate when k is the candidate. The array is float
    # because it is initialised with NaN.
    row_parents = np.full((p, num_existing + 1), np.nan)
    row_parents[:, :num_existing] = existing_par_of_j
    row_parents[:, num_existing] = np.arange(p)

    to_update = np.setdiff1d(
        np.arange(p),
        np.append([j], np.append(existing_par_of_j, not_allowed_par_of_j)))

    if num_existing < max_num_parents:
        # Both execution paths use the same keyword arguments so they cannot
        # diverge. A fresh all-True selection matrix is built per task, as in
        # the original per-call construction.
        tasks = [
            {
                "row_parents": row_parents,
                "sel_mat": np.ones((p, p), dtype=bool),
                "score_name": score_name,
                "X": X,
                "verbose": verbose,
                "node2": j,
                "pars_score": pars_score,
                "interv_mat": interv_mat,
                "interv_data": interv_data,
                "i": idx,
            }
            for idx in to_update
        ]
        if num_cores == 1:
            scores = [computeScoreMatParallel(**task) for task in tasks]
        else:
            with Pool(num_cores) as pool:
                scores = pool.map(_compute_score_from_kwargs, tasks)
        # Convert explicitly: a plain list only supports the subtraction below
        # when score_nodes[j] happens to be a NumPy scalar.
        score_update = np.asarray(scores, dtype=float)
    else:
        # j already has the maximum number of parents: no further parent may
        # be added, so all remaining candidates get -inf.
        score_update = -np.inf

    score_mat[to_update, j] = score_update - score_nodes[j]

    return score_mat