From c11365c7e6f7e861aea3f3b3f016dc70b4f5948d Mon Sep 17 00:00:00 2001 From: Megan <113795130+YUUU23@users.noreply.github.com> Date: Wed, 12 Jun 2024 08:57:56 -0500 Subject: [PATCH] Iss632 after rebase on future (#721) * refact branch rebase off of future; introduce custom error (UnparallelizableError and AdjLineNoteImplemented Error) caught before general errors, introduce custom error to ast_to_ir and ir in compiler at appropriate places with more detail error messages Signed-off-by: YUUU23 * refactor: import custom error to ast_to_ir, raise unparallelizable err in pash_compiler Signed-off-by: YUUU23 * refactor: put all expansion custom error from the sh_expand library (expand.py file) under one ExpansionError class in custom_error to catch and log these errors separately Signed-off-by: YUUU23 * fix: remove duplicated ExpansionError in excepts (compile_ir) Signed-off-by: YUUU23 * refactor: import ExpansionError (ExpansionError class initiated within expand package Signed-off-by: YUUU23 * delete: remove expand.py changes as it will be changed in the original package Signed-off-by: YUUU23 * fix: import expansion error Signed-off-by: YUUU23 --------- Signed-off-by: YUUU23 --- compiler/ast_to_ir.py | 6 ++++-- compiler/custom_error.py | 5 +++++ compiler/ir.py | 15 ++++++++------- compiler/pash_compiler.py | 14 ++++++++++++-- 4 files changed, 29 insertions(+), 11 deletions(-) create mode 100644 compiler/custom_error.py diff --git a/compiler/ast_to_ir.py b/compiler/ast_to_ir.py index 8d6f755a4..c1e753fa3 100644 --- a/compiler/ast_to_ir.py +++ b/compiler/ast_to_ir.py @@ -8,6 +8,8 @@ from util import * from parse import from_ast_objects_to_shell +from custom_error import * + ## TODO: Separate the ir stuff to the bare minimum and ## try to move this to the shell_ast folder. @@ -159,7 +161,7 @@ def combine_pipe(ast_nodes): else: ## If any part of the pipe is not an IR, the compilation must fail. log("Node: {} is not pure".format(ast_nodes[0])) - raise Exception("Not pure node in pipe") + raise UnparallelizableError("Node: {} is not a pure node in pipe".format(ast_nodes[0])) ## Combine the rest of the nodes for ast_node in ast_nodes[1:]: @@ -168,7 +170,7 @@ def combine_pipe(ast_nodes): else: ## If any part of the pipe is not an IR, the compilation must fail. log("Node: {} is not pure".format(ast_nodes)) - raise Exception("Not pure node in pipe") + raise UnparallelizableError("This specific node: {} is not a pure node in pipe".format(ast_node)) return [combined_nodes] diff --git a/compiler/custom_error.py b/compiler/custom_error.py new file mode 100644 index 000000000..eedb6f738 --- /dev/null +++ b/compiler/custom_error.py @@ -0,0 +1,5 @@ +class UnparallelizableError(Exception): + pass + +class AdjLineNotImplementedError(Exception): + pass \ No newline at end of file diff --git a/compiler/ir.py b/compiler/ir.py index c1534494a..b7319cc23 100644 --- a/compiler/ir.py +++ b/compiler/ir.py @@ -38,6 +38,7 @@ from shell_ast.ast_util import * from util import * +from custom_error import * import config @@ -242,11 +243,11 @@ def compile_command_to_DFG(fileIdGen, command, options, redirections=None): command_invocation ) if io_info is None: - raise Exception( + raise UnparallelizableError( f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful." ) if io_info.has_other_outputs(): - raise Exception( + raise UnparallelizableError( f"Command {format_arg_chars(command)} has outputs other than streaming." ) para_info: ParallelizabilityInfo = ( @@ -840,7 +841,7 @@ def apply_parallelization_to_node( node_id, parallelizer, fileIdGen, fan_out ) else: - raise Exception("Splitter not yet implemented") + raise UnparallelizableError("Splitter not yet implemented for command: {}".format(self.get_node(node_id=node_id).cmd_invocation_with_io_vars.cmd_name)) def apply_round_robin_parallelization_to_node( self, node_id, parallelizer, fileIdGen, fan_out, r_split_batch_size @@ -849,11 +850,11 @@ def apply_round_robin_parallelization_to_node( # currently, this cannot be done since splitter etc. would be added... aggregator_spec = parallelizer.get_aggregator_spec() if aggregator_spec.is_aggregator_spec_adj_lines_merge(): - raise Exception("adj_lines_merge not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_merge not yet implemented in PaSh") elif aggregator_spec.is_aggregator_spec_adj_lines_seq(): - raise Exception("adj_lines_seq not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_seq not yet implemented in PaSh") elif aggregator_spec.is_aggregator_spec_adj_lines_func(): - raise Exception("adj_lines_func not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_func not yet implemented in PaSh") # END of what to move node = self.get_node(node_id) @@ -1192,7 +1193,7 @@ def introduce_aggregators_for_consec_chunks( fileIdGen, ) else: - raise Exception("aggregator kind not yet implemented") + raise UnparallelizableError("aggregator kind not yet implemented for command: {}".format(original_cmd_invocation_with_io_vars.cmd_name)) else: # we got auxiliary information assert parallelizer.core_aggregator_spec.is_aggregator_spec_custom_2_ary() map_in_aggregator_ids = in_aggregator_ids diff --git a/compiler/pash_compiler.py b/compiler/pash_compiler.py index c4fc7282e..a949457ba 100644 --- a/compiler/pash_compiler.py +++ b/compiler/pash_compiler.py @@ -4,6 +4,7 @@ from datetime import datetime from sh_expand import env_vars_util +from sh_expand.expand import ExpansionError import config from ir import * @@ -11,6 +12,7 @@ from ir_to_ast import to_shell from pash_graphviz import maybe_generate_graphviz from util import * +from custom_error import * from definitions.ir.aggregator_node import * @@ -92,9 +94,17 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config): ret = compile_optimize_output_script( ir_filename, compiled_script_file, args, compiler_config ) + except ExpansionError as e: + log("WARNING: Exception caught because some region(s) are not expandable and therefore unparallelizable:", e) + except UnparallelizableError as e: + log("WARNING: Exception caught because some region(s) are unparallelizable:", e) + # log(traceback.format_exc()) # uncomment for exact trace report (PaSh user should see informative messages for unparellizable regions) + except (AdjLineNotImplementedError, NotImplementedError) as e: + log("WARNING: Exception caught because some part is not implemented:", e) + log(traceback.format_exc()) except Exception as e: log("WARNING: Exception caught:", e) - # traceback.print_exc() + log(traceback.format_exc()) return ret @@ -142,7 +152,7 @@ def compile_optimize_output_script( ret = optimized_ast_or_ir else: - raise Exception("Script failed to compile!") + raise UnparallelizableError("Script failed to compile!") return ret