Skip to content

Commit

Permalink
Merge pull request #189 from Idein/onnx2nnoir-with-mypy
Browse files Browse the repository at this point in the history
enable mypy check in nnoir_onnx
  • Loading branch information
eguchi1904 authored Sep 29, 2023
2 parents 9ecd45d + 40c9040 commit e89e820
Show file tree
Hide file tree
Showing 94 changed files with 886 additions and 552 deletions.
5 changes: 0 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ jobs:
dir: nnoir
- lint:
dir: nnoir
- run:
name: manual mypy check # workaround https://github.com/pfnet/pysen/issues/32
working_directory: nnoir
command: |
poetry run mypy . --strict --implicit-reexport --ignore-missing-imports
- run:
name: Test nnoir
working_directory: nnoir
Expand Down
6 changes: 3 additions & 3 deletions nnoir-onnx/nnoir_onnx/freeze.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@
from nnoir_onnx import utils


def command_list(args) -> None:
def command_list(args: argparse.Namespace) -> None:
model = onnx.load(args.input)
s = utils.list_dimension_variables(model)
if len(s) != 0:
print(s)


def command_freeze(args) -> None:
def command_freeze(args: argparse.Namespace) -> None:
model = onnx.load(args.input)
fixed_model = utils.freeze_dimension_variables(model, args.fix_dimension)
onnx.save(fixed_model, args.output)


def freeze():
def freeze() -> None:
print("Warning: freeze_onnx is deprecated. Instead use `onnx2nnoir --fix_dimension`.")
parser = argparse.ArgumentParser(description="ONNX Freezer")
subparsers = parser.add_subparsers()
Expand Down
135 changes: 65 additions & 70 deletions nnoir-onnx/nnoir_onnx/onnx.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,48 @@
import re
import tempfile
from itertools import chain
from typing import Any, Dict, List, Optional

import numpy as np
import onnx
import onnxruntime
from nnoir import *
from nnoir import NNOIR, Value
from nnoir.functions import Function
from nnoir_onnx.operators import *
from numpy.typing import NDArray

from .operators.utils import InvalidONNXData, UnknownSizedVariable, UnsupportedONNXOperation
from .operators.utils import InvalidONNXData, Op, UnknownSizedVariable, UnsupportedONNXOperation
from .utils import freeze_dimension_variables, list_dimension_variables


def tensor_to_narray(tensor):
def tensor_to_narray(tensor: onnx.TensorProto) -> NDArray[Any]:
arr = []
storage = onnx.mapping.TENSOR_TYPE_TO_STORAGE_TENSOR_TYPE[tensor.data_type]
storage = onnx.mapping.STORAGE_TENSOR_TYPE_TO_FIELD[storage]
arr = getattr(tensor, storage)
if arr == []:
result = np.frombuffer(tensor.raw_data, dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[tensor.data_type])
result: NDArray[Any] = np.frombuffer(tensor.raw_data, dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[tensor.data_type]) # type: ignore
else:
result = np.array(arr, dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[tensor.data_type])
shape = tensor.dims if tensor.dims != [] else [1]
return result.reshape(*shape)


def narray_to_value_info(name, arr):
def narray_to_value_info(name: str, arr: NDArray[Any]) -> onnx.ValueInfoProto:
return onnx.helper.make_tensor_value_info(name, onnx.mapping.NP_TYPE_TO_TENSOR_TYPE[arr.dtype], arr.shape)


def value_info_to_zero_narray(vi):
def value_info_to_zero_narray(vi: onnx.ValueInfoProto) -> NDArray[Any]:
return np.zeros(
list(map(lambda x: x.dim_value, vi.type.tensor_type.shape.dim)),
dtype=onnx.mapping.TENSOR_TYPE_TO_NP_TYPE[vi.type.tensor_type.elem_type],
)


