# Copyright (C) 2011-2012 Johan Hake
#
# This file is part of Gotran.
#
# Gotran is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Gotran is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Gotran. If not, see <http://www.gnu.org/licenses/>.
import re
from future.standard_library import install_aliases
install_aliases()
import urllib.request, urllib.parse, urllib.error
from xml.etree import ElementTree
from functools import cmp_to_key
from collections import OrderedDict, deque, defaultdict
from gotran.common import info, warning, error, check_arg, begin_log, end_log
from gotran.model.odeobjects import cmp
from modelparameters.codegeneration import _all_keywords
from modelparameters.parameterdict import *
# Local imports
from gotran.common.options import parameters
from gotran.input.mathml import MathMLBaseParser
__all__ = ["cellml2ode", "CellMLParser"]
si_unit_map = {"ampere":"A", "becquerel":"Bq", "candela":"cd", "celsius":"gradC",
"coulomb":"C","dimensionless":"1", "farad":"F", "gram":"g",
"gray":"Gy", "henry":"H", "hertz":"Hz", "joule":"J", "katal":"kat",
"kelvin":"K", "kilogram":"kg", "liter":"l", "litre":"l","molar":"M",
"lumen":"lm", "lux":"lx", "meter":"m", "metre":"m", "mole":"mole",
"newton":"N", "ohm":"Omega", "pascal":"Pa", "radian":"rad",
"second":"s", "siemens":"S", "sievert":"Sv", "steradian":"sr",
"tesla":"T", "volt":"V", "watt":"W", "weber":"Wb", "dimensionless":"1"}
prefix_map = {"deca":"da", "hecto":"h", "kilo":"k", "mega":"M", "giga":"G",
"tera":"T", "peta":"P", "exa":"E", "zetta":"Z", "yotta":"Y",
"deci":"d", "centi":"c", "milli":"m", "micro":"u", "nano":"n",
"pico":"p", "femto":"f", "atto":"a", "zepto":"z", "yocto":"y",
None:"", "-3":"m"}
ui = "UNINITIALIZED"
class Equation(object):
"""
Class for holding information about an Equation
"""
def __init__(self, name, expr, used_variables):
self.name = name
self.expr = expr
self.used_variables = used_variables
self.dependent_equations = []
self.component = None
def check_dependencies(self, equation):
"""
Check Equation dependencies
"""
assert(isinstance(equation, Equation))
if equation.name in self.used_variables:
self.dependent_equations.append(equation)
def __str__(self):
return self.name
def __repr__(self):
return "Equation({0} = {1})".format(self.name, "".join(self.expr))
def __eq__(self, other):
if not isinstance(other, Equation):
return False
return other.name == self.name and other.component == self.component
def __hash__(self):
return hash(self.name+self.component.name)
class Component(object):
"""
Class for holding information about a CellML Component
"""
def __init__(self, name, variables, equations, state_variables=None):
self.name = name
self.variable_info = {}
self.state_variables = OrderedDict((state, variables.pop(state, None))\
for state in state_variables)
for state, info in list(self.state_variables.items()):
self.variable_info[state] = info
self.variable_info["type"] = "state_variable"
self.parameters = OrderedDict((name, info) for name, info in \
list(variables.items()) if info["init"] is not None)
for param, info in list(self.parameters.items()):
self.variable_info[param] = info
self.variable_info["type"] = "parameter"
self.derivatives = state_variables
self.parent = None
self.children = []
self.dependencies = OrderedDict()
self.used_in = OrderedDict()
# Store equations
self.sort_and_store_equations(equations)
# Extract info
dummy = dict(init=None, unit="1", private=True)
for eq in self.equations:
self.variable_info[eq.name] = variables.get(eq.name, dummy)
self.variable_info[eq.name]["type"] = "equation"
# Get used variables
self.used_variables = set()
for equation in self.equations:
self.used_variables.update(equation.used_variables)
# Remove dependencies on names defined by component
self.used_variables.difference_update(\
equation.name for equation in self.equations)
self.used_variables.difference_update(\
name for name in self.parameters)
self.used_variables.difference_update(\
name for name in self.state_variables)
def sort_and_store_equations(self, equations):
# Check if reserved name for state derivativeis is used as equation
# name
derivative_names = ["d{0}_dt".format(der) for der in self.derivatives]
removal =[]
for eq in equations[:]:
if eq.name in derivative_names and len(eq.expr) == 1 and \
eq.expr[0] == eq.name:
equations.remove(eq)
# Check internal dependencies
for eq0 in equations:
eq0.dependent_equations = []
# Store component
eq0.component = self
for eq1 in equations:
if eq0 == eq1:
continue
eq0.check_dependencies(eq1)
sorted_equations = []
while equations:
equation = equations.pop(0)
if any(dep in equations for dep in equation.dependent_equations):
equations.append(equation)
else:
sorted_equations.append(equation)
# Store the sorted equations
self.equations = sorted_equations
def __hash__(self):
return hash(self.name)
def __str__(self):
return self.name + "<{0}>".format(len(self.state_variables))
def __repr__(self):
return "Component<{0}, {1}>".format(self.name, \
len(self.state_variables))
def __eq__(self, other):
if not isinstance(other, Component):
return False
return other.name == self.name
def check_dependencies(self, component):
"""
Check components dependencies
"""
assert(isinstance(component, Component))
if any(equation.name in self.used_variables \
for equation in component.equations):
dep_equations = [equation for equation in component.equations \
if equation.name in self.used_variables]
# Register mutual dependencies
self.dependencies[component] = dep_equations
component.used_in[self] = dep_equations
# Add logics for dependencies of all Equations.
for other_equation in component.equations:
for equation in self.equations:
if other_equation.name in equation.used_variables:
equation.dependent_equations.append(other_equation)
def change_parameter_name(self, oldname, newname=None):
"""
Change the name of a parameter
Assume the name is only used locally within this component
"""
assert(oldname in self.parameters)
# If no newname is give we pick one based on the component name
if newname is None:
newname = oldname + "_" + self.name.split("_")[0]
warning("Locally change parameter name '{0}' to '{1}' in "\
"component '{2}'.".format(oldname, newname, self.name))
# Update parameters
self.parameters = OrderedDict((newname if name == oldname else \
name, value) for name, value in \
list(self.parameters.items()))
# Update equations
for eqn in self.equations:
while oldname in eqn.expr:
eqn.expr[eqn.expr.index(oldname)] = newname
return newname
def change_state_name(self, oldname, newname=None):
"""
Change the name of a state
Assume the name is only used locally within this component
"""
assert(oldname in self.state_variables)
# If no newname is give we pick one based on the component name
if newname is None:
newname = oldname + "_" + self.name.split("_")[0]
warning("Locally change state name '{0}' to '{1}' in component "\
"'{2}'.".format(oldname, newname, self.name))
# Update parameters
self.state_variables = OrderedDict((newname if name == oldname \
else name, value) for name, value in \
list(self.state_variables.items()))
oldder = self.derivatives[oldname]
newder = oldder.replace(oldname, newname)
self.derivatives = OrderedDict((newname if name == oldname else name, \
newder if value == oldder else value) \
for name, value in \
list(self.derivatives.items()))
# Update equations
for eqn in self.equations:
while oldname in eqn.expr:
eqn.expr[eqn.expr.index(oldname)] = newname
while oldder in eqn.expr:
eqn.expr[eqn.expr.index(oldder)] = newder
if oldder == eqn.name:
eqn.name = newder
# Update derivative equation
old_eq_name = "d{0}_dt".format(oldname)
new_eq_name = "d{0}_dt".format(newname)
self.variable_info[new_eq_name] = self.variable_info.pop(\
old_eq_name, dict(init=None, unit="1", private=True, type="equation"))
return newname
def change_equation_name(self, oldname, newname=None):
assert(oldname in self.variable_info and \
self.variable_info[oldname].get("type")=="equation")
# If no newname is give we pick one based on the component name
if newname is None:
newname = oldname + "_" + self.name.split("_")[0]
warning("Locally change equation name '{0}' to '{1}' in "\
"component '{2}'.".format(oldname, newname, self.name))
# Go through all equations and change the name used locally
for ind, eqn in enumerate(self.equations[:]):
while oldname in eqn.expr:
eqn.expr[eqn.expr.index(oldname)] = newname
# Update the actuall equation
if eqn.name == oldname:
eqn.name = newname
self.equations[ind] = eqn
# Update meta info
self.variable_info[newname] = self.variable_info.pop(oldname)
return newname
def change_variable_name(self, oldname, newname=None):
if oldname not in self.variable_info:
error("Cannot change variable name. {0} is not a variable "\
"in component {1}".format(oldname, self.name))
return
vartype = self.variable_info[oldname]["type"]
assert(vartype in ["state_variable", "parameter", "equation"])
if vartype == "state_variable":
return self.change_state_name(oldname, newname)
if vartype == "parameter":
return self.change_parameter_name(oldname, newname)
return self.change_equation_name(oldname, newname)
[docs]class CellMLParser(object):
"""
This module parses a CellML XML-file and converts it to PyCC code
"""
[docs] @staticmethod
def default_parameters():
# Get the default parameters from the global parameters object
return parameters.input.cellml.copy()
def __init__(self, model_source, params=None, targets=None, ):
"""
Arguments:
----------
model_source: str
Path or url to CellML file
params : dict
A dict with parameters for the
targets : list, dict (optional)
Components of the model to parse
"""
targets = targets or []
params = params or {}
check_arg(model_source, str)
check_arg(targets, (list, dict))
self._params = self.default_parameters()
self._params.update(params)
# Open file or url
try:
with open(model_source, "r") as fp:
self.cellml = ElementTree.parse(fp).getroot()
except IOError:
try:
fp = urllib.request.urlopen(model_source)
self.cellml = ElementTree.parse(fp).getroot()
except:
error("ERROR: Unable to open " + model_source)
self.model_source = model_source
self.name = self.cellml.attrib['name']
begin_log("Parsing CellML model: {0}".format(self.name))
self.mathmlparser = MathMLBaseParser(self._params.use_sympy_integers)
self.cellml_namespace = self.cellml.tag.split("}")[0] + "}"
self.parse_units()
self.components = self.parse_components(targets)
self.documentation = self.parse_documentation()
end_log()
[docs] def parse_documentation(self):
"""
Parse the documentation of the article
"""
namespace = "{http://cellml.org/tmp-documentation}"
article = self.cellml.getiterator(namespace+"article")
try:
article = list(article)[0]
except IndexError:
return ""
articleinfo = list(article.getiterator(namespace+"articleinfo"))[0]
# Get title
if articleinfo and articleinfo.getiterator(namespace+"title"):
title = list(articleinfo.getiterator(namespace+"title"))[0].text
else:
title = ""
# Get model structure comments
for child in article.getchildren():
if child.attrib.get("id") == "sec_structure":
content = []
for par in child.getiterator(namespace+"para"):
# Get lines
splitted_line = deque(("".join(text.strip() \
for text in par.itertext())).\
split(" "))
# Cut them in lines which are not longer than 80 characters
ret_lines = []
while splitted_line:
line_stumps = []
line_length = 0
while splitted_line and (line_length + \
len(splitted_line[0]) < 80):
line_stumps.append(splitted_line.popleft())
line_length += len(line_stumps[-1]) + 1
ret_lines.append(" ".join(line_stumps))
content.extend(ret_lines)
content.append("\n")
# Clean up content
content = ("\n".join(cont.strip() for cont in content)).\
replace(" ", " ").replace(" .", ".").replace(\
" ,", ",")
break
else:
content = ""
if title or content:
return "%s\n\n%s" % (title, content)
return ""
def _gettag(self, node):
"""
Splits off the namespace part from name, and returns the rest, the tag
"""
return "".join(node.tag.split("}")[1:])
[docs] def parse_units(self):
"""
Parse any declared units in the model
"""
collected_units = OrderedDict()
unit_map = si_unit_map.copy()
# Extend unit_map
for cellml_pref, pref in list(prefix_map.items()):
if cellml_pref:
unit_map[cellml_pref+"molar"] = pref+"M"
collected_units[cellml_pref+"molar"] = {pref+"M": (pref+"M", "1")}
parsed_twice = []
all_units = deque(self.get_iterator("units"))
while all_units:
units = all_units.popleft()
unit_name = units.attrib["name"]
if unit_name in unit_map:
continue
collected_parts = OrderedDict()
for unit in units.getchildren():
if unit.attrib.get("multiplier"):
warning("skipping multiplier in unit {0}".format(units.attrib["name"]))
if unit.attrib.get("multiplier"):
warning("skipping multiplier in unit {0}".format(units.attrib["name"]))
cellml_unit = unit.attrib.get("units")
prefix = prefix_map[unit.attrib.get("prefix")]
exponent = unit.attrib.get("exponent", "1")
if cellml_unit in si_unit_map:
abbrev = si_unit_map[cellml_unit]
name = prefix+abbrev
if exponent not in ["0", "1"]:
fullname = name + "**" + exponent
else:
fullname = name
collected_parts[name] = (fullname, exponent)
elif cellml_unit in collected_units:
if prefix:
warning("Skipping prefix of unit '{0}'".format(cellml_unit))
for name, (fullnam, part_exponent) in \
list(collected_units[cellml_unit].items()):
new_exponent = float(part_exponent) * float(exponent)
if new_exponent % 1.0 == 0.0:
new_exponent = str(int(new_exponent))
else:
new_exponent = str(new_exponent)
if new_exponent not in ["0", "1"]:
fullname = name + "**" + new_exponent
else:
fullname = name
collected_parts[name] = (fullname, exponent)
elif units not in parsed_twice:
all_units.append(units)
parsed_twice.append(units)
break
else:
warning("Unknown unit '{0}' in ".format(cellml_unit, units["name"]))
else:
# Try change mole*l**-1 to mM...
#print unit_name, collected_parts
#for name, (fullname, exponent) in collected_parts.items():
# if "mole" in name and "l" in collected_parts and \
# collected_parts["l"][1][0]!=exponent[0]:
# print "FOUND"
collected_units[unit_name] = collected_parts
unit_map[unit_name] = "*".join(fullname for fullname, exp \
in list(collected_parts.values()))
# Store unit map
self.units_map = unit_map
[docs] def get_iterator(self, name, item=None):
"""
Return an element tree iterator
"""
item = item if item is not None else self.cellml
return item.getiterator(self.cellml_namespace+name)
[docs] def check_and_register_component_variables(\
self, comp, collected_states, collected_parameters, collected_equations):
"""
Check if component variables are allready collected
"""
# Check for duplication of states
for name in list(comp.state_variables.keys()):
der_name = "d{0}_dt".format(name)
if name in self._params.change_state_names:
newname = name + "_" + comp.name.split("_")[0]
comp.change_state_name(name, newname)
name = newname
# Check state name vs collected state names
elif name in collected_states:
state_comp = collected_states[name]
begin_log("Same state name: '{0}' is used in component: '{1}' "\
"and '{2}'.".format(name, comp.name, state_comp.name))
for change_comp in [comp, state_comp]:
if change_comp.state_variables[name]["private"] and \
change_comp.variable_info[der_name]["private"]:
new_name = change_comp.change_state_name(name)
if change_comp == state_comp:
collected_states[new_name] = change_comp
break
else:
warning("Could not resolve duplicated state name {0} in "\
"component {1} and {2}. None of them are private "\
"to the components.".format(name, comp.name,
state_comp.name))
end_log()
# Check state name vs collected parameter names
elif name in collected_parameters:
param_comp = collected_parameters[name]
begin_log("State name: '{0}' from component '{1}' is used as "\
"parameter in component '{2}'.".format(\
name, comp.name, param_comp.name))
# If parameter is private we change that
if param_comp.parameters[name]["private"]:
new_name = param_comp.change_parameter_name(name)
collected_parameters.pop(name)
collected_parameters[new_name] = param_comp
# Elseif the state variable is private we change that one
elif comp.state_variables[name]["private"] and \
comp.variable_info[der_name]["private"]:
name = comp.change_state_name(name)
else:
warning("Could not resolve duplicated state and "\
"parameter name {0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name,
param_comp.name))
end_log()
# Check state name vs collected equation names
elif name in collected_equations:
eq_comp = collected_equations[name]
begin_log("State name '{0}' from component '{1}' is used as "\
"parameter in component '{2}'.".format(\
name, comp.name, eq_comp.name))
# If state is private we change it
if comp.state_variables[name]["private"] and \
comp.variable_info[der_name]["private"]:
name = comp.change_state_name(name)
elif eq_comp.variable_info[name]["private"]:
new_name = eq_comp.change_equation_name(name)
collected_equation.pop(name)
collected_equation[new_name] = eq_comp
else:
warning("Could not resolve duplicated state and "\
"equation name {0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name, eq_comp.name))
end_log()
# Register the collected state
collected_states[name] = comp
# Check parameters
for name in list(comp.parameters.keys()):
# Check parameter name vs collected state names
if name in collected_states:
state_comp = collected_states[name]
der_name = "d{0}_dt".format(name)
begin_log("Same parameter and state name: '{0}' is used in "\
"component '{1}' and '{2}'.".format(\
name, comp.name, state_comp.name))
# If parameter is private we change that
if comp.parameters[name]["private"]:
name = comp.change_parameter_name(name)
elif state_comp.state_variables[name]["private"] and \
state_comp.variable_info[der_name]["private"]:
new_name = state_comp.change_state_name(name)
collected_states.pop(name)
collected_states[new_name] = state_comp
else:
warning("Could not resolve duplicated state name {0} in "\
"component {1} and {2}. None of them are private "\
"to the components.".format(name, comp.name,
state_comp.name))
end_log()
# Check state name vs collected parameter names
elif name in collected_parameters:
param_comp = collected_parameters[name]
begin_log("Parameter name '{0}' from component '{1}' is used as "\
"parameter in component '{2}'.".format(\
name, comp.name, param_comp.name))
# If registered parameter is private we change that
if param_comp.parameters[name]["private"]:
new_name = param_comp.change_parameter_name(name)
collected_parameters.pop(name)
collected_parameters[new_name] = param_comp
# if the parameter is private we change that one
elif comp.parameters[name]["private"]:
name = comp.change_parameter_name(name)
else:
warning("Could not resolve duplicated parameter names "\
"{0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name,
param_comp.name))
end_log()
# Check state name vs collected equation names
elif name in collected_equations:
eq_comp = collected_equations[name]
begin_log("Parameter name '{0}' from component '{1}' "\
"is used as parameter in component '{2}'.".format(\
name, comp.name, eq_comp.name))
# If parameter is private we change it
if comp.parameters[name]["private"]:
name = comp.change_parameter_name(name)
elif eq_comp.variable_info[name]["private"]:
new_name = eq_comp.change_equation_name(name)
collected_equation.pop(name)
collected_equation[new_name] = eq_comp
else:
warning("Could not resolve duplicated parameter and "\
"equation names {0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name, eq_comp.name))
end_log()
collected_parameters[name] = comp
# Check equation name
for eq in comp.equations:
name = eq.name
# If the equation is private we do not care if it is duplicated
# elsewhere. Risky...
#if comp.variable_info[name]["private"]:
# collected_equations[name] = comp
# continue
# If the equation will be broadcasted to another name we continue
#print comp.name, name, self.new_variable_connections.get(comp.name, {}).keys()
#if name in self.new_variable_connections.get(comp.name, {}).keys():
# continue
# Check equation name vs collected state names
if name in collected_states:
state_comp = collected_states[name]
der_name = "d{0}_dt".format(name)
begin_log("Same equation and state name '{0}' is used in "\
"component '{1}' and '{2}'.".format(\
name, comp.name, state_comp.name))
# If equation is private we change that
#if comp.variable_info[name]["private"]:
# name = comp.change_equation_name(name)
if state_comp.state_variables[name]["private"] and \
state_comp.variable_info[der_name]["private"]:
new_name = state_comp.change_state_name(name)
collected_states.pop(name)
collected_states[new_name] = state_comp
else:
warning("Could not resolve duplicated state and "\
"equation name {0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name,
state_comp.name))
end_log()
# Check equation name vs collected parameter names
elif name in collected_parameters:
param_comp = collected_parameters[name]
begin_log("Equation name '{0}' from component '{1}' is used as "\
"parameter in component '{2}'.".format(\
name, comp.name, param_comp.name))
# If registered parameter is private we change that
if param_comp.parameters[name]["private"]:
new_name = param_comp.change_parameter_name(name)
collected_parameters.pop(name)
collected_parameters[new_name] = param_comp
# If the parameter is private we change that one
#elif comp.variable_info[name]["private"]:
# name = comp.change_equation_name(name)
else:
warning("Could not resolve duplicated parameter and "\
"equation name {0} in component {1} and {2}. "\
"None of them are private to the "\
"components.".format(name, comp.name,
param_comp.name))
end_log()
# Check equation name vs collected equation names
elif name in collected_equations:
eq_comp = collected_equations[name]
info("Equation name '{0}' from component '{1}' is used as "\
"equation name in component '{2}'.".format(\
name, comp.name, eq_comp.name))
# If equation is private we change it
#if comp.variable_info[name]["private"] and \
# eq_comp.variable_info[name]["private"]:
# warning("Both equations are private. We do not change "\
# "name on either one of them.")
#elif eq_comp.variable_info[name]["private"]:
# new_name = eq_comp.change_equation_name(name)
# collected_equations.pop(name)
# collected_equations[new_name] = eq_comp
#
#else:
# warning("Could not resolve duplicated parameter and "\
# "equation names {0} in component {1} and {2}. "\
# "None of them are private to the "\
# "components.".format(name, comp.name, eq_comp.name))
collected_equations[name] = comp
[docs] def parse_imported_model(self):
"""
Parse any imported models
"""
components = OrderedDict()
# Collect states and parameters to check for duplicates
collected_states = dict()
collected_parameters = dict()
collected_equations = dict()
# Import other models
imports = self.get_iterator("import")
if imports:
info("Parsing imported models:")
for model in imports:
import_comp_names = dict()
for comp in self.get_iterator("component", model):
import_comp_names[comp.attrib["component_ref"]] = \
comp.attrib["name"]
model_parser = CellMLParser(\
model.attrib["{http://www.w3.org/1999/xlink}href"], \
targets=import_comp_names)
for comp in model_parser.components:
components[comp.name] = comp
# Extract states, parameters and equations
for comp in list(components.values()):
self.check_and_register_component_variables(\
comp, collected_states, collected_parameters, \
collected_equations)
return components, collected_states, collected_parameters,\
collected_equations
[docs] def get_parents(self, grouping, element=None):
"""
If group was used in the cellml use it to gather parent information
about the components
"""
# Collect component encapsulation
def get_encapsulation(elements, all_parents, parent=None):
children = {}
for encap in elements:
name = encap.attrib["component"]
all_parents[name] = parent
if encap.getchildren():
nested_children = get_encapsulation(\
encap.getchildren(), all_parents, name)
children[name] = dict(children=nested_children, \
parent=parent)
else:
children[name] = dict(children=None, parent=parent)
return children
encapsulations = dict()
all_parents = dict()
for group in self.get_iterator("group", element):
children = group.getchildren()
if children and children[0].attrib.get("relationship") == \
grouping:
encapsulations = get_encapsulation(children[1:], all_parents)
# If no group information in cellml extract potential parent information
# from component names
if not all_parents:
# Iterate over the components
comp_names = [comp.attrib["name"] for comp in self.get_iterator(\
"component", element)]
for parent_name in comp_names:
for name in comp_names:
if parent_name in name and parent_name != name:
all_parents[name] = parent_name
return encapsulations, all_parents
[docs] def parse_single_component(self, comp, collected_states, \
collected_parameters, collected_equations):
"""
Parse a single component and create a Component object
"""
comp_name = comp.attrib["name"]
# Collect variables and equations
variables = OrderedDict()
equations = []
state_variables = OrderedDict()
#derivatives = []
# Get variables that are used outside the component
variables_used_in_connections = list(self.new_variable_connections.get(\
comp_name, dict()).keys()) + list(self.same_variable_connections.get(\
comp_name, dict()).keys())
# Get variable and initial values
for var in self.get_iterator("variable", comp):
var_name = var.attrib["name"]
if var_name in _all_keywords:
var_name = var_name + "_"
# Check connection and pair it with interface information to
# check if a variable is public or not
public = var_name in variables_used_in_connections and \
(var.attrib.get("public_interface") == "out" or \
var.attrib.get("private_interface") == "out")
# Store variables using initial and unit
variables[var_name] = dict(\
init = var.attrib.get("initial_value"),\
unit = self.units_map[var.attrib.get("units")],\
private = not public,
public_interface = var.attrib.get("public_interface"),
private_interface = var.attrib.get("private_interface"),
)
# Get equations
for math in comp.getiterator(\
"{http://www.w3.org/1998/Math/MathML}math"):
for eq in math.getchildren():
equation_list, state_variable, derivative, \
used_variables = self.mathmlparser.parse(eq)
# Get equation name
eq_name = equation_list[0]
if eq_name in _all_keywords:
equation_list[0] = eq_name + "_"
eq_name = equation_list[0]
# Discard collected equation name from used variables
used_variables.discard(eq_name)
assert(re.findall("(\w+)", eq_name)[0]==eq_name)
assert(equation_list[1] == self.mathmlparser["eq"])
equations.append(Equation(eq_name, equation_list[2:],
used_variables))
# Do not register state variables twice
if state_variable is not None and \
state_variable not in state_variables:
state_variables[state_variable] = derivative
# Create Component
comp = Component(comp_name, variables, equations, state_variables)
# Collect and check variables
self.check_and_register_component_variables(\
comp, collected_states, collected_parameters, \
collected_equations)
return comp
[docs] def sort_components(self, components, sorted_once=False):
if not sorted_once:
begin_log("Sorting components with respect to dependencies")
else:
begin_log("Sorting components with respect to dependencies second time")
# Check internal dependencies
for comp0 in components:
for comp1 in components:
if comp0 == comp1:
continue
comp0.check_dependencies(comp1)
def simple_sort(components):
components = deque(components)
dependant_components = []
sorted_components = []
while components:
component = components.popleft()
# Chek for circular dependancy
if dependant_components.count(component) > 4:
components.append(component)
break
if any(dep in components for dep in \
component.dependencies):
components.append(component)
dependant_components.append(component)
else:
sorted_components.append(component)
return sorted_components, list(components)
# Initial sorting
sorted_components, circular_components = simple_sort(components)
# If no circular dependencies
if not circular_components:
end_log()
return sorted_components
try:
import networkx as nx
except:
warning("networkx could not be imported. Circular "\
"dependencies between components will not be sorted out.")
return sorted_components + circular_components
# Collect zero and one dependencies
zero_dep_equations = set()
one_dep_equations = set()
equation_map = {}
# Gather zero dependent equations
for comp in circular_components:
for dep_comp, equations in list(comp.dependencies.items()):
for equation in equations:
if not equation.dependent_equations and equation.name \
in comp.used_variables:
zero_dep_equations.add(equation)
equation_map[equation.name] = equation
# Check for one dependency if that is the zero one
one_dep_zero_dep = defaultdict(set)
for comp in circular_components:
for dep_comp, equations in list(comp.dependencies.items()):
for equation in equations:
if len(equation.dependent_equations) == 1 and \
equation.name in comp.used_variables and \
equation.dependent_equations[0] in \
zero_dep_equations:
equation_map[equation.name] = equation
one_dep_equations.add(equation)
one_dep_zero_dep[equation.name].add(\
equation.dependent_equations[0].name)
# Try to eliminate circular dependency
# Extract dependent equation to a new component
# Find ODE component. If not found create one
for comp in components:
if comp.name == self.name:
ode_comp = comp
break
else:
ode_comp = Component(self.name, {}, [], {})
# Valid edges for removal
valid_edges = [eq.name for eq in zero_dep_equations] + \
[eq.name for eq in one_dep_equations]
G = nx.MultiDiGraph()
G.add_nodes_from([comp.name for comp in components])
# Build graph
for comp in components:
[G.add_edge(dep.name, comp.name, key=equation.name)
for dep, equations in list(comp.dependencies.items()) \
for equation in equations]
# collecte edges that breaks cycles
cycle_breakers = []
# Collect data over the best edge to remove
edge_score = defaultdict(lambda : 0)
edge_to_nodes = defaultdict(set)
for cycle in nx.simple_cycles(G):
local_breaker = []
for n0, n1 in zip(cycle[:-1], cycle[1:]):
if all(edge in valid_edges for edge in G[n0][n1]):
local_breaker.append([edge for edge in G[n0][n1]])
for edge in local_breaker[-1]:
edge_to_nodes[edge].add((n0, n1))
cycle_breakers.append(local_breaker)
for local_edges in local_breaker:
for edge in local_edges:
edge_score[edge] += 1
# Sort the edges we should remove by a score given by how many
# cycles it breaks by being removed.
edge_score = sorted((edge_score[edge], edge) for edge in edge_score)
# Collect edges to be removed
edge_removal = set()
cycles_fixed = [False]*len(cycle_breakers)
while not all(cycles_fixed) and edge_score:
score, edge = edge_score.pop()
# Remove this/these edge/edges this iteration
local_removal = [edge]
# If adding a one dep edge we need to also remove its dependent edge
dep_removal = set()
if edge in one_dep_zero_dep:
local_removal.extend(one_dep_zero_dep[edge])
dep_removal.update(one_dep_zero_dep[edge])
for edge_remove in local_removal:
# Iterate over the different cycles
for i, local_breakers in enumerate(cycle_breakers):
# If the cycle is fixed
if cycles_fixed[i]:
continue
# Go through the collected valid breakers and pick the one
# with least edges first
for j, local_edges in enumerate(sorted(\
local_breakers, key=cmp_to_key(lambda o0, o1: cmp(len(o0), len(o1))))):
# If the removed edge is in the local edges
if edge_remove in local_edges:
local_edges.remove(edge_remove)
edge_removal.add(edge_remove)
# Check any dependent edges
for dep_edge in one_dep_zero_dep.get(\
edge_remove, []):
edge_removal.add(dep_edge)
if len(local_edges) == 0:
cycles_fixed[i] = True
break
# If no edge sugested for removal we pick the zero dep equations
if not edge_removal:
if zero_dep_equations:
edge_removal = [eq.name for eq in zero_dep_equations]
elif one_dep_equations:
edge_removal = [eq.name for eq in one_dep_equations]
else:
warning("Could not sort out circular dependencies.")
# Remove the edges from the graph
for edge in edge_removal:
for n0, n1 in edge_to_nodes[edge]:
G.remove_edge(n0, n1, key=edge)
# Move the marked edges (equations) from the relevant components
removed_equations = {}
for edge in edge_removal:
eq = equation_map[edge]
old_comp = eq.component
ode_comp.equations.append(eq)
old_comp.equations.remove(eq)
ode_comp.variable_info[eq.name] = old_comp.variable_info.pop(eq.name)
# Store changed
removed_equations[eq] = old_comp
# Transfer used_in to new component
new_dependent_componets = OrderedDict()
for dep_comp, equations in list(old_comp.used_in.items()):
assert equations
if eq not in equations or dep_comp == ode_comp:
continue
# Remove dependency from old component and add it to the new
if len(equations) == 1:
new_dependent_componets[dep_comp] = \
old_comp.used_in.pop(dep_comp)
else:
new_dependent_componets[dep_comp] = [\
old_comp.used_in[dep_comp].pop(equations.index(eq))]
# Change component dependencies
if old_comp in dep_comp.dependencies and eq in \
dep_comp.dependencies[old_comp]:
if len(dep_comp.dependencies[old_comp]) == 1:
dep_comp.dependencies.pop(old_comp)
else:
dep_comp.dependencies[old_comp].remove(eq)
if ode_comp not in dep_comp.dependencies:
dep_comp.dependencies[ode_comp] = [eq]
else:
dep_comp.dependencies[ode_comp].append(eq)
# Store new component to the extracted equation
eq.component = ode_comp
ode_comp.dependent_componets = new_dependent_componets
if ode_comp not in sorted_components:
sorted_components.insert(0, ode_comp)
# Sort newly added equations
ode_comp.sort_and_store_equations(ode_comp.equations)
# Sort graph components and apply the sortings to the collected
# CellML compoents
sort_again = False
try:
sorted_components = nx.topological_sort(G)
components.sort(lambda n0, n1: cmp(sorted_components.index(n0.name), \
sorted_components.index(n1.name)))
message = "To avoid circular dependency the following equations "\
"has been moved:"
except Exception as e:
warning("Topological sort failed: " + str(e))
message = "In a try to avoid circular dependency the following equations "\
"has been moved:"
sort_again = True
# Insert the ODE component with extracted equations first
components.insert(0, ode_comp)
warning(message)
for eq, old_comp in list(removed_equations.items()):
warning("{0} : from {1} to {2} component".format(\
eq.name, old_comp.name, ode_comp.name))
end_log()
if 0:#sort_again:
from IPython import embed; embed()
exit()
# Try rebuild the graph and make another topological sort
G = nx.MultiDiGraph()
G.add_nodes_from([comp.name for comp in components])
# Build graph
for comp in components:
[G.add_edge(dep.name, comp.name, key=equation.name)
for dep, equations in list(comp.dependencies.items()) \
for equation in equations]
try:
pass
# sorted_components = nx.topological_sort(G)
# components = nx.topological_sort(G)
# components.sort(lambda n0, n1: cmp(list(sorted_components)[n0.name],list(sorted_components)[n1.name]))
# components.sort(lambda n0, n1: cmp(sorted_components.index(n0.name), \
# sorted_components.index(n1.name)))
except Exception as e:
warning("Topological sort failed a second time: " + str(e))
return components
[docs] def parse_components(self, targets):
"""
Build a dictionary containing dictionarys describing each
component of the cellml model
"""
# First parse connections which is used to determine the interface of
# each variable
self.new_variable_connections, self.same_variable_connections = \
self.parse_connections()
# Parse imported components
components, collected_states, collected_parameters, \
collected_equations = self.parse_imported_model()
# Get parent relationship between components
encapsulations, all_parents = self.get_parents(self._params.grouping)
if targets:
# If the parent information was not of type encapsulation
# regather parent information
if self._params.grouping != "encapsulation":
encapsulations, dummy = self.get_parents("encapsulation")
# Add any encapsulated components to the target list
for target, new_target_name in list(targets.items()):
if target in encapsulations:
for child in encapsulations[target]["children"]:
targets[child] = child.replace(\
target, new_target_name)
target_parents = dict()
# Update all_parents
for comp_name, parent_name in list(all_parents.items()):
if parent_name not in targets:
continue
target_parents[targets[comp_name]] = targets[parent_name]
# Iterate over the components
for comp in self.get_iterator("component"):
comp_name = comp.attrib["name"]
# Only parse selected and non-empty components
if (targets and comp_name not in targets) or \
len(comp.getchildren()) == 0:
continue
# If targets provides a name mapping give the component a new name
if targets and isinstance(targets, dict):
new_name = targets[comp_name]
comp.attrib["name"] = new_name
comp_name = new_name
# Store component
components[comp_name] = self.parse_single_component(\
comp, collected_states, collected_parameters, collected_equations)
# Add parent information
all_component_names = list(components.keys())
for name, comp in list(components.items()):
if targets:
parent_name = target_parents.get(name)
else:
parent_name = all_parents.get(name)
if parent_name:
comp.parent = components[parent_name]
# If parent name in child name, reduce child name length
if self._params.strip_parent_name and parent_name in comp.name:
old_name = comp.name
new_name = old_name.replace(parent_name, "").strip("_")
if new_name not in all_component_names:
comp.name = new_name
all_component_names.remove(old_name)
all_component_names.append(new_name)
components[parent_name].children.append(comp)
# If we only extract a sub set of component we do not sort
if targets:
return list(components.values())
# Before dependencies are checked we change names according to
# variable mappings in the original CellML file
if self.new_variable_connections:
begin_log("\nRenaming variables through connections")
for comp, variables in list(self.new_variable_connections.items()):
# Iterate over old and new names
for oldname, newnames in list(variables.items()):
# Check that the direction of the connection is correct
# If no variable info is registered for this comp or its
# children it is the wrong direction
#print
#print comp, oldname
#print components[comp].variable_info.keys()
#for child in components[comp].children:
# print child.name, child.variable_info.keys()
# if oldname in child.variable_info.keys():
# print "HURRAY"
# print child.variable_info[oldname]
if oldname not in components[comp].variable_info and all(\
oldname not in child.variable_info \
for child in components[comp].children):
continue
# Try access var_info
var_info = components[comp].variable_info.get(oldname)
# If not we try components children
if var_info is None:
for child in components[comp].children:
if oldname in child.variable_info:
var_info = child.variable_info[oldname]
break
#print comp, oldname
#print var_info
# If variable is not intended out
if not (var_info.get("public_interface")=="out" or \
var_info.get("private_interface")=="out"):
continue
# Check if the oldname is used in any components
old_name_used = self.same_variable_connections[comp].get(oldname)
# If there are only one newname we change the name of the
# original equation
if len(newnames) == 1:
# If not old name used or old name only used in children and
# we then assume it is private to the component relationship
# parent-child
if not old_name_used or all(components[other_comp] in \
components[comp].children for other_comp in old_name_used):
# Get new name
newname = list(newnames.keys())[0]
# If oldname in component
if components[comp].variable_info.get(oldname) is not None:
components[comp].change_variable_name(oldname, newname)
if old_name_used:
for other_comp in old_name_used:
components[other_comp].change_variable_name(\
oldname, newname)
for child in components[comp].children:
if child.variable_info.get(oldname) is not None:
child.change_variable_name(oldname, newname)
else:
# FIXME: Add equation with name change to component
pass
else:
# FIXME: Add equation with name change to component
pass
if self.new_variable_connections:
end_log()
# Add dependencies and sort the components accordingly
return self.sort_components(list(components.values()))
[docs] def parse_connections(self):
new_variable_names = dict()
same_variable_names = dict()
for con in self.get_iterator("connection"):
con_map = list(self.get_iterator("map_components", con))[0]
comp1 = con_map.attrib["component_1"]
comp2 = con_map.attrib["component_2"]
for var_map in self.get_iterator("map_variables", con):
var1 = var_map.attrib["variable_1"]
var2 = var_map.attrib["variable_2"]
if var1 != var2:
if comp1 not in new_variable_names:
new_variable_names[comp1] = {var1:defaultdict(list)}
elif var1 not in new_variable_names[comp1]:
new_variable_names[comp1][var1] = defaultdict(list)
new_variable_names[comp1][var1][var2].append(comp2)
# Register both directions
if comp2 not in new_variable_names:
new_variable_names[comp2] = {var2:defaultdict(list)}
elif var2 not in new_variable_names[comp2]:
new_variable_names[comp2][var2] = defaultdict(list)
new_variable_names[comp2][var2][var1].append(comp1)
else:
if comp1 not in same_variable_names:
same_variable_names[comp1] = {var1:[comp2]}
elif var1 not in same_variable_names[comp1]:
same_variable_names[comp1][var1] = [comp2]
else:
same_variable_names[comp1][var1].append(comp2)
# Register both directions
if comp2 not in same_variable_names:
same_variable_names[comp2] = {var2:[comp1]}
elif var2 not in same_variable_names[comp2]:
same_variable_names[comp2][var2] = [comp1]
else:
same_variable_names[comp2][var2].append(comp1)
return new_variable_names, same_variable_names
[docs] def to_gotran(self):
"""
Generate a gotran file
"""
gotran_lines = []
for docline in self.documentation.split("\n"):
gotran_lines.append("# " + docline)
if gotran_lines:
gotran_lines.extend([""])
# Add component info
declaration_lines = []
equation_lines = []
def unders_score_replace(comp):
new_name = comp.name.replace("_", " ")
# If only 1 state it might be included in the name
state_name = ""
if len(comp.state_variables) == 1:
state_name = list(comp.state_variables.keys())[0]
if state_name in comp.name:
new_name = state_name.join(part.replace("_", " ") \
for part in comp.name.split(state_name))
# Captitalize first word
single_words = new_name.split(" ")
if len(single_words[0]) > 1 and "_" not in single_words[0]:
single_words[0] = single_words[0][0].upper()+single_words[0][1:]
# If first word is only a character we assume the first two
# words to stick together
elif len(single_words[0]) == 1 and len(single_words)>1 and \
single_words[0] != state_name:
single_words = [single_words[0]+"_"+single_words[1]] + single_words[2:]
return " ".join(single_words)
# Iterate over components and collect stuff
for comp in self.components:
names = deque([unders_score_replace(comp)])
parent = comp.parent
while parent is not None:
names.appendleft(unders_score_replace(parent))
parent = parent.parent
comp_name = ", ".join("\"{0}\"".format(name) for name in names)
# Collect initial state values
if comp.state_variables:
declaration_lines.append("")
declaration_lines.append("states({0},".format(comp_name))
for name, info in list(comp.state_variables.items()):
if info["unit"] != "1":
declaration_lines.append(" {0} = ScalarParam({1}"\
", unit=\"{2}\"),".format(name, float(info["init"]), info["unit"]))
else:
declaration_lines.append(" {0} = {1},".format(\
name, info["init"]))
declaration_lines[-1] = declaration_lines[-1][:-1]+")"
# Collect initial parameters values
if comp.parameters:
declaration_lines.append("")
declaration_lines.append("parameters({0},".format(comp_name))
for name, info in list(comp.parameters.items()):
if info["unit"] != "1":
declaration_lines.append(" {0} = ScalarParam({1}"\
", unit=\"{2}\"),".format(name, float(info["init"]), info["unit"]))
else:
declaration_lines.append(" {0} = {1},".format(\
name, info["init"]))
declaration_lines[-1] = declaration_lines[-1][:-1]+")"
# Collect all intermediate equations
if comp.equations:
equation_lines.append("")
equation_lines.append("expressions({0})".format(\
comp_name))
for eq in comp.equations:
expr = []
for eqi in eq.expr:
if eqi.isdigit():
expr.append(str(float(eqi)))
else:
expr.append(eqi)
eq.expr = expr
equation_lines.extend("{0} = {1}{2}".format(\
eq.name, "".join(eq.expr), " # {0}".format(\
comp.variable_info[eq.name]["unit"]) \
if comp.variable_info[eq.name]["unit"] != "1" else "")\
for eq in comp.equations)
gotran_lines.append("# gotran file generated by cellml2gotran from "\
"{0}".format(self.model_source))
gotran_lines.extend(declaration_lines)
gotran_lines.extend(equation_lines)
gotran_lines.append("")
gotran_lines.append("")
# Return joined lines
return "\n".join(gotran_lines)
# Write file
open("{0}.ode".format(self.name), "w").write()
[docs]def cellml2ode(model_source, **options):
"""
Convert a CellML model into an ode
Arguments
---------
model_source: str
Path or url to CellML file
options : dict
Optional parameters to control cellml parser
"""
check_arg(model_source, str)
from gotran.model.loadmodel import exec_ode
params = CellMLParser.default_parameters()
params.update(options)
cellml = CellMLParser(model_source, params=params)
return exec_ode(cellml.to_gotran(), cellml.name)