diff --git a/.gitignore b/.gitignore index 0fa80837..fd3b7b8f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ .idea .eggs .pytest_cache +client_work + notebooks *.egg-info __pycache__ diff --git a/CHANGELOG.md b/CHANGELOG.md index 96c3fe52..218b4e72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ # Changelog: +### August 4, 2020 +* Experiment Class: Representation of an experiment as one or more configured System Models + ### June 22, 2020 * Bug Fix: Multiprocessing error for Windows diff --git a/README.md b/README.md index 0b5c34ce..90b9ac7d 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ / ___/ __` / __ / / / /| | / / / / / /__/ /_/ / /_/ / /___/ ___ |/ /_/ / \___/\__,_/\__,_/\____/_/ |_/_____/ -by cadCAD ver. 0.4.17 +by cadCAD ver. 0.4.18 ====================================== Complex Adaptive Dynamics o i e diff --git a/cadCAD/configuration/__init__.py b/cadCAD/configuration/__init__.py index 7b61cd88..91a550a9 100644 --- a/cadCAD/configuration/__init__.py +++ b/cadCAD/configuration/__init__.py @@ -1,10 +1,8 @@ -from pprint import pprint from typing import Dict, Callable, List, Tuple from pandas.core.frame import DataFrame from collections import deque from copy import deepcopy import pandas as pd -# from time import time from cadCAD import configs from cadCAD.utils import key_filter @@ -13,11 +11,10 @@ class Configuration(object): - def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_processes={}, + def __init__(self, user_id, subset_id, subset_window, sim_config={}, initial_state={}, seeds={}, env_processes={}, exogenous_states={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], session_id=0, simulation_id=0, run_id=1, experiment_id=0, exp_window=deque([0, None], 2), **kwargs ) -> None: - # print(exogenous_states) self.sim_config = sim_config self.initial_state = initial_state self.seeds = seeds @@ -28,11 +25,13 @@ def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_proce self.kwargs = kwargs self.user_id = user_id - self.session_id = session_id # eesntially config id + self.session_id = session_id # essentially config id self.simulation_id = simulation_id self.run_id = run_id self.experiment_id = experiment_id self.exp_window = exp_window + self.subset_id = subset_id + self.subset_window = subset_window sanitize_config(self) @@ -40,12 +39,13 @@ def __init__(self, user_id, sim_config={}, initial_state={}, seeds={}, env_proce class Experiment: def __init__(self): self.exp_id = 0 + self.subset_id = 0 self.exp_window = deque([self.exp_id, None], 2) + self.subset_window = deque([self.subset_id, None], 2) def append_configs( self, user_id='cadCAD_user', - session_id=0, # ToDo: change to string sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={}, partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True, config_list=configs @@ -71,10 +71,11 @@ def append_configs( sim_cnt = 0 new_sim_configs = [] - for t in list(zip(sim_configs, list(range(len(sim_configs))))): + for subset_id, t in enumerate(list(zip(sim_configs, list(range(len(sim_configs)))))): sim_config = t[0] + sim_config['subset_id'] = subset_id + sim_config['subset_window'] = self.subset_window N = sim_config['N'] - if N > 1: for n in range(N): sim_config['simulation_id'] = simulation_id + sim_cnt @@ -92,8 +93,8 @@ def append_configs( sim_cnt += 1 run_id = 0 - print(self.exp_id) for sim_config in new_sim_configs: + subset_id = sim_config['subset_id'] sim_config['N'] = run_id + 1 if max_runs == 1: sim_config['run_id'] = run_id @@ -115,110 +116,17 @@ def append_configs( user_id=user_id, session_id=f"{user_id}={sim_config['simulation_id']}_{sim_config['run_id']}", simulation_id=sim_config['simulation_id'], - # run_id=run_id_config run_id=sim_config['run_id'], + experiment_id=self.exp_id, - exp_window=self.exp_window + exp_window=self.exp_window, + subset_id=subset_id, + subset_window=self.subset_window ) configs.append(config) run_id += 1 - - # print(exp_cnt) self.exp_id += 1 self.exp_window.appendleft(self.exp_id) - # print() - # print(self.exp_id) - - -# def append_configs( -# user_id='cadCAD_user', -# session_id=0, #ToDo: change to string -# sim_configs={}, initial_state={}, seeds={}, raw_exogenous_states={}, env_processes={}, -# partial_state_update_blocks={}, policy_ops=[lambda a, b: a + b], _exo_update_per_ts: bool = True, -# config_list=configs -# ) -> None: -# -# try: -# max_runs = sim_configs[0]['N'] -# except KeyError: -# max_runs = sim_configs['N'] -# -# if _exo_update_per_ts is True: -# exogenous_states = exo_update_per_ts(raw_exogenous_states) -# else: -# exogenous_states = raw_exogenous_states -# -# if isinstance(sim_configs, dict): -# sim_configs = [sim_configs] -# -# simulation_id = 0 -# if len(config_list) > 0: -# last_config = config_list[-1] -# simulation_id = last_config.simulation_id + 1 -# -# sim_cnt = 0 -# new_sim_configs = [] -# for t in list(zip(sim_configs, list(range(len(sim_configs))))): -# sim_config = t[0] -# N = sim_config['N'] -# -# if N > 1: -# for n in range(N): -# sim_config['simulation_id'] = simulation_id + sim_cnt -# sim_config['run_id'] = n -# sim_config['N'] = 1 -# # sim_config['N'] = n + 1 -# new_sim_configs.append(deepcopy(sim_config)) -# del sim_config -# else: -# sim_config['simulation_id'] = simulation_id -# sim_config['run_id'] = 0 -# new_sim_configs.append(deepcopy(sim_config)) -# # del sim_config -# -# sim_cnt += 1 -# -# # print(configs) -# run_id = 0 -# # if len(configs) > 0: -# # ds_run_id = configs[-1].__dict__['run_id'] + 1 -# # # print() -# # # print(configs[-1].__dict__['simulation_id']) -# # # print(configs[-1].__dict__['run_id']) -# # # configs[-1].__dict__['run_id'] -# # # print(configs[-1].__dict__['run_id'] + 1) -# # run_id = ds_run_id + 1 -# -# for sim_config in new_sim_configs: -# sim_config['N'] = run_id + 1 -# if max_runs == 1: -# sim_config['run_id'] = run_id -# elif max_runs >= 1: -# if run_id >= max_runs: -# sim_config['N'] = run_id - (max_runs - 1) -# -# config = Configuration( -# sim_config=sim_config, -# initial_state=initial_state, -# seeds=seeds, -# exogenous_states=exogenous_states, -# env_processes=env_processes, -# partial_state_update_blocks=partial_state_update_blocks, -# policy_ops=policy_ops, -# -# # session_id=session_id, -# user_id=user_id, -# session_id=f"{user_id}={sim_config['simulation_id']}_{sim_config['run_id']}", -# simulation_id=sim_config['simulation_id'], -# # run_id=run_id_config -# run_id=sim_config['run_id'] -# ) -# configs.append(config) -# run_id += 1 -# -# # print(configs) -# # print(f'simulation_id: {configs[-1].__dict__["simulation_id"]}') -# # print(f'run_id: {configs[-1].__dict__["run_id"]}') class Identity: @@ -300,7 +208,7 @@ def only_ep_handler(state_dict): return sdf_values, bdf_values if len(partial_state_updates) != 0: - # backwards compatibility # + # backwards compatibility partial_state_updates = sanitize_partial_state_updates(partial_state_updates) bdf = self.create_matrix_field(partial_state_updates, 'policies') @@ -312,13 +220,3 @@ def only_ep_handler(state_dict): zipped_list = list(zip(sdf_values, bdf_values)) return list(map(lambda x: (x[0] + exo_proc, x[1]), zipped_list)) - -# def timing_val(func): -# def wrapper(*arg, **kw): -# '''source: http://www.daniweb.com/code/snippet368.html''' -# t1 = time() -# res = func(*arg, **kw) -# t2 = time() -# print(f"{func.__name__}: {t2-t1 :.5f}") -# return res -# return wrapper diff --git a/cadCAD/configuration/utils/__init__.py b/cadCAD/configuration/utils/__init__.py index d9e9d014..3e85afa6 100644 --- a/cadCAD/configuration/utils/__init__.py +++ b/cadCAD/configuration/utils/__init__.py @@ -1,5 +1,5 @@ -from collections import Counter from datetime import datetime, timedelta +from collections import Counter from copy import deepcopy from functools import reduce from funcy import curry @@ -41,9 +41,9 @@ def configs_as_spec(configs): def configs_as_objs(configs): counted_IDs_configs = configs_as_spec(configs) - new_config = list(map(lambda x: x[1], counted_IDs_configs)) + new_configs = list(map(lambda x: x[1], counted_IDs_configs)) del counted_IDs_configs - return new_config + return new_configs def configs_as_dicts(configs): @@ -79,7 +79,6 @@ def bound_norm_random(rng, low, high): tstep_delta = timedelta(days=0, minutes=0, seconds=30) def time_step(dt_str, dt_format='%Y-%m-%d %H:%M:%S', _timedelta = tstep_delta): - # print(dt_str) dt = datetime.strptime(dt_str, dt_format) t = dt + _timedelta return t.strftime(dt_format) @@ -87,7 +86,6 @@ def time_step(dt_str, dt_format='%Y-%m-%d %H:%M:%S', _timedelta = tstep_delta): ep_t_delta = timedelta(days=0, minutes=0, seconds=1) def ep_time_step(s_condition, dt_str, fromat_str='%Y-%m-%d %H:%M:%S', _timedelta = ep_t_delta): - # print(dt_str) if s_condition: return time_step(dt_str, fromat_str, _timedelta) else: diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 8c1e27db..bb8af18b 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -1,15 +1,13 @@ -from pprint import pprint from time import time from typing import Callable, Dict, List, Any, Tuple -# from time import time -# from tqdm import tqdm -from cadCAD import logo + from cadCAD.utils import flatten from cadCAD.utils.execution import print_exec_info from cadCAD.configuration import Configuration, Processor -from cadCAD.configuration.utils import TensorFieldReport +from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs from cadCAD.engine.simulation import Executor as SimExecutor from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations +from cadCAD.configuration.utils import configs_as_dicts VarDictType = Dict[str, List[Any]] StatesListsType = List[Dict[str, Any]] @@ -39,10 +37,18 @@ def __init__(self, context=ExecutionMode.local_mode, method=None, additional_obj elif context == 'dist_proc': def distroduce_proc( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, RunIDs, + ExpIDs, + SubsetIDs, + SubsetWindows, + exec_method, sc, additional_objs=additional_objs ): return method( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, RunIDs, + ExpIDs, + SubsetIDs, + SubsetWindows, + exec_method, sc, additional_objs ) @@ -52,7 +58,7 @@ def distroduce_proc( class Executor: def __init__(self, exec_context: ExecutionContext, configs: List[Configuration], spark_context=None - ) -> None: + ) -> None: self.sc = spark_context self.SimExecutor = SimExecutor self.exec_method = exec_context.method @@ -65,22 +71,21 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: sessions = [] var_dict_list, states_lists = [], [] - Ts, Ns, SimIDs, RunIDs, ExpIDs, ExpWindows = [], [], [], [], [], [] + Ts, Ns, SimIDs, RunIDs = [], [], [], [] + ExpIDs, ExpWindows, SubsetIDs, SubsetWindows = [], [], [], [] eps, configs_structs, env_processes_list = [], [], [] partial_state_updates, sim_executors = [], [] config_idx = 0 - print_exec_info(self.exec_context, self.configs) + print_exec_info(self.exec_context, configs_as_objs(self.configs)) -# danlessa_experiments -# for x in tqdm(self.configs, -# desc='Initializing configurations'): t1 = time() for x in self.configs: sessions.append( { 'user_id': x.user_id, 'experiment_id': x.experiment_id, 'session_id': x.session_id, - 'simulation_id': x.simulation_id, 'run_id': x.run_id + 'simulation_id': x.simulation_id, 'run_id': x.run_id, + 'subset_id': x.subset_id, 'subset_window': x.subset_window } ) Ts.append(x.sim_config['T']) @@ -90,6 +95,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: ExpWindows.append(x.exp_window) SimIDs.append(x.simulation_id) RunIDs.append(x.run_id) + SubsetIDs.append(x.subset_id) + SubsetWindows.append(x.subset_window) var_dict_list.append(x.sim_config['M']) states_lists.append([x.initial_state]) @@ -114,7 +121,7 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold): flat_simulations = flatten(flat_timesteps) if config_amt == 1: return simulations, tensor_fields, sessions - elif (config_amt > 1) and (config_amt < remote_threshold): + elif (config_amt > 1): # and (config_amt < remote_threshold): return flat_simulations, tensor_fields, sessions remote_threshold = 100 @@ -124,16 +131,17 @@ def auto_mode_switcher(config_amt): try: if config_amt == 1: return ExecutionMode.single_mode, single_proc_exec - elif (config_amt > 1) and (config_amt < remote_threshold): + elif (config_amt > 1): # and (config_amt < remote_threshold): return ExecutionMode.multi_mode, parallelize_simulations except AttributeError: if config_amt < 1: - print('N must be > 1!') - elif config_amt > remote_threshold: - print('Remote Threshold is N=100. Use ExecutionMode.dist_proc if N >= 100') + raise ValueError('N must be > 1!') + # elif config_amt > remote_threshold: + # print('Remote Threshold is N=100. Use ExecutionMode.dist_proc if N >= 100') final_result = None - original_context = self.exec_context + # original_context = self.exec_context + original_N = len(configs_as_dicts(self.configs)) if self.exec_context != ExecutionMode.distributed: # Consider Legacy Support if self.exec_context != ExecutionMode.local_mode: @@ -142,14 +150,14 @@ def auto_mode_switcher(config_amt): print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, - ExpIDs #Ns + ExpIDs, SubsetIDs, SubsetWindows, original_N ) final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold) elif self.exec_context == ExecutionMode.distributed: print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, - SimIDs, RunIDs, ExpIDs, self.sc + SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc ) final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions) @@ -157,13 +165,3 @@ def auto_mode_switcher(config_amt): print(f"Total execution time: {t2 - t1 :.2f}s") return final_result - -# danlessa_experiments: -> get_final_dist_results -# results = [] -# zipped_results = zip(simulations, partial_state_updates, eps) -# for result, partial_state_updates, ep in tqdm(zipped_results, -# total=len(simulations), -# desc='Flattening results'): -# results.append((flatten(result), create_tensor_field(partial_state_updates, ep))) - -# final_result = results diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 93c933ea..2caa5f0d 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,4 +1,3 @@ -from pprint import pprint from typing import Callable, Dict, List, Any, Tuple from pathos.multiprocessing import ThreadPool as TPool from pathos.multiprocessing import ProcessPool as PPool @@ -13,44 +12,53 @@ def single_proc_exec( - simulation_execs: List[Callable], - var_dict_list: List[VarDictType], - states_lists: List[StatesListsType], - configs_structs: List[ConfigsType], - env_processes_list: List[EnvProcessesType], - Ts: List[range], - SimIDs, - Ns: List[int], - ExpIDs: List[int] - ): + simulation_execs: List[Callable], + var_dict_list: List[VarDictType], + states_lists: List[StatesListsType], + configs_structs: List[ConfigsType], + env_processes_list: List[EnvProcessesType], + Ts: List[range], + SimIDs, + Ns: List[int], + ExpIDs: List[int], + SubsetIDs, + SubsetWindows, + configured_n +): print(f'Execution Mode: single_threaded') - params = [simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns] - simulation_exec, states_list, config, env_processes, T, SimID, N = list(map(lambda x: x.pop(), params)) - result = simulation_exec(var_dict_list, states_list, config, env_processes, T, SimID, N) + params = [ + simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows + ] + simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list( + map(lambda x: x.pop(), params) + ) + result = simulation_exec( + var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + ) return flatten(result) def parallelize_simulations( - simulation_execs: List[Callable], - var_dict_list: List[VarDictType], - states_lists: List[StatesListsType], - configs_structs: List[ConfigsType], - env_processes_list: List[EnvProcessesType], - Ts: List[range], - SimIDs, - Ns: List[int], - ExpIDs: List[int] - ): - # indexed sim_ids - # SimIDs = list(range(len(SimIDs))) - # print(SimIDs) - # print(list(range(len(Ns)))) - # print(Ns) - # exit() + simulation_execs: List[Callable], + var_dict_list: List[VarDictType], + states_lists: List[StatesListsType], + configs_structs: List[ConfigsType], + env_processes_list: List[EnvProcessesType], + Ts: List[range], + SimIDs, + Ns: List[int], + ExpIDs: List[int], + SubsetIDs, + SubsetWindows, + configured_n +): print(f'Execution Mode: parallelized') params = list( - zip(simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns) + zip( + simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, + Ts, SimIDs, Ns, SubsetIDs, SubsetWindows + ) ) len_configs_structs = len(configs_structs) @@ -60,7 +68,6 @@ def parallelize_simulations( highest_divisor = int(len_configs_structs / sim_count) new_configs_structs, new_params = [], [] - print() for count in range(sim_count): if count == 0: new_params.append( @@ -70,7 +77,6 @@ def parallelize_simulations( configs_structs[count: highest_divisor] ) elif count > 0: - # print(params) new_params.append( params[count * highest_divisor: (count + 1) * highest_divisor] ) @@ -78,39 +84,30 @@ def parallelize_simulations( configs_structs[count * highest_divisor: (count + 1) * highest_divisor] ) - print(SimIDs) - print(Ns) - print(ExpIDs) - # pprint(new_configs_structs) - # exit() def threaded_executor(params): - # tp = TPool(len_configs_structs) tp = TPool() if len_configs_structs > 1: - results = tp.map(lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7]), params) + results = tp.map( + lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params + ) else: t = params[0] - results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7]) + results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n) tp.close() return results - # len_new_configs_structs = len(new_configs_structs) - # pp = PPool(len_new_configs_structs) - # len(new_params) - # print('params len: '+ str(len(new_params))) - - # pprint(params) - # print() - pp = PPool() results = flatten(list(pp.map(lambda params: threaded_executor(params), new_params))) pp.close() + pp.join() + pp.clear() + # pp.restart() + return results -remote_threshold = 100 def local_simulations( simulation_execs: List[Callable], var_dict_list: List[VarDictType], @@ -120,19 +117,26 @@ def local_simulations( Ts: List[range], SimIDs, Ns: List[int], - ExpIDs: List[int] + ExpIDs: List[int], + SubsetIDs, + SubsetWindows, + configured_n ): + print(f'SimIDs : {SimIDs}') + print(f'SubsetIDs: {SubsetIDs}') + print(f'Ns : {Ns}') + print(f'ExpIDs : {ExpIDs}') config_amt = len(configs_structs) try: - if len(configs_structs) == 1: + if config_amt == 1: return single_proc_exec( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, - ExpIDs + ExpIDs, SubsetIDs, SubsetWindows, configured_n ) - elif len(configs_structs) > 1 and config_amt < remote_threshold: + elif config_amt > 1: # and config_amt < remote_threshold: return parallelize_simulations( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, - ExpIDs + ExpIDs, SubsetIDs, SubsetWindows, configured_n ) except ValueError: - print('ValueError: sim_configs\' N must > 0') + raise ValueError("\'sim_configs\' N must > 0") diff --git a/cadCAD/engine/simulation.py b/cadCAD/engine/simulation.py index 3ded8288..b9a57ba8 100644 --- a/cadCAD/engine/simulation.py +++ b/cadCAD/engine/simulation.py @@ -1,14 +1,10 @@ -from pprint import pprint from typing import Any, Callable, Dict, List, Tuple -from pathos.pools import ThreadPool as TPool -from functools import reduce -from types import MappingProxyType from copy import deepcopy from functools import reduce from funcy import curry -from cadCAD.engine.utils import engine_exception from cadCAD.utils import flatten +from cadCAD.engine.utils import engine_exception id_exception: Callable = curry(engine_exception)(KeyError)(KeyError)(None) @@ -26,14 +22,14 @@ def __init__( self.policy_update_exception = policy_update_exception def get_policy_input( - self, - sweep_dict: Dict[str, List[Any]], - sub_step: int, - sL: List[Dict[str, Any]], - s: Dict[str, Any], - funcs: List[Callable], - additional_objs - ) -> Dict[str, Any]: + self, + sweep_dict: Dict[str, List[Any]], + sub_step: int, + sL: List[Dict[str, Any]], + s: Dict[str, Any], + funcs: List[Callable], + additional_objs + ) -> Dict[str, Any]: ops = self.policy_ops @@ -73,11 +69,11 @@ def compose(init_reduction_funct, funct_list, val_list): } def apply_env_proc( - self, - sweep_dict, - env_processes: Dict[str, Callable], - state_dict: Dict[str, Any] - ) -> Dict[str, Any]: + self, + sweep_dict, + env_processes: Dict[str, Callable], + state_dict: Dict[str, Any] + ) -> Dict[str, Any]: def env_composition(target_field, state_dict, target_value): function_type = type(lambda x: x) @@ -105,18 +101,18 @@ def env_composition(target_field, state_dict, target_value): # mech_step def partial_state_update( - self, - sweep_dict: Dict[str, List[Any]], - sub_step: int, - sL, - sH, - state_funcs: List[Callable], - policy_funcs: List[Callable], - env_processes: Dict[str, Callable], - time_step: int, - run: int, - additional_objs - ) -> List[Dict[str, Any]]: + self, + sweep_dict: Dict[str, List[Any]], + sub_step: int, + sL, + sH, + state_funcs: List[Callable], + policy_funcs: List[Callable], + env_processes: Dict[str, Callable], + time_step: int, + run: int, + additional_objs + ) -> List[Dict[str, Any]]: # last_in_obj: Dict[str, Any] = MappingProxyType(sL[-1]) last_in_obj: Dict[str, Any] = deepcopy(sL[-1]) @@ -152,15 +148,15 @@ def transfer_missing_fields(source, destination): # mech_pipeline - state_update_block def state_update_pipeline( - self, - sweep_dict: Dict[str, List[Any]], - simulation_list, - configs: List[Tuple[List[Callable], List[Callable]]], - env_processes: Dict[str, Callable], - time_step: int, - run: int, - additional_objs - ) -> List[Dict[str, Any]]: + self, + sweep_dict: Dict[str, List[Any]], + simulation_list, + configs: List[Tuple[List[Callable], List[Callable]]], + env_processes: Dict[str, Callable], + time_step: int, + run: int, + additional_objs + ) -> List[Dict[str, Any]]: sub_step = 0 states_list_copy: List[Dict[str, Any]] = tuple(simulation_list[-1]) @@ -176,7 +172,8 @@ def state_update_pipeline( sub_step += 1 for [s_conf, p_conf] in configs: states_list: List[Dict[str, Any]] = self.partial_state_update( - sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, additional_objs + sweep_dict, sub_step, states_list, simulation_list, s_conf, p_conf, env_processes, time_step, run, + additional_objs ) sub_step += 1 @@ -186,15 +183,15 @@ def state_update_pipeline( # state_update_pipeline def run_pipeline( - self, - sweep_dict: Dict[str, List[Any]], - states_list: List[Dict[str, Any]], - configs: List[Tuple[List[Callable], List[Callable]]], - env_processes: Dict[str, Callable], - time_seq: range, - run: int, - additional_objs - ) -> List[List[Dict[str, Any]]]: + self, + sweep_dict: Dict[str, List[Any]], + states_list: List[Dict[str, Any]], + configs: List[Tuple[List[Callable], List[Callable]]], + env_processes: Dict[str, Callable], + time_seq: range, + run: int, + additional_objs + ) -> List[List[Dict[str, Any]]]: time_seq: List[int] = [x + 1 for x in time_seq] simulation_list: List[List[Dict[str, Any]]] = [states_list] @@ -209,39 +206,39 @@ def run_pipeline( return simulation_list def simulation( - self, - sweep_dict: Dict[str, List[Any]], - states_list: List[Dict[str, Any]], - configs, - env_processes: Dict[str, Callable], - time_seq: range, - simulation_id: int, - run: int, - additional_objs=None + self, + sweep_dict: Dict[str, List[Any]], + states_list: List[Dict[str, Any]], + configs, + env_processes: Dict[str, Callable], + time_seq: range, + simulation_id: int, + run: int, + subset_id, + subset_window, + configured_N, + # remote_ind + additional_objs=None ): run += 1 - # print(run) - def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, _run) -> List[Dict[str, Any]]: - # _run += 1 + subset_window.appendleft(subset_id) + latest_subset_id, previous_subset_id = tuple(subset_window) + + if configured_N == 1 and latest_subset_id > previous_subset_id: + run -= 1 - def generate_init_sys_metrics(genesis_states_list, sim_id, _run): - # pprint(genesis_states_list) - # print() - # for d in genesis_states_list.asDict(): + def execute_run(sweep_dict, states_list, configs, env_processes, time_seq, _run) -> List[Dict[str, Any]]: + def generate_init_sys_metrics(genesis_states_list, sim_id, _subset_id, _run, _subset_window): for D in genesis_states_list: d = deepcopy(D) - # d = D - # print(str(sim_id) + ' ' + ' ' + str(_run)) - d['simulation'], d['run'], d['substep'], d['timestep'] = sim_id, _run, 0, 0 - # d['simulation'], d['run'], d['substep'], d['timestep'] = _run, sim_id, 0, 0 - # print('simulation_id') - # print(simulation_id) + d['simulation'], d['subset'], d['run'], d['substep'], d['timestep'] = \ + sim_id, _subset_id, _run, 0, 0 yield d - # print(tuple(states_list)) - states_list_copy: List[Dict[str, Any]] = list(generate_init_sys_metrics(tuple(states_list), simulation_id, run)) - # simulation_id = + 1 + states_list_copy: List[Dict[str, Any]] = list( + generate_init_sys_metrics(tuple(states_list), simulation_id, subset_id, run, subset_window) + ) first_timestep_per_run: List[Dict[str, Any]] = self.run_pipeline( sweep_dict, states_list_copy, configs, env_processes, time_seq, run, additional_objs @@ -250,8 +247,6 @@ def generate_init_sys_metrics(genesis_states_list, sim_id, _run): return first_timestep_per_run - # print('sim_id: ' + str(simulation_id)) - # print('run_id: ' + str(run)) pipe_run = flatten( [execute_run(sweep_dict, states_list, configs, env_processes, time_seq, run)] ) diff --git a/cadCAD/utils/sys_exec.py b/cadCAD/utils/sys_exec.py index 75222134..7c7559df 100644 --- a/cadCAD/utils/sys_exec.py +++ b/cadCAD/utils/sys_exec.py @@ -1,10 +1,14 @@ import warnings +from pprint import pprint from pyspark import RDD, Row from pyspark.sql import DataFrame, SparkSession import pandas as pd # Distributed +from tabulate import tabulate + + def align_type(init_condition: dict): def f(d): for y, x in init_condition.items(): @@ -32,21 +36,27 @@ def to_pandas(rdd: RDD): return pdf_from_rdd -def to_pandas_df(rdd: RDD, init_condition: dict = None): - # Typefull - if init_condition is not None: - return to_spark(rdd, init_condition).toPandas() - # Typeless +def to_pandas_df(rdd: RDD, string_conversion=False, init_condition: dict = None): + if init_condition is not None and string_conversion is False: + # Typefull + return to_spark(rdd=rdd, init_condition=init_condition).toPandas() + elif init_condition is None and string_conversion is True: + # String + return rdd.map(lambda d: Row(**dict([(k, str(v)) for k, v in d.items()]))).toDF() else: + # Typeless return to_pandas(rdd) -def to_spark_df(rdd: RDD, spark: SparkSession, init_condition: dict = None): - # Typefull - if init_condition is not None: +def to_spark_df(rdd: RDD, spark: SparkSession = None, init_condition: dict = None): + if init_condition is not None and spark is not None: + # Typefull return to_spark(rdd, init_condition) - # Typeless + elif spark is None and init_condition is None: + # String + return rdd.map(lambda d: Row(**dict([(k, str(v)) for k, v in d.items()]))).toDF() else: + # Typeless spark.conf.set("spark.sql.execution.arrow.enabled", "true") spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true") warnings.simplefilter(action='ignore', category=UserWarning) diff --git a/dist/cadCAD-0.4.17-py3-none-any.whl b/dist/cadCAD-0.4.17-py3-none-any.whl deleted file mode 100644 index 8ba29842..00000000 Binary files a/dist/cadCAD-0.4.17-py3-none-any.whl and /dev/null differ diff --git a/dist/cadCAD-0.4.17.tar.gz b/dist/cadCAD-0.4.17.tar.gz deleted file mode 100644 index 76a1eabc..00000000 Binary files a/dist/cadCAD-0.4.17.tar.gz and /dev/null differ diff --git a/documentation/Policy_Aggregation.md b/documentation/Policy_Aggregation.md index 296c707b..3b89e394 100644 --- a/documentation/Policy_Aggregation.md +++ b/documentation/Policy_Aggregation.md @@ -46,9 +46,10 @@ def p2_psu3(_params, step, sH, s, **kwargs): #### Aggregate Policies using functions ```python -from cadCAD.configuration import append_configs +from cadCAD.configuration import Experiment -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=???, initial_state=???, partial_state_update_blocks=???, diff --git a/documentation/README.md b/documentation/README.md index 27f146cb..2dde7432 100644 --- a/documentation/README.md +++ b/documentation/README.md @@ -15,9 +15,10 @@ A Simulation Configuration is comprised of a [System Model](#System-Model) and a `append_configs`, stores a **Simulation Configuration** to be [Executed](/JS4Q9oayQASihxHBJzz4Ug) by cadCAD ```python -from cadCAD.configuration import append_configs +from cadCAD.configuration import Experiment -append_configs( +exp = Experiment() +exp.append_configs( user_id = ..., # OPTIONAL: cadCAD Session User ID initial_state = ..., # System Model partial_state_update_blocks = ..., # System Model @@ -38,8 +39,8 @@ Simulation properties are passed to `append_configs` in the `sim_configs` parame use the `config_sim` function in `cadCAD.configuration.utils` ```python -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim +from cadCAD.configuration import Experiment sim_config_dict = { "N": ..., @@ -49,7 +50,8 @@ sim_config_dict = { c = config_sim(sim_config_dict) -append_configs( +exp = Experiment() +exp.append_configs( ... sim_configs = c # Simulation Properties ) @@ -100,7 +102,7 @@ State Variables are passed to `append_configs` along with its initial values, as are the names of the variables and the `dict_values` are their initial values. ```python -from cadCAD.configuration import append_configs +from cadCAD.configuration import Experiment genesis_states = { 'state_variable_1': 0, @@ -109,7 +111,8 @@ genesis_states = { 'timestamp': '2019-01-01 00:00:00' } -append_configs( +exp = Experiment() +exp.append_configs( initial_state = genesis_states, ... ) @@ -202,6 +205,8 @@ Partial State Update Blocks are passed to `append_configs` as a List of Python ` state update functions and the values are the functions. ```python +from cadCAD.configuration import Experiment + PSUBs = [ { "policies": { @@ -220,12 +225,12 @@ PSUBs = [ {...} #PSUB_M ] -append_configs( +exp = Experiment() +exp.append_configs( ... partial_state_update_blocks = PSUBs, ... ) - ``` #### Substep diff --git a/documentation/examples/example_1.py b/documentation/examples/example_1.py index c019c49a..ffa21b7f 100644 --- a/documentation/examples/example_1.py +++ b/documentation/examples/example_1.py @@ -2,6 +2,7 @@ import pandas as pd from tabulate import tabulate + from cadCAD.engine import ExecutionMode, ExecutionContext, Executor from documentation.examples import sys_model_A, sys_model_B from cadCAD import configs diff --git a/documentation/examples/historical_state_access.py b/documentation/examples/historical_state_access.py index 714d7f40..5ca9015b 100644 --- a/documentation/examples/historical_state_access.py +++ b/documentation/examples/historical_state_access.py @@ -1,11 +1,11 @@ import pandas as pd from tabulate import tabulate -from cadCAD.configuration import append_configs + from cadCAD.configuration.utils import config_sim, access_block from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from cadCAD.configuration import Experiment from cadCAD import configs - policies, variables = {}, {} exclusion_list = ['nonexsistant', 'last_x', '2nd_to_last_x', '3rd_to_last_x', '4th_to_last_x'] @@ -87,7 +87,8 @@ def fourth_to_last_x(_g, substep, sH, s, _input): } ) -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, partial_state_update_blocks=psubs diff --git a/documentation/examples/param_sweep.py b/documentation/examples/param_sweep.py index f664b4d1..9d10830e 100644 --- a/documentation/examples/param_sweep.py +++ b/documentation/examples/param_sweep.py @@ -4,9 +4,9 @@ import pandas as pd from tabulate import tabulate -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from cadCAD.configuration import Experiment from cadCAD import configs pp = pprint.PrettyPrinter(indent=4) @@ -92,7 +92,8 @@ def sweeped(_params, step, sH, s, _input): pp.pprint(psu_block) print() -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, env_processes=env_process, diff --git a/documentation/examples/policy_aggregation.py b/documentation/examples/policy_aggregation.py index 6810793c..2a0edb95 100644 --- a/documentation/examples/policy_aggregation.py +++ b/documentation/examples/policy_aggregation.py @@ -1,9 +1,9 @@ import pandas as pd from tabulate import tabulate -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from cadCAD.configuration import Experiment from cadCAD import configs # Policies per Mechanism @@ -76,7 +76,8 @@ def policies(_g, step, sH, s, _input): } ) -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, partial_state_update_blocks=psubs, diff --git a/documentation/examples/sys_model_A.py b/documentation/examples/sys_model_A.py index 5c54dbe1..13ade45a 100644 --- a/documentation/examples/sys_model_A.py +++ b/documentation/examples/sys_model_A.py @@ -1,9 +1,8 @@ import numpy as np from datetime import timedelta - -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import bound_norm_random, config_sim, time_step, env_trigger +from cadCAD.configuration import Experiment seeds = { 'z': np.random.RandomState(1), @@ -94,7 +93,6 @@ def update_timestamp(_g, step, sH, s, _input): # Environment Process -# ToDo: Depreciation Waring for env_proc_trigger convention trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29'] env_processes = { "s3": [lambda _g, x: 5], @@ -150,7 +148,8 @@ def update_timestamp(_g, step, sH, s, _input): } ) -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, env_processes=env_processes, diff --git a/documentation/examples/sys_model_B.py b/documentation/examples/sys_model_B.py index 2c6ad9e2..b9747d58 100644 --- a/documentation/examples/sys_model_B.py +++ b/documentation/examples/sys_model_B.py @@ -1,8 +1,8 @@ import numpy as np from datetime import timedelta -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import bound_norm_random, config_sim, env_trigger, time_step +from cadCAD.configuration import Experiment seeds = { 'z': np.random.RandomState(1), @@ -88,7 +88,6 @@ def update_timestamp(_g, step, sH, s, _input): # Environment Process -# ToDo: Depreciation Waring for env_proc_trigger convention trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29'] env_processes = { "s3": [lambda _g, x: 5], @@ -131,7 +130,6 @@ def update_timestamp(_g, step, sH, s, _input): } ] - sim_config = config_sim( { "N": 2, @@ -139,7 +137,8 @@ def update_timestamp(_g, step, sH, s, _input): } ) -append_configs( +exp = Experiment() +exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, env_processes=env_processes, diff --git a/setup.py b/setup.py index 41ad37b6..decfa13b 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ """ setup(name='cadCAD', - version='0.4.17', + version='0.4.18', description=short_description, long_description=long_description, url='https://github.com/cadCAD-org/cadCAD', diff --git a/simulations/regression_tests/execs/multi_config_dist.py b/simulations/regression_tests/execs/multi_config_dist.py new file mode 100644 index 00000000..9d690430 --- /dev/null +++ b/simulations/regression_tests/execs/multi_config_dist.py @@ -0,0 +1,55 @@ +from pyspark.sql import DataFrame, Row +from tabulate import tabulate +from pprint import pprint +import pandas as pd + +from simulations.regression_tests.models import config1, config2 +from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from cadCAD.configuration.utils import configs_as_dataframe #, configs_as_objs, configs_as_dicts +from cadCAD.utils.sys_exec import to_spark_df, to_pandas_df +from cadCAD import configs + +from distroduce.engine.execution import transform, distributed_simulations +from distroduce.session import sc_alt as sc + +exec_mode = ExecutionMode() +distributed_sims = distributed_simulations(transform) + +distributed_ctx = ExecutionContext(context=exec_mode.distributed, method=distributed_sims) +run = Executor(exec_context=distributed_ctx, configs=configs, spark_context=sc) + +raw_result, tensor_fields, sessions = run.execute() + +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +print() +# pprint(sessions) +# print() + +print("Configuration Data:") +configs_df = configs_as_dataframe(configs) +print(tabulate(configs_df, headers='keys', tablefmt='psql')) +print("Tensor Field:") +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +print("Output:") + +# RDD: +print() +print("RDD:") +result: list = raw_result.take(5) +pprint(result[:2]) +# to get all results execute the following +# result: list = raw_result.collect() +print() + +print("Spark DataFrame:") +sdf: DataFrame = to_spark_df(raw_result) +# sdf: DataFrame = to_spark_df(raw_result, spark) +sdf.show(5) +print() + +# Pandas: +print() +print("Pandas DataFrame:") +# pdf: pd.DataFrame = to_pandas_df(raw_result, config1.genesis_states) +pdf: pd.DataFrame = to_pandas_df(raw_result) +print(tabulate(pdf.head(), headers='keys', tablefmt='psql')) diff --git a/simulations/regression_tests/execs/multi_config_test.py b/simulations/regression_tests/execs/multi_config_test.py index 810cf8f3..919226f7 100644 --- a/simulations/regression_tests/execs/multi_config_test.py +++ b/simulations/regression_tests/execs/multi_config_test.py @@ -1,6 +1,6 @@ +from tabulate import tabulate from pprint import pprint import pandas as pd -from tabulate import tabulate from cadCAD.engine import ExecutionMode, ExecutionContext, Executor from simulations.regression_tests.models import config1, config2 diff --git a/simulations/regression_tests/execs/param_sweep_dist.py b/simulations/regression_tests/execs/param_sweep_dist.py new file mode 100644 index 00000000..901e8e71 --- /dev/null +++ b/simulations/regression_tests/execs/param_sweep_dist.py @@ -0,0 +1,54 @@ +from pyspark.sql import DataFrame +from tabulate import tabulate +from cadCAD import configs +from pprint import pprint +import pandas as pd + +from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from simulations.regression_tests.models import sweep_config +from cadCAD.utils.sys_exec import to_spark_df, to_pandas_df +from cadCAD.configuration.utils import configs_as_dataframe #, configs_as_objs, configs_as_dicts + +from distroduce.engine.execution import transform, distributed_simulations +from distroduce.session import sc_alt as sc +from distroduce.session import spark_alt as spark + + +exec_mode = ExecutionMode() +distributed_sims = distributed_simulations(transform) +distributed_ctx = ExecutionContext(context=exec_mode.distributed, method=distributed_sims) +run = Executor(exec_context=distributed_ctx, configs=configs, spark_context=sc) + +raw_result, tensor_fields, sessions = run.execute() + +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +pprint(sessions) + +print("Configuration Data:") +configs_df = configs_as_dataframe(configs) +print(tabulate(configs_df, headers='keys', tablefmt='psql')) +print("Tensor Field:") +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +print("Output:") + +# RDD: +print() +print("RDD:") +result: list = raw_result.take(5) +pprint(result[:2]) +# to get all results execute the following +# result: list = raw_result.collect() +print() + +print("Spark DataFrame:") +sdf: DataFrame = to_spark_df(raw_result, spark, sweep_config.genesis_states) +# sdf: DataFrame = to_spark_df(raw_result, spark) +sdf.show(5) +print() + +# Pandas : +print() +print("Pandas DataFrame:") +pdf: pd.DataFrame = to_pandas_df(raw_result, sweep_config.genesis_states) +# pdf: pd.DataFrame = to_pandas_df(raw_result) +print(tabulate(pdf.head(), headers='keys', tablefmt='psql')) diff --git a/simulations/regression_tests/execs/param_sweep_test2.py b/simulations/regression_tests/execs/param_sweep_test2.py deleted file mode 100644 index 0b1d2cd3..00000000 --- a/simulations/regression_tests/execs/param_sweep_test2.py +++ /dev/null @@ -1,50 +0,0 @@ -from pprint import pprint - -import pandas as pd -from tabulate import tabulate -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from cadCAD.configuration.utils import configs_as_objs -from simulations.regression_tests.models import sweep_config -from cadCAD import configs - -exec_mode = ExecutionMode() - -local_proc_ctx = ExecutionContext(context=exec_mode.local_mode) -run = Executor(exec_context=local_proc_ctx, configs=configs) -# -raw_result, tensor_fields, sessions = run.execute() -result = pd.DataFrame(raw_result) -print(result.head(10)) -# print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) -# pprint(sessions) -# print(tabulate(result.head(), headers='keys', tablefmt='psql')) - -a = configs -b = configs_as_objs(configs) - -for config in configs: - print() - # pprint(config.__dict__) - print('simulation_id: '+ str(config.__dict__['simulation_id'])) - print('run_id: '+ str(config.__dict__['run_id'])) - print('N: ' + str(config.__dict__['sim_config']['N'])) - # print('M: ' + str(config.__dict__['sim_config']['M'])) - # print('sim_config:') - # pprint(config.__dict__['sim_config']) - # print() - -# pprint(a) -# print() -# print() -# pprint(b[0].sim_config) -# pprint(b[0]['sim_config']) - -configs -configs = configs_as_objs(configs) - - -# {'run_id': 0, -# 'session_id': 'cadCAD_user=0_0', -# 'simulation_id': 0, -# 'user_id': 'cadCAD_user' -# } \ No newline at end of file diff --git a/simulations/regression_tests/execs/policy_agg_dist.py b/simulations/regression_tests/execs/policy_agg_dist.py new file mode 100644 index 00000000..2c2060b5 --- /dev/null +++ b/simulations/regression_tests/execs/policy_agg_dist.py @@ -0,0 +1,54 @@ +from pyspark.sql import DataFrame +from tabulate import tabulate +from cadCAD import configs +from pprint import pprint +import pandas as pd + +from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from simulations.regression_tests.models import policy_aggregation +from cadCAD.utils.sys_exec import to_spark_df, to_pandas_df +from cadCAD.configuration.utils import configs_as_dataframe #, configs_as_objs, configs_as_dicts + +from distroduce.engine.execution import transform, distributed_simulations +from distroduce.session import sc_alt as sc +from distroduce.session import spark_alt as spark + +exec_mode = ExecutionMode() +distributed_sims = distributed_simulations(transform) + +distributed_ctx = ExecutionContext(context=exec_mode.distributed, method=distributed_sims) +run = Executor(exec_context=distributed_ctx, configs=configs, spark_context=sc) + +raw_result, tensor_fields, sessions = run.execute() +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +pprint(sessions) + +print("Configuration Data:") +configs_df = configs_as_dataframe(configs) +print(tabulate(configs_df, headers='keys', tablefmt='psql')) +print("Tensor Field:") +print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +print("Output:") + +# RDD: +print() +print("RDD:") +result: list = raw_result.take(5) +pprint(result[:2]) +# to get all results execute the following +# result: list = raw_result.collect() +print() + +print("Spark DataFrame:") +sdf: DataFrame = to_spark_df(raw_result, spark, policy_aggregation.genesis_states) +# sdf: DataFrame = to_spark_df(raw_result, spark) +sdf.show(5) +print() + +# Pandas : +print() +print("Pandas DataFrame:") +pdf: pd.DataFrame = to_pandas_df(raw_result, policy_aggregation.genesis_states) +# pdf: pd.DataFrame = to_pandas_df(raw_result) +print(tabulate(pdf.head(), headers='keys', tablefmt='psql')) + diff --git a/simulations/regression_tests/experiments/__init__.py b/simulations/regression_tests/experiments/__init__.py index 947ce42d..6d4cf830 100644 --- a/simulations/regression_tests/experiments/__init__.py +++ b/simulations/regression_tests/experiments/__init__.py @@ -1,3 +1,10 @@ from cadCAD.configuration import Experiment -exp_a = Experiment() \ No newline at end of file +config1_exp = Experiment() +config2_exp = Experiment() +ext_ds_exp = Experiment() +hist_exp = Experiment() +policy_exp = Experiment() +sweep_exp = Experiment() +udo1_exp = Experiment() +udo2_exp = Experiment() diff --git a/simulations/regression_tests/models/config1.py b/simulations/regression_tests/models/config1.py index bfc80651..80ddc607 100644 --- a/simulations/regression_tests/models/config1.py +++ b/simulations/regression_tests/models/config1.py @@ -2,9 +2,8 @@ from datetime import timedelta from cadCAD import configs -# from cadCAD.configuration import append_configs from cadCAD.configuration.utils import bound_norm_random, config_sim, time_step, env_trigger -from simulations.regression_tests.experiments import exp_a +from simulations.regression_tests.experiments import config1_exp seeds = { 'z': np.random.RandomState(1), @@ -95,7 +94,6 @@ def update_timestamp(_g, step, sL, s, _input, **kwargs): # Environment Process -# ToDo: Depreciation Waring for env_proc_trigger convention trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29'] env_processes = { "s3": [lambda _g, x: 5], @@ -149,8 +147,7 @@ def update_timestamp(_g, step, sL, s, _input, **kwargs): } sim_config = config_sim(sim_config_dict) -# exp_a = Experiment() -exp_a.append_configs( +config1_exp.append_configs( config_list=configs, user_id='user_a', sim_configs=sim_config, diff --git a/simulations/regression_tests/models/config2.py b/simulations/regression_tests/models/config2.py index 2f66794e..392734c4 100644 --- a/simulations/regression_tests/models/config2.py +++ b/simulations/regression_tests/models/config2.py @@ -2,9 +2,8 @@ from datetime import timedelta from cadCAD import configs -# from cadCAD.configuration import append_configs from cadCAD.configuration.utils import bound_norm_random, config_sim, env_trigger, time_step -from simulations.regression_tests.experiments import exp_a +from simulations.regression_tests.experiments import config2_exp seeds = { 'z': np.random.RandomState(1), @@ -90,7 +89,6 @@ def update_timestamp(_g, step, sL, s, _input, **kwargs): # Environment Process -# ToDo: Depreciation Waring for env_proc_trigger convention trigger_timestamps = ['2018-10-01 15:16:25', '2018-10-01 15:16:27', '2018-10-01 15:16:29'] env_processes = { "s3": [lambda _g, x: 5], @@ -140,8 +138,7 @@ def update_timestamp(_g, step, sL, s, _input, **kwargs): sim_config = config_sim(sim_config_dict) -# exp_a = Experiment() -exp_a.append_configs( +config2_exp.append_configs( config_list=configs, user_id='user_b', sim_configs=sim_config, diff --git a/simulations/regression_tests/models/external_dataset.py b/simulations/regression_tests/models/external_dataset.py index a1efc574..4458c084 100644 --- a/simulations/regression_tests/models/external_dataset.py +++ b/simulations/regression_tests/models/external_dataset.py @@ -1,7 +1,7 @@ -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim import pandas as pd from cadCAD.utils import SilentDF +from simulations.regression_tests.experiments import ext_ds_exp df = SilentDF(pd.read_csv('simulations/external_data/output.csv')) @@ -21,7 +21,6 @@ def p2(_g, substep, sL, s, **kwargs): del result_dict["ds1"], result_dict["ds2"] return {k: list(v.values()).pop() for k, v in result_dict.items()} -# ToDo: SilentDF(df) wont work #integrate_ext_dataset def integrate_ext_dataset(_g, step, sL, s, _input, **kwargs): result_dict = query(s, df).to_dict() @@ -59,7 +58,7 @@ def view_policies(_g, step, sL, s, _input, **kwargs): "T": range(4) }) -append_configs( +ext_ds_exp.append_configs( sim_configs=sim_config, initial_state=state_dict, partial_state_update_blocks=partial_state_update_blocks, diff --git a/simulations/regression_tests/models/historical_state_access.py b/simulations/regression_tests/models/historical_state_access.py index 20987880..c8c3a604 100644 --- a/simulations/regression_tests/models/historical_state_access.py +++ b/simulations/regression_tests/models/historical_state_access.py @@ -1,5 +1,5 @@ -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim, access_block +from simulations.regression_tests.experiments import hist_exp policies, variables = {}, {} exclusion_list = ['nonexsistant', 'last_x', '2nd_to_last_x', '3rd_to_last_x', '4th_to_last_x'] @@ -84,7 +84,7 @@ def fourth_to_last_x(_g, substep, sH, s, _input, **kwargs): ) -append_configs( +hist_exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, partial_state_update_blocks=partial_state_update_block diff --git a/simulations/regression_tests/models/param_sweep.py b/simulations/regression_tests/models/param_sweep.py new file mode 100644 index 00000000..ed64e142 --- /dev/null +++ b/simulations/regression_tests/models/param_sweep.py @@ -0,0 +1,94 @@ +import pprint +from typing import Dict, List + +# from cadCAD.configuration import append_configs +from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list +from testing.experiments import exp_param_sweep + +pp = pprint.PrettyPrinter(indent=4) + +def some_function(x): + return x + +# Optional +# dict must contain lists opf 2 distinct lengths +g: Dict[str, List[int]] = { + 'alpha': [1], + 'beta': [2, some_function], + 'gamma': [3, 4], + # 'beta': [1], + # 'gamma': [4], + 'omega': [7] +} + +psu_steps = ['m1', 'm2', 'm3'] +system_substeps = len(psu_steps) +var_timestep_trigger = var_substep_trigger([0, system_substeps]) +env_timestep_trigger = env_trigger(system_substeps) +env_process = {} + + +# ['s1', 's2', 's3', 's4'] +# Policies per Mechanism +def gamma(_g, step, sL, s, **kwargs): + return {'gamma': _g['gamma']} + + +def omega(_g, step, sL, s, **kwargs): + return {'omega': _g['omega']} + + +# Internal States per Mechanism +def alpha(_g, step, sL, s, _input, **kwargs): + return 'alpha', _g['alpha'] + + +def beta(_g, step, sL, s, _input, **kwargs): + return 'beta', _g['beta'] + + +def policies(_g, step, sL, s, _input, **kwargs): + return 'policies', _input + + +def sweeped(_g, step, sL, s, _input, **kwargs): + return 'sweeped', {'beta': _g['beta'], 'gamma': _g['gamma']} + +psu_block = {k: {"policies": {}, "variables": {}} for k in psu_steps} +for m in psu_steps: + psu_block[m]['policies']['gamma'] = gamma + psu_block[m]['policies']['omega'] = omega + psu_block[m]["variables"]['alpha'] = alpha + psu_block[m]["variables"]['beta'] = beta + psu_block[m]['variables']['policies'] = policies + psu_block[m]["variables"]['sweeped'] = var_timestep_trigger(y='sweeped', f=sweeped) + + +# Genesis States +genesis_states = { + 'alpha': 0, + 'beta': 0, + 'policies': {}, + 'sweeped': {} +} + +# Environment Process +env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']]) + + +sim_config = config_sim( + { + "N": 2, + "T": range(2), + "M": g, # Optional + } +) + +# New Convention +partial_state_update_blocks = psub_list(psu_block, psu_steps) +exp_param_sweep.append_configs( + sim_configs=sim_config, + initial_state=genesis_states, + env_processes=env_process, + partial_state_update_blocks=partial_state_update_blocks +) diff --git a/simulations/regression_tests/models/policy_aggregation.py b/simulations/regression_tests/models/policy_aggregation.py index e3035f9a..d499175f 100644 --- a/simulations/regression_tests/models/policy_aggregation.py +++ b/simulations/regression_tests/models/policy_aggregation.py @@ -1,8 +1,8 @@ -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim - # Policies per Mechanism +from simulations.regression_tests.experiments import policy_exp + def p1m1(_g, step, sL, s, **kwargs): return {'policy1': 1} def p2m1(_g, step, sL, s, **kwargs): @@ -74,10 +74,9 @@ def policies(_g, step, sH, s, _input, **kwargs): # Aggregation == Reduce Map / Reduce Map Aggregation # using env functions (include in reg test using / for env proc) -append_configs( +policy_exp.append_configs( sim_configs=sim_config, initial_state=genesis_states, partial_state_update_blocks=partial_state_update_block, - # ToDo: subsequent functions should include policy dict for access to each policy (i.e shouldnt be a map) - policy_ops=[lambda a, b: a + b, lambda y: y * 2] # Default: lambda a, b: a + b ToDO: reduction function requires high lvl explanation -) \ No newline at end of file + policy_ops=[lambda a, b: a + b, lambda y: y * 2] # Default: lambda a, b: a + b +) diff --git a/simulations/regression_tests/models/sweep_config.py b/simulations/regression_tests/models/sweep_config.py index 35ebf793..38306bc4 100644 --- a/simulations/regression_tests/models/sweep_config.py +++ b/simulations/regression_tests/models/sweep_config.py @@ -2,11 +2,12 @@ from datetime import timedelta import pprint -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, time_step, psub_list from typing import Dict, List +from simulations.regression_tests.experiments import sweep_exp + pp = pprint.PrettyPrinter(indent=4) seeds = { @@ -110,12 +111,6 @@ def es4(_g, step, sL, s, _input, **kwargs): for m in ['m1','m2','m3']: psu_block[m]["variables"]['s4'] = var_timestep_trigger(y='s4', f=es4) - -# ToDo: The number of values entered in sweep should be the # of config objs created, -# not dependent on the # of times the sweep is applied -# sweep exo_state func and point to exo-state in every other funtion -# param sweep on genesis states - # Genesis States genesis_states = { 's1': 0.0, @@ -127,7 +122,6 @@ def es4(_g, step, sL, s, _input, **kwargs): # Environment Process -# ToDo: Validate - make env proc trigger field agnostic env_process["s3"] = [lambda _g, x: _g['beta'], lambda _g, x: x + 1] env_process["s4"] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']]) @@ -135,7 +129,7 @@ def es4(_g, step, sL, s, _input, **kwargs): # config_sim Necessary sim_config = config_sim( { - "N": 3, + "N": 1, "T": range(2), "M": g, # Optional } @@ -143,7 +137,7 @@ def es4(_g, step, sL, s, _input, **kwargs): # New Convention partial_state_update_blocks = psub_list(psu_block, psu_steps) -append_configs( +sweep_exp.append_configs( # user_id='user_a', sim_configs=sim_config, initial_state=genesis_states, @@ -153,8 +147,8 @@ def es4(_g, step, sL, s, _input, **kwargs): ) -print() -print("Policie State Update Block:") -pp.pprint(partial_state_update_blocks) -print() -print() +# print() +# print("Partial State Update Block:") +# pp.pprint(partial_state_update_blocks) +# print() +# print() diff --git a/simulations/regression_tests/models/tests.py b/simulations/regression_tests/models/tests.py deleted file mode 100644 index 424eca78..00000000 --- a/simulations/regression_tests/models/tests.py +++ /dev/null @@ -1,35 +0,0 @@ -import unittest - -import pandas as pd -# from tabulate import tabulate -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from cadCAD import configs - -exec_mode = ExecutionMode() -first_config = configs # only contains config1 -single_proc_ctx = ExecutionContext(context=exec_mode.single_mode) -run = Executor(exec_context=single_proc_ctx, configs=first_config) -raw_result, tensor_field = run.execute() -result = pd.DataFrame(raw_result) - -class TestStringMethods(unittest.TestCase): - def __init__(self, result: pd.DataFrame, tensor_field: pd.DataFrame) -> None: - self.result = result - self.tensor_field = tensor_field - - def test_upper(self): - self.assertEqual('foo'.upper(), 'FOO') - - def test_isupper(self): - self.assertTrue('FOO'.isupper()) - self.assertFalse('Foo'.isupper()) - - def test_split(self): - s = 'hello world' - self.assertEqual(s.split(), ['hello', 'world']) - # check that s.split fails when the separator is not a string - with self.assertRaises(TypeError): - s.split(2) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/simulations/regression_tests/models/udo.py b/simulations/regression_tests/models/udo.py index 44b8bc85..4fe1b5b5 100644 --- a/simulations/regression_tests/models/udo.py +++ b/simulations/regression_tests/models/udo.py @@ -1,13 +1,11 @@ import pandas as pd -# from fn.func import curried from datetime import timedelta import pprint as pp -from cadCAD.utils import SilentDF #, val_switch -from cadCAD.configuration import append_configs +from cadCAD.utils import SilentDF from cadCAD.configuration.utils import time_step, config_sim, var_trigger, var_substep_trigger, env_trigger, psub_list from cadCAD.configuration.utils.userDefinedObject import udoPipe, UDO - +from simulations.regression_tests.experiments import udo1_exp DF = SilentDF(pd.read_csv('simulations/external_data/output.csv')) @@ -44,8 +42,6 @@ def read(self, ds_uri): def write(self, ds_uri): pd.to_csv(ds_uri) - # ToDo: Generic update function - pass @@ -59,7 +55,6 @@ def write(self, ds_uri): "T": range(4) }) -# ToDo: DataFrame Column order state_dict = { 'increment': 0, 'state_udo': state_udo, 'state_udo_tracker': 0, @@ -166,10 +161,9 @@ def update_timestamp(_g, step, sL, s, _input, **kwargs): # ) # psu_block[m]["variables"]['timestamp'] = update_timestamp -# ToDo: Bug without specifying parameters # New Convention partial_state_update_blocks = psub_list(psu_block, psu_steps) -append_configs( +udo1_exp.append_configs( sim_configs=sim_config, initial_state=state_dict, partial_state_update_blocks=partial_state_update_blocks diff --git a/simulations/regression_tests/models/udo_inter_substep_update.py b/simulations/regression_tests/models/udo_inter_substep_update.py index e18af0de..583f2386 100644 --- a/simulations/regression_tests/models/udo_inter_substep_update.py +++ b/simulations/regression_tests/models/udo_inter_substep_update.py @@ -4,10 +4,9 @@ from datetime import timedelta from cadCAD.utils import SilentDF #, val_switch -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import time_step, config_sim from cadCAD.configuration.utils.userDefinedObject import udoPipe, UDO - +from simulations.regression_tests.experiments import udo1_exp DF = SilentDF(pd.read_csv('simulations/external_data/output.csv')) @@ -43,7 +42,6 @@ def write(self, ds_uri): pass # can be accessed after an update within the same substep and timestep - state_udo = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) policy_udoA = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) policy_udoB = UDO(udo=udoExample(0, DF), masked_members=['obj', 'perception']) @@ -59,7 +57,6 @@ def udo_policyB(_g, step, sL, s, **kwargs): policies = {"p1": udo_policyA, "p2": udo_policyB} -# ToDo: DataFrame Column order state_dict = { 'increment': 0, 'state_udo': state_udo, 'state_udo_tracker_a': 0, 'state_udo_tracker_b': 0, @@ -152,8 +149,7 @@ def apply_incriment_condition(s): "T": range(4) }) -# ToDo: Bug without specifying parameters -append_configs( +udo1_exp.append_configs( sim_configs=sim_config, initial_state=state_dict, seeds={}, diff --git a/testing/experiments/__init__.py b/testing/experiments/__init__.py index a78e6840..fdfe3766 100644 --- a/testing/experiments/__init__.py +++ b/testing/experiments/__init__.py @@ -1,3 +1,4 @@ from cadCAD.configuration import Experiment -exp_param_sweep = Experiment() \ No newline at end of file +exp_param_sweep = Experiment() +exp_policy_agg = Experiment() \ No newline at end of file diff --git a/testing/generic_test.py b/testing/generic_test.py index 7e6e7e56..e3d281b6 100644 --- a/testing/generic_test.py +++ b/testing/generic_test.py @@ -21,7 +21,7 @@ def wrapped_eval(a, b): df[test_name] = df.apply( lambda x: wrapped_eval( x.filter(items=target_cols).to_dict(), - expected_results[(x['simulation'], x['run'], x['timestep'], x['substep'])] + expected_results[(x['subset'], x['run'], x['timestep'], x['substep'])] ), axis=1 ) @@ -39,7 +39,8 @@ def generic_test(self, tested_df, expected_reults, test_name): if erroneous.empty is False: # Or Entire df IS NOT erroneous for index, row in erroneous.iterrows(): - expected = expected_reults[(row['simulation'], row['run'], row['timestep'], row['substep'])] + # expected = expected_reults[(row['simulation'], row['run'], row['timestep'], row['substep'])] + expected = expected_reults[(row['subset'], row['run'], row['timestep'], row['substep'])] unexpected = {f"invalid_{k}": expected[k] for k in expected if k in row and expected[k] != row[k]} for key in unexpected.keys(): diff --git a/testing/models/param_sweep.py b/testing/models/param_sweep.py index b4c1c31c..ed64e142 100644 --- a/testing/models/param_sweep.py +++ b/testing/models/param_sweep.py @@ -16,6 +16,8 @@ def some_function(x): 'alpha': [1], 'beta': [2, some_function], 'gamma': [3, 4], + # 'beta': [1], + # 'gamma': [4], 'omega': [7] } @@ -76,7 +78,7 @@ def sweeped(_g, step, sL, s, _input, **kwargs): sim_config = config_sim( { - "N": 1, + "N": 2, "T": range(2), "M": g, # Optional } @@ -90,10 +92,3 @@ def sweeped(_g, step, sL, s, _input, **kwargs): env_processes=env_process, partial_state_update_blocks=partial_state_update_blocks ) - - -print() -print("Policie State Update Block:") -pp.pprint(partial_state_update_blocks) -print() -print() diff --git a/testing/models/policy_aggregation.py b/testing/models/policy_aggregation.py index 48400e29..2e2277c5 100644 --- a/testing/models/policy_aggregation.py +++ b/testing/models/policy_aggregation.py @@ -1,8 +1,10 @@ -from cadCAD.configuration import append_configs from cadCAD.configuration.utils import config_sim # Policies per Mechanism +from testing.experiments import exp_policy_agg + + def p1m1(_g, step, sL, s, **kwargs): return {'policy1': 1} def p2m1(_g, step, sL, s, **kwargs): @@ -73,7 +75,7 @@ def policies(_g, step, sH, s, _input, **kwargs): ) -append_configs( +exp_policy_agg.append_configs( sim_configs=sim_config, initial_state=genesis_states, partial_state_update_blocks=partial_state_update_block, diff --git a/testing/tests/out_check.py b/testing/tests/out_check.py index 33c5e3ef..99d5fedb 100644 --- a/testing/tests/out_check.py +++ b/testing/tests/out_check.py @@ -10,10 +10,19 @@ exec_mode = ExecutionMode() exec_ctx = ExecutionContext(context=exec_mode.local_mode) +# exec_ctx = ExecutionContext(context=exec_mode.multi_proc) run = Executor(exec_context=exec_ctx, configs=configs) raw_result, tensor_fields, sessions = run.execute() result = pd.DataFrame(raw_result) -print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) -pprint(sessions) +# print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +# pprint(sessions) print(tabulate(result, headers='keys', tablefmt='psql')) + +print() + +raw_result, tensor_fields, sessions = run.execute() +result = pd.DataFrame(raw_result) +# print(tabulate(tensor_fields[0], headers='keys', tablefmt='psql')) +# pprint(sessions) +print(tabulate(result, headers='keys', tablefmt='psql')) \ No newline at end of file diff --git a/testing/tests/param_sweep.py b/testing/tests/param_sweep.py deleted file mode 100644 index 61de0fde..00000000 --- a/testing/tests/param_sweep.py +++ /dev/null @@ -1,86 +0,0 @@ -import unittest -import pandas as pd - -from cadCAD.engine import ExecutionMode, ExecutionContext, Executor -from testing.models import param_sweep -from cadCAD import configs - -from testing.generic_test import make_generic_test -from testing.models.param_sweep import some_function, g as sweep_params - - -exec_mode = ExecutionMode() -exec_ctx = ExecutionContext(context=exec_mode.multi_mode) -run = Executor(exec_context=exec_ctx, configs=configs) - -# sim, run, substep, timestep -def get_expected_results(sim, run, beta, gamma): - return { - (sim, run, 0, 0): {'policies': {}, 'sweeped': {}, 'alpha': 0, 'beta': 0}, - (sim, run, 1, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 1, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 1, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 2, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 2, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 2, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 3, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 3, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 3, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 4, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 4, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 4, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, - (sim, run, 5, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta}, - (sim, run, 5, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta}, - (sim, run, 5, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta} - } - - -def generate_expected(sweep_params): - def template(sweep_params): - sim_count = max(len(x) for x in list(sweep_params.values())) - expected_results, expected_results_1, expected_results_2 = {}, {}, {} - for sim in range(sim_count): - expected_results_1a = get_expected_results(sim, 1, 2, 3) - expected_results_1b = get_expected_results(sim, 2, 2, 3) - expected_results_1.update(expected_results_1a) - expected_results_1.update(expected_results_1b) - - expected_results_2 = {} - expected_results_2a = get_expected_results(sim, 1, some_function, 4) - expected_results_2b = get_expected_results(sim, 2, some_function, 4) - expected_results_2.update(expected_results_2a) - expected_results_2.update(expected_results_2b) - - expected_results.update(expected_results_1) - expected_results.update(expected_results_2) - - yield expected_results - - merged_expected = list(template(sweep_params)) - result = {} - for d in merged_expected: - result.update(d) - - return result - - -def row(a, b): - return a == b - - -def create_test_params(feature, fields): - raw_result, tensor_fields, sessions = run.execute() - df = pd.DataFrame(raw_result) - expected = generate_expected(sweep_params) - return [[feature, df, expected, fields, [row]]] - - -params = list(create_test_params("param_sweep", ['alpha', 'beta', 'policies', 'sweeped'])) - - -class GenericTest(make_generic_test(params)): - pass - - -if __name__ == '__main__': - unittest.main() diff --git a/testing/tests/param_sweep_test.py b/testing/tests/param_sweep_test.py new file mode 100644 index 00000000..f908198e --- /dev/null +++ b/testing/tests/param_sweep_test.py @@ -0,0 +1,88 @@ +import unittest +from pprint import pprint + +import pandas as pd + +from cadCAD.engine import ExecutionMode, ExecutionContext, Executor +from testing.models import param_sweep +from cadCAD import configs + +from testing.generic_test import make_generic_test +from testing.models.param_sweep import some_function, g as sweep_params + + +exec_mode = ExecutionMode() +exec_ctx = ExecutionContext(context=exec_mode.multi_mode) +run = Executor(exec_context=exec_ctx, configs=configs) + +# sim, run, substep, timestep +def get_expected_results(subset, run, beta, gamma): + return { + (subset, run, 0, 0): {'policies': {}, 'sweeped': {}, 'alpha': 0, 'beta': 0}, + (subset, run, 1, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 1, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 1, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 2, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 2, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 2, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 3, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 3, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 3, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 4, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 4, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 4, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': {'beta': beta, 'gamma': gamma}, 'alpha': 1, 'beta': beta}, + (subset, run, 5, 1): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta}, + (subset, run, 5, 2): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta}, + (subset, run, 5, 3): {'policies': {'gamma': gamma, 'omega': 7}, 'sweeped': beta, 'alpha': 1, 'beta': beta} + } + + +def generate_expected(sweep_params): + def template(sweep_params): + subset_count = max(len(x) for x in list(sweep_params.values())) + expected_results, expected_results_1, expected_results_2 = {}, {}, {} + for subset in range(subset_count): + expected_results_1a = get_expected_results(subset, 1, 2, 3) + expected_results_1b = get_expected_results(subset, 2, 2, 3) + expected_results_1.update(expected_results_1a) + expected_results_1.update(expected_results_1b) + + expected_results_2 = {} + expected_results_2a = get_expected_results(subset, 1, some_function, 4) + expected_results_2b = get_expected_results(subset, 2, some_function, 4) + expected_results_2.update(expected_results_2a) + expected_results_2.update(expected_results_2b) + + expected_results.update(expected_results_1) + expected_results.update(expected_results_2) + + yield expected_results + + merged_expected = list(template(sweep_params)) + result = {} + for d in merged_expected: + result.update(d) + + return result + + +def row(a, b): + return a == b + + +def create_test_params(feature, fields): + raw_result, tensor_fields, sessions = run.execute() + df = pd.DataFrame(raw_result) + expected = generate_expected(sweep_params) + return [[feature, df, expected, fields, [row]]] + + +params = list(create_test_params("param_sweep", ['alpha', 'beta', 'policies', 'sweeped'])) + + +class GenericTest(make_generic_test(params)): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/testing/tests/policy_aggregation.py b/testing/tests/policy_aggregation_test.py similarity index 100% rename from testing/tests/policy_aggregation.py rename to testing/tests/policy_aggregation_test.py