class ONNX:
def __init__(self, path, graph_name=None, fix_dimension=None):
def __init__(self, path: str, graph_name: Optional[str] = None, fix_dimension: Optional[Dict[str, int]] = None):
self.onnx_path = path
self.model = onnx.load(path)
self.model: onnx.ModelProto = onnx.load(path)
if graph_name is not None:
self.model.graph.name = graph_name
if fix_dimension is not None:
Expand Down Expand Up @@ -120,13 +123,13 @@ def __init__(self, path, graph_name=None, fix_dimension=None):
self.constant_nodes = {n: self.nodes[n] for n in constant_nodes}
self.opset_version = self.model.opset_import[0].version

def _internal_values_info(self, model):
values = list(set([v for n in model.graph.node for v in n.output]))
def _internal_values_info(self, model: onnx.ModelProto) -> List[onnx.ValueInfoProto]:
values: List[str] = list(set([v for n in model.graph.node for v in n.output]))
return [onnx.helper.make_empty_tensor_value_info(v) for v in values]

def _rename_to_c_ident(self):
def _rename_to_c_ident(self) -> None:
m = copy.deepcopy(self.model)
value_names = [i.name for i in m.graph.input] + [v.name for v in self._internal_values_info(m)]
value_names: List[str] = [i.name for i in m.graph.input] + [v.name for v in self._internal_values_info(m)]
# Initializer id is not restricted C identifier syntax rules.
for initializer in self.model.graph.initializer:
rename_step = 0
Expand All @@ -151,19 +154,19 @@ def _rename_to_c_ident(self):
n.input[i] = rename_candidate
initializer.name = rename_candidate

