Source code for bids.analysis.analysis

import json
from bids.layout import BIDSLayout
from bids.utils import matches_entities
from bids.variables import BIDSVariableCollection, merge_collections
from . import transformations as transform
from collections import namedtuple, OrderedDict
from six import string_types
import numpy as np
import pandas as pd
from itertools import chain


[docs]class Analysis(object): ''' Represents an entire BIDS-Model analysis. Args: layout (BIDSLayout, str): A BIDSLayout instance or path to pass on to the BIDSLayout initializer. model (str or dict): a BIDS model specification. Can either be a string giving the path of the JSON model spec, or an already-loaded dict containing the model info. ''' def __init__(self, layout, model): if not isinstance(layout, BIDSLayout): layout = BIDSLayout(layout) self.layout = layout if isinstance(model, str): model = json.load(open(model)) self.model = model self._load_blocks(model['blocks']) def __iter__(self): for b in self.blocks: yield b def __getitem__(self, index): if isinstance(index, int): return self.blocks[index] name_matches = list(filter(lambda x: x.name == index, self.blocks)) if not name_matches: raise KeyError('There is no block with the name "%s".' % index) return name_matches[0] def _load_blocks(self, blocks): self.blocks = [] for i, block_args in enumerate(blocks): block = Block(self.layout, index=i, **block_args) self.blocks.append(block)
[docs] def setup(self, blocks=None, agg_func='mean', **kwargs): ''' Set up the sequence of blocks for analysis. Args: blocks (list): Optional list of blocks to set up. Each element must be either an int giving the index of the block in the JSON config block list, or a str giving the (unique) name of the block, as specified in the JSON config. Blocks that do not match either index or name will be skipped. agg_func (str or Callable): The aggregation function to use when combining rows from the previous level of analysis. E.g., when analyzing a 'subject'-level block, inputs coming from the 'session' level are typically averaged to produce individual subject-level estimates. Must be either a string giving the name of a function recognized by apply() in pandas, or a Callable that takes a DataFrame as input and returns a Series or a DataFrame. NOTE: CURRENTLY UNIMPLEMENTED. ''' # In the beginning, there was nothing input_nodes = None # Use inputs from model, and update with kwargs selectors = self.model.get('input', {}) selectors.update(kwargs) for i, b in enumerate(self.blocks): # Skip any blocks whose names or indexes don't match block list if blocks is not None and i not in blocks and b.name not in blocks: continue b.setup(input_nodes, **selectors) input_nodes = b.output_nodes
class Block(object): ''' Represents a single analysis block from a BIDS-Model specification. Args: layout (BIDSLayout): The BIDSLayout containing all project files. level (str): The BIDS keyword to use as the grouping variable; must be one of ['run', 'session', 'subject', or 'dataset']. index (int): The numerical index of the current Block within the sequence of blocks. name (str): Optional name to assign to the block. Must be specified in order to enable name-based indexing in the parent Analysis. transformations (list): List of BIDS-Model transformations to apply. model (dict): The 'model' part of the BIDS-Model block specification. contrasts (list): List of contrasts to apply to the parameter estimates generated when the model is fit. input_nodes (list): Optional list of AnalysisNodes to use as input to this Block (typically, the output from the preceding Block). auto_contrasts (list, bool): Optional list of variable names to create an indicator contrast for. Alternatively, if the boolean value True is passed, a contrast is automatically generated for _all_ available variables. This parameter is over-written by the setting in setup() if the latter is passed. ''' def __init__(self, layout, level, index, name=None, transformations=None, model=None, contrasts=None, input_nodes=None, auto_contrasts=False): self.layout = layout self.level = level self.index = index self.name = name self.transformations = transformations or [] self.model = model or None self.contrasts = contrasts or [] self.input_nodes = input_nodes or [] self.auto_contrasts = auto_contrasts or [] self.output_nodes = [] def _filter_objects(self, objects, kwargs): # Keeps only objects that match target entities, and also removes those # keys from the kwargs dict. valid_ents = {'task', 'subject', 'session', 'run'} entities = {k: kwargs.pop(k) for k in dict(kwargs) if k in valid_ents} objects = [o for o in objects if o.matches_entities(entities)] return (objects, kwargs) def _group_objects(self, objects): # Group list of objects into bins defined by all entities at current # Block level or higher. if self.level == 'dataset': return [objects] groups = OrderedDict() valid_ents = ['subject', 'session', 'task', 'run'] valid_ents = valid_ents[:(valid_ents.index(self.level) + 1)] for o in objects: key = {k: v for k, v in o.entities.items() if k in valid_ents} key = tuple(sorted(key.items(), key=str)) if key not in groups: groups[key] = [] groups[key].append(o) return list(groups.values()) def _concatenate_input_nodes(self, nodes): data, entities = [], [] for n in nodes: contrasts = [c.name for c in n.contrasts] row = pd.Series(np.ones(len(contrasts)), index=contrasts) data.append(row) entities.append(pd.Series(n.entities)) data = pd.concat(data, axis=1).T entities = pd.concat(entities, axis=1).T return BIDSVariableCollection.from_df(data, entities, self.level) def setup(self, input_nodes=None, **kwargs): ''' Set up the Block and construct the design matrix. Args: input_nodes (list): Optional list of Node objects produced by the preceding Block in the analysis. If None, uses any inputs passed in at Block initialization. kwargs: Optional keyword arguments to pass onto load_variables. ''' self.output_nodes = [] input_nodes = input_nodes or self.input_nodes or [] # TODO: remove the scan_length argument entirely once we switch tests # to use the synthetic dataset with image headers. if self.level != 'run': kwargs = kwargs.copy() kwargs.pop('scan_length', None) collections = self.layout.get_collections(self.level, **kwargs) objects = collections + input_nodes objects, kwargs = self._filter_objects(objects, kwargs) groups = self._group_objects(objects) # Set up and validate variable lists model = self.model or {} variables = model.get('variables', []) hrf_variables = model.get('HRF_variables', []) if not set(variables) >= set(hrf_variables): raise ValueError("HRF_variables must be a subset ", "of variables in BIDS model.") for grp in groups: # Split into separate lists of Collections and Nodes input_nodes = [o for o in grp if isinstance(o, AnalysisNode)] colls = list(set(grp) - set(input_nodes)) if input_nodes: node_coll = self._concatenate_input_nodes(input_nodes) colls.append(node_coll) coll = merge_collections(colls) if len(colls) > 1 else colls[0] coll = apply_transformations(coll, self.transformations) if variables: transform.select(coll, variables) node = AnalysisNode(self.level, coll, self.contrasts, input_nodes, self.auto_contrasts) self.output_nodes.append(node) def get_design_matrix(self, names=None, format='long', mode='both', force=False, **kwargs): ''' Get design matrix and associated information. Args: names (list): Optional list of names of variables to include in the returned design matrix. If None, all variables are included. format (str): Whether to return the design matrix in 'long' or 'wide' format. Note that dense design matrices are always returned in 'wide' format. mode (str): Specifies whether to return variables in a sparse representation ('sparse'), dense representation ('dense'), or both ('both'). force (bool): Indicates how to handle columns not of the type indicated by the mode argument. When False, variables of the non-selected type will be silently ignored. When True, variables will be forced to the desired representation. For example, if mode='dense' and force=True, sparse variables will be converted to dense variables and included in the returned design matrix in the .dense attribute. The force argument is ignored entirely if mode='both'. kwargs: Optional keyword arguments. Includes (1) selectors used to constrain which of the available nodes get returned (e.g., passing subject=['01', '02'] will return design information only for subjects '01' and '02'), and (2) arguments passed on to each Variable's to_df() call (e.g., sampling_rate, entities, timing, etc.). Returns: A list of DesignMatrixInfo namedtuples--one per unit of the current analysis level (e.g., if level='run', each element in the list represents the design matrix for a single run). ''' nodes, kwargs = self._filter_objects(self.output_nodes, kwargs) return [n.get_design_matrix(names, format, mode=mode, force=force, **kwargs) for n in nodes] def get_contrasts(self, names=None, variables=None, **kwargs): ''' Return contrast information for the current block. Args: names (list): Optional list of names of contrasts to return. If None (default), all contrasts are returned. variables (bool): Optional list of strings giving the names of design matrix columns to use when generating the matrix of weights. kwargs: Optional keyword arguments used to constrain which of the available nodes get returned (e.g., passing subject=['01', '02'] will return contrast information only for subjects '01' and '02'). Returns: A list with one element per unit of the current analysis level (e.g., if level='run', each element in the list representing the contrast information for a single run). Each element is a list of ContrastInfo namedtuples (one per contrast). ''' nodes, kwargs = self._filter_objects(self.output_nodes, kwargs) return [n.get_contrasts(names, variables) for n in nodes] DesignMatrixInfo = namedtuple('DesignMatrixInfo', ('sparse', 'dense', 'entities')) ContrastInfo = namedtuple('ContrastInfo', ('name', 'weights', 'type', 'entities')) class AnalysisNode(object): ''' A single analysis node generated within a Block. Args: level (str): The level of the Node. Most be one of 'run', 'session', 'subject', or 'dataset'. collection (BIDSVariableCollection): The BIDSVariableCollection containing variables at this Node. contrasts (list): A list of contrasts defined in the originating Block. auto_contrasts (list): Optional list of variable names to create an indicator contrast for. Alternatively, if the boolean value True is passed, a contrast is automatically generated for _all_ available variables. ''' def __init__(self, level, collection, contrasts, input_nodes=None, auto_contrasts=None): self.level = level self.collection = collection self._block_contrasts = contrasts self.input_nodes = input_nodes if auto_contrasts == True: auto_contrasts = collection.variables.keys() self.auto_contrasts = auto_contrasts or [] self._contrasts = None @property def entities(self): return self.collection.entities @property def contrasts(self): if self._contrasts is None: self.get_contrasts() return self._contrasts def get_design_matrix(self, names=None, format='long', mode='both', force=False, **kwargs): ''' Get design matrix and associated information. Args: names (list): Optional list of names of variables to include in the returned design matrix. If None, all variables are included. format (str): Whether to return the design matrix in 'long' or 'wide' format. Note that dense design matrices are always returned in 'wide' format. mode (str): Specifies whether to return variables in a sparse representation ('sparse'), dense representation ('dense'), or both ('both'). force (bool): Indicates how to handle columns not of the type indicated by the mode argument. When False, variables of the non-selected type will be silently ignored. When True, variables will be forced to the desired representation. For example, if mode='dense' and force=True, sparse variables will be converted to dense variables and included in the returned design matrix in the .dense attribute. The force argument is ignored entirely if mode='both'. kwargs: Optional keyword arguments to pass onto each Variable's to_df() call (e.g., sampling_rate, entities, timing, etc.). Returns: A DesignMatrixInfo namedtuple. ''' sparse_df, dense_df = None, None coll = self.collection if self.level != 'run' and mode != 'sparse': mode = 'sparse' include_sparse = include_dense = (force and mode != 'both') if mode in ['sparse', 'both']: kwargs['sparse'] = True sparse_df = coll.to_df(names, format, include_dense=include_dense, **kwargs) if mode in ['dense', 'both']: # The current implementation of pivoting to wide in # BIDSVariableCollection.to_df() breaks if we don't have the # temporal columns to index on, so we force their inclusion first # and then drop them afterwards. kwargs['timing'] = True kwargs['sparse'] = False dense_df = coll.to_df(names, format='wide', include_sparse=include_sparse, **kwargs) if dense_df is not None: dense_df = dense_df.drop(['onset', 'duration'], axis=1) return DesignMatrixInfo(sparse_df, dense_df, self.entities) def get_contrasts(self, names=None, variables=None): ''' Return contrast information for the current block. Args: names (list): Optional list of names of contrasts to return. If None (default), all contrasts are returned. variables (bool): Optional list of strings giving the names of design matrix columns to use when generating the matrix of weights. Notes: The 'variables' argument take precedence over the natural process of column selection. I.e., if a variable shows up in a contrast, but isn't named in variables, it will *not* be included in the returned Returns: A list of ContrastInfo namedtuples, one per contrast. ''' # Verify that there are no invalid columns in the condition_lists all_conds = [c['condition_list'] for c in self._block_contrasts] all_conds = set(chain(*all_conds)) bad_conds = all_conds - set(self.collection.variables.keys()) if bad_conds: raise ValueError("Invalid condition names passed in one or more " " contrast condition lists: %s." % bad_conds) # Construct a list of all contrasts, including identity contrasts contrasts = list(self._block_contrasts) # Check that all contrasts have unique name contrast_names = [c['name'] for c in contrasts] if len(set(contrast_names)) < len(contrast_names): raise ValueError("One or more contrasts have the same name") contrast_names = list(set(contrast_names)) if self.auto_contrasts: for col_name in self.auto_contrasts: if (col_name in self.collection.variables.keys() and col_name not in contrast_names): contrasts.append({ 'name': col_name, 'condition_list': [col_name], 'weights': [1], 'type': 'T' }) # Filter on desired contrast names if passed if names is not None: contrasts = [c for c in contrasts if c['name'] in names] def setup_contrast(c): weights = np.atleast_2d(c['weights']) weights = pd.DataFrame(weights, columns=c['condition_list']) # If variables were explicitly passed, use them as the columns if variables is not None: var_df = pd.DataFrame(columns=variables) weights = pd.concat([weights, var_df])[variables].fillna(0) test_type = c.get('type', ('T' if len(weights) == 1 else 'F')) return ContrastInfo(c['name'], weights, test_type, self.entities) self._contrasts = [setup_contrast(c) for c in contrasts] return self._contrasts def matches_entities(self, entities, strict=False): ''' Determine whether current AnalysisNode matches passed entities. Args: entities (dict): Dictionary of entities to match. Keys are entity names; values are single values or lists. strict (bool): If True, _all_ entities in the current Node must match in order to return True. ''' return matches_entities(self, entities, strict) def apply_transformations(collection, transformations, select=None): ''' Apply all transformations to the variables in the collection. Args: transformations (list): List of transformations to apply. select (list): Optional list of names of variables to retain after all transformations are applied. ''' for t in transformations: kwargs = dict(t) func = kwargs.pop('name') cols = kwargs.pop('input', None) if isinstance(func, string_types): if func in ('and', 'or'): func += '_' if not hasattr(transform, func): raise ValueError("No transformation '%s' found!" % func) func = getattr(transform, func) func(collection, cols, **kwargs) if select is not None: transform.select(collection, select) return collection