def _try_run(self, constant_nodes):
def _try_run(self, constant_nodes: List[str]) -> Dict[str, NDArray[Any]]:
model = copy.deepcopy(self.model)
while len(model.graph.output) > 0:
model.graph.output.pop(0)
inits = [v.name for v in model.graph.initializer]
input_values = [v for v in model.graph.input if v.name not in inits]
dummy_inputs = {i.name: value_info_to_zero_narray(i) for i in input_values}
outputs = [
input_values: List[onnx.ValueInfoProto] = [v for v in model.graph.input if v.name not in inits]
dummy_inputs: Dict[str, NDArray[Any]] = {i.name: value_info_to_zero_narray(i) for i in input_values}
outputs: List[onnx.ValueInfoProto] = [
*[v for v in model.graph.input],
*self._internal_values_info(model),
]

result = copy.deepcopy(dummy_inputs)
result: Dict[str, NDArray[Any]] = copy.deepcopy(dummy_inputs)
for t in model.graph.initializer:
result[t.name] = tensor_to_narray(t)
with tempfile.NamedTemporaryFile() as f:
Expand Down Expand Up @@ -198,62 +201,64 @@ def _try_run(self, constant_nodes):
for k, v in zip(outputs, sess.run(outputs, {i: dummy_inputs[i] for i in inputs})):
if k not in constant_nodes:
# save memory usage
v = np.broadcast_to(np.zeros(1, dtype=v.dtype), (1,) if v.ndim == 0 else v.shape)
v = np.broadcast_to(np.zeros(1, dtype=v.dtype), (1,) if v.ndim == 0 else v.shape) # type: ignore
result[k] = v
dummy_inputs[k] = v
model.graph.input.append(narray_to_value_info(k, v))

return result

def _find(self, p, xs, default=None):
def _find(self, p, xs, default=None): # type: ignore
return next(filter(p, xs), default)

def _find_initializer(self, name):
return self._find(lambda n: name == n.name, self.model.graph.initializer)
def _find_initializer(self, name: str) -> Optional[onnx.TensorProto]:
return self._find(lambda n: name == n.name, self.model.graph.initializer) # type: ignore

def _has_initializer(self, name):
def _has_initializer(self, name: str) -> bool:
return self._find_initializer(name) is not None

def _find_generator(self, name):
return self._find(lambda n: name in n.output, self.model.graph.node)
def _find_generator(self, name: str) -> Optional[onnx.NodeProto]:
return self._find(lambda n: name in n.output, self.model.graph.node) # type: ignore

def _find_input(self, name):
return self._find(lambda n: name == n.name, self.model.graph.input)
def _find_input(self, name: str) -> Optional[onnx.ValueInfoProto]:
return self._find(lambda n: name == n.name, self.model.graph.input) # type: ignore

def _has_input(self, name):
def _has_input(self, name: str) -> bool:
return self._find_input(name) is not None

def to_NNOIR(self):
inputs = list(map(lambda x: x.name, self.sess.get_inputs()))
outputs = list(map(lambda x: x.name, self.sess.get_outputs()))
def to_NNOIR(self) -> NNOIR:

inputs: List[str] = [x.name for x in self.sess.get_inputs()]
outputs: List[str] = [x.name for x in self.sess.get_outputs()]
try:
functions = self._to_NNOIR_functions()
except UnsupportedONNXOperation as e:
self._dump_dot()
raise e
nodes = [Value(n, self.nodes[n]) for n in set(chain.from_iterable(map(lambda x: x.inputs + x.outputs, functions)))]

# rename to C ident (some frameworks don't satisfy the onnx spec.)
renaming_table = {n.name: f"v{i}".encode("utf-8") for i, n in enumerate(nodes)}
# FIXME: name of nnoir.Value should be bytes. (Throughout the onnx_nnoir source code, there are many mismatches between byte and str types.)
nodes = [Value(n, self.nodes[n]) for n in set(chain.from_iterable(map(lambda x: x.inputs + x.outputs, functions)))] # type: ignore

renaming_table: Dict[str, bytes] = {n.name: f"v{i}".encode("utf-8") for i, n in enumerate(nodes)} # type: ignore

def rename(x):
def rename(x: str) -> bytes:
try:
return renaming_table[x]
except Exception as e:
raise RuntimeError(f"not found key {x} in renaming_table")

inputs = list(map(rename, inputs))
outputs = list(map(rename, outputs))
inputs: List[bytes] = list(map(rename, inputs)) # type: ignore
outputs: List[bytes] = list(map(rename, outputs)) # type: ignore

def rename_function(e):
e.inputs = list(map(rename, e.inputs))
e.outputs = list(map(rename, e.outputs))
def rename_function(e: Function) -> Function:
e.inputs = list(map(rename, e.inputs)) # type: ignore
e.outputs = list(map(rename, e.outputs)) # type: ignore
return e

functions = list(map(rename_function, functions))

def rename_node(n):
n.name = rename(n.name)
def rename_node(n: Value) -> Value:
n.name = rename(n.name) # type: ignore
return n

nodes = list(map(rename_node, nodes))
Expand All @@ -262,13 +267,13 @@ def rename_node(n):
self.model.graph.name.encode("utf-8"),
self.model.producer_name,
self.model.producer_version,
inputs,
outputs,
inputs, # type: ignore
outputs, # type: ignore
nodes,
functions,
)

def _eval_nodes(self, nodes):
def _eval_nodes(self, nodes: List[str]) -> Dict[str, Any]:
m = copy.deepcopy(self.model)
for n in m.graph.output:
m.graph.output.remove(n)
Expand All @@ -286,7 +291,7 @@ def _eval_nodes(self, nodes):
result = []
return dict(zip(output_names, result))

def test(self):
def test(self) -> None:
with tempfile.NamedTemporaryFile() as tmpf:
m = copy.deepcopy(self.model)
for n in m.graph.output:
Expand All @@ -302,14 +307,14 @@ def test(self):
outputs = [x.name for x in sess.get_inputs()]
results = sess.run(outputs, inputs)

def op_for_node(self, node):
def op_for_node(self, node: onnx.NodeProto) -> Op:
op_name = f"Op{node.op_type}"
if op_name in globals():
return globals()[op_name](node, self.opset_version)
return globals()[op_name](node, self.opset_version) # type: ignore
else:
raise UnsupportedONNXOperation(node, f"converting from {node.op_type} is undefined")

def _to_NNOIR_functions(self):
def _to_NNOIR_functions(self) -> List[Function]:
outputs = list(map(lambda x: x.name, self.sess.get_outputs()))
visited = []
known_generator = []
Expand All @@ -324,27 +329,30 @@ def _to_NNOIR_functions(self):
continue
if generator is not None:
function = self.op_for_node(generator).to_function(self.nodes, self.constant_nodes)

inputs = list(chain.from_iterable(map(lambda x: x.inputs, function)))
outputs += inputs
functions += function
known_generator.append(generator)
initializer = self._find_initializer(o)
if initializer is not None:
raise UnsupportedONNXOperation(initializer, "converting from Constant is undefined")

return functions

def _list_constant_nodes(self):
outputs = list(map(lambda x: x.name, self.sess.get_outputs()))
def _list_constant_nodes(self) -> List[str]:
outputs: List[str] = [x.name for x in self.sess.get_outputs()]

def dfs(visited, nodes, result):
def dfs(visited: List[str], nodes: List[str], result: List[str]) -> None:
for n in nodes:
if self._has_initializer(n):
result.append(n)
elif self._has_input(n):
pass
else:
generator = self._find_generator(n)
if generator is None:
raise InvalidONNXData(f"generator {n} not found")

if generator.op_type == "Shape": # In nnoir, array shape is known information.
result.append(n)
next_nodes = []
Expand All @@ -360,11 +368,11 @@ def dfs(visited, nodes, result):
result.append(o)
visited.append(n)

result = []
result: List[str] = []
dfs([], outputs, result)
return result

def _dot_box_color(self, node):
def _dot_box_color(self, node: onnx.NodeProto) -> str:
if not all([o in self.constant_nodes for o in node.output]):
try:
_ = self.op_for_node(node).to_function(self.nodes, self.constant_nodes)
Expand All @@ -376,14 +384,14 @@ def _dot_box_color(self, node):
else:
return "white"

def _dump_dot(self):
def _dump_dot(self) -> None:
dot_path = f"{self.onnx_path}.dot"
ln = "\l"

value_name_table = NameTable("val")
function_name_table = NameTable("fun")

def is_used(name):
def is_used(name: str) -> bool:
for n in self.model.graph.node:
if name in n.input:
return True
Expand Down Expand Up @@ -446,22 +454,9 @@ def is_used(name):
)


def to_dummy_input(x):
if hasattr(x.type, "tensor_type"):
if x.type.tensor_type.elem_type == onnx.TensorProto.FLOAT:
return np.zeros(
tuple(map(lambda d: d.dim_value, x.type.tensor_type.shape.dim)),
dtype=np.float32,
)
else:
raise "unsupported"
else:
raise "unsupported"


class NameTable:
def __init__(self, prefix: str) -> None:
self.tbl = dict()
self.tbl: Dict[str, str] = dict()
self.prefix = prefix

def __getitem__(self, key: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion nnoir-onnx/nnoir_onnx/onnx2nnoir.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from nnoir_onnx import ONNX, utils


def main():
def main() -> None:
parser = argparse.ArgumentParser(description="ONNX to NNOIR Converter")
parser.add_argument(
"-o", "--output", dest="output", type=str, required=True, metavar="NNOIR", help="output(NNOIR) file path"
Expand Down
6 changes: 6 additions & 0 deletions nnoir-onnx/nnoir_onnx/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
from typing import Any, Dict, List, Optional, Tuple

import onnx
from nnoir.functions import Function
from numpy.typing import NDArray

from .add import OpAdd
from .average_pool import OpAveragePool
from .batch_normalization import OpBatchNormalization
Expand Down
Loading

0 comments on commit e89e820

Please sign in to comment.