Source code for gotran.codegeneration.compilemodule

# Copyright (C) 2012 Johan Hake
#
# This file is part of Gotran.
#
# Gotran is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Gotran is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Gotran. If not, see <http://www.gnu.org/licenses/>.

import sys
import os
import re
import numpy
import instant
import hashlib
import types

import gotran
from gotran.common import check_arg, check_kwarg, push_log_level, \
     pop_log_level, info, INFO, value_error
from gotran.model.ode import ODE
from gotran.model.loadmodel import load_ode
from gotran.common.options import parameters
from gotran.codegeneration.codegenerators import PythonCodeGenerator, \
     CCodeGenerator, class_name, DOLFINCodeGenerator
from gotran.codegeneration.algorithmcomponents import rhs_expressions, \
     monitored_expressions, jacobian_expressions

# Set log level of instant
instant.set_log_level("WARNING")

__all__ = ["compile_module"]

additional_declarations_ = r"""
%init%{{
import_array();
%}}

%include <exception.i>
%feature("autodoc", "1");

// Typemaps
%typemap(in) (double* {rhs_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles "
                  "expected. Make sure the numpy array is contiguous, "
                  "and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_states} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
                                  "{num_states}, for the {rhs_name} argument.");

  $1 = (double *)PyArray_DATA(xa);
}}

%typemap(in) (const double* {states_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles expected."
           " Make sure the numpy array is contiguous, and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_states} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
                              "{num_states}, for the {states_name} argument.");

  $1 = (double *)PyArray_DATA(xa);
}}

%typemap(in) (double* {states_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles expected."
           " Make sure the numpy array is contiguous, and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_states} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
                              "{num_states}, for the {states_name} argument.");

  $1 = (double *)PyArray_DATA(xa);
}}

%typemap(in) (double* {parameters_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles expected."
           " Make sure the numpy array is contiguous, and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_parameters} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
             "{num_parameters}, for the {parameters_name} argument.");

  $1 = (double *)PyArray_DATA(xa);

}}

// The typecheck
%typecheck(SWIG_TYPECHECK_DOUBLE_ARRAY) double *
{{
    $1 = PyArray_Check($input) ? 1 : 0;
}}

%typecheck(SWIG_TYPECHECK_DOUBLE_ARRAY) const double *
{{
    $1 = PyArray_Check($input) ? 1 : 0;
}}

%pythoncode%{{
def {rhs_function_name}({args},{rhs_name} = None):
    '''
    Evaluates the right hand side of the model

    Arguments
    ---------
{args_doc}
    {rhs_name} : np.ndarray (optional)
        The computed result
    '''
    import numpy as np
    if {rhs_name} is None:
        {rhs_name} = np.zeros_like({states_name})

    _{rhs_function_name}({args}, {rhs_name})
    return {rhs_name}

{python_code}
%}}

%rename (_{rhs_function_name}) {rhs_function_name};

{jacobian_declaration}

{monitor_declaration}
"""

jacobian_declaration_template_ = """
// Typemaps
%typemap(in) (double* {jac_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles "
                  "expected. Make sure the numpy array is contiguous, "
                  "and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_states}*{num_states} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
                                  "{num_states}*{num_states}, for the {jac_name} argument.");

  $1 = (double *)PyArray_DATA(xa);
}}

%pythoncode%{{
def {jacobian_function_name}({args}, {jac_name}=None):
    '''
    Evaluates the jacobian of the model

    Arguments
    ---------
{args_doc}
    {jac_name} : np.ndarray (optional)
        The computed result
    '''
    import numpy as np
    if {jac_name} is None:
        {jac_name} = np.zeros({num_states}*{num_states}, dtype=np.float_)
    elif not isinstance({jac_name}, np.ndarray):
        raise TypeError(\"expected a NumPy array.\")
    elif len({jac_name}.shape) != 2 or {jac_name}.shape[0] != {jac_name}.shape[1] or {jac_name}.shape[0] != {num_states}:
        raise ValueError(\"expected a square shaped matrix with size ({num_states}, {num_states})\")
    else:
        # Flatten Matrix
        {jac_name}.shape = ({num_states}*{num_states},)

    _{jacobian_function_name}({args}, {jac_name})
    {jac_name}.shape = ({num_states},{num_states})
    return {jac_name}
%}}

%rename (_{jacobian_function_name}) {jacobian_function_name};
"""

monitor_declaration_template = """
%typemap(in) (double* {monitored_name})
{{
  // Check type
  if (!PyArray_Check($input))
    SWIG_exception(SWIG_TypeError, "Numpy array expected");

  // Get PyArrayObject
  PyArrayObject *xa = (PyArrayObject *)$input;

  // Check data type
  if (!(PyArray_ISCONTIGUOUS(xa) && PyArray_TYPE(xa) == NPY_DOUBLE))
    SWIG_exception(SWIG_TypeError, "Contigous numpy array of doubles expected."
           " Make sure the numpy array is contiguous, and uses dtype=float_.");

  // Check size of passed array
  if ( PyArray_SIZE(xa) != {num_monitored} )
    SWIG_exception(SWIG_ValueError, "Expected a numpy array of size: "
             "{num_monitored}, for the {monitored_name} argument.");

  $1 = (double *)PyArray_DATA(xa);

}}

%pythoncode%{{
def {monitored_function_name}({args}, {monitored_name}=None):
    '''
    Evaluates any monitored intermediates of the model

    Arguments
    ---------
{args_doc}
    {monitored_name} : np.ndarray (optional)
        The computed result
    '''
    import numpy as np
    if {monitored_name} is None:
        {monitored_name} = np.zeros({num_monitored}, dtype=np.float_)
    elif not isinstance({monitored_name}, np.ndarray):
        raise TypeError(\"expected a NumPy array.\")
    elif len({monitored_name}) != {num_monitored}:
        raise ValueError(\"expected a numpy array of size: {num_monitored}\")

    _{monitored_function_name}({args}, {monitored_name})
    return {monitored_name}
%}}

%rename (_{monitored_function_name}) {monitored_function_name};
"""

[docs]def compile_module(ode, language="C", monitored=None, generation_params=None, additional_declarations=None, jacobian_declaration_template=None): """ JIT compile an ode Arguments --------- ode : gotran.ODE, str The gotran ode language : str (optional) The language of the generated code Defaults : 'C' ['C', 'Python'] monitored : list A list of names of intermediates of the ODE. Code for monitoring the intermediates will be generated. generation_params : dict Parameters controling the code generation """ monitored = monitored or [] generation_params = generation_params or {} check_arg(ode, (ODE, str)) if isinstance(ode, str): ode = load_ode(ode) check_kwarg(language, "language", str) language = language.capitalize() valid_languages = ["C", "Python", 'Dolfin'] if language not in valid_languages: value_error("Expected one of {0} for the language kwarg.".format(\ ", ".join("'{0}'".format(lang) for lang in valid_languages))) params = parameters.generation.copy() params.update(generation_params) if language == "C": return compile_extension_module(ode, monitored, params, additional_declarations, jacobian_declaration_template) # Create unique module name for this application run modulename = "gotran_{0}_module_{1}_{2}".format(\ language.lower(), class_name(ode.name), hashlib.sha1(str(\ ode.signature() + str(monitored) + repr(params) + \ gotran.__version__ + instant.__version__).encode('utf-8')).hexdigest()) # Check cache python_module = instant.import_module(modulename) if python_module: return getattr(python_module, class_name(ode.name))() # No module in cache generate python version if language == 'Dolfin': pgen = DOLFINCodeGenerator(params) else: pgen = PythonCodeGenerator(params) # Generate class code, execute it and collect namespace code = "from __future__ import division\nimport numpy as np\nimport math" + pgen.class_code(ode, monitored=monitored) # Make a temporary module path for compilation module_path = os.path.join(instant.get_temp_dir(), modulename) instant.instant_assert(not os.path.exists(module_path), "Not expecting module_path to exist: '{}'".format(module_path)) instant.makedirs(module_path) original_path = os.getcwd() try: module_path = os.path.abspath(module_path) os.chdir(module_path) instant.write_file("__init__.py", code) module_path = instant.copy_to_cache(\ module_path, instant.get_default_cache_dir(), modulename) finally: # Always get back to original directory. os.chdir(original_path) python_module = instant.import_module(modulename) return getattr(python_module, class_name(ode.name))()
def compile_extension_module(ode, monitored, params, additional_declarations=None, jacobian_declaration_template=None): """ Compile an extension module, based on the C code from the ode """ # Add function prototype args=[] args_doc=[] for arg in params.code.default_arguments: if arg == "s": args.append("states") args_doc.append(""" {0} : np.ndarray The state values""".format(params.code.states.array_name)) elif arg == "t": args.append("time") args_doc.append(""" time : scalar The present time""") elif arg == "p" and \ params.code.parameters.representation != "numerals": args.append("parameters") args_doc.append(""" {0} : np.ndarray The parameter values""".format(params.code.parameters.array_name)) elif arg == "b" and \ params.code.body.representation != "named": args.append("body") args_doc.append(""" {0} : np.ndarray The body values""".format(params.code.body.array_name)) # Create unique module name for this application run modulename = "gotran_compiled_module_{0}_{1}".format(\ class_name(ode.name), hashlib.sha1(str(\ ode.signature() + str(monitored) + repr(params) + \ gotran.__version__ + instant.__version__).encode('utf-8')).hexdigest()) # Check cache compiled_module = instant.import_module(modulename) if compiled_module: return compiled_module args = ", ".join(args) args_doc = "\n".join(args_doc) # Do not generate any Python functions python_params = params.copy() for name in python_params.functions: if name == "monitored": continue python_params.functions[name].generate = False jacobian_declaration = "" monitor_declaration = "" if params.functions.jacobian.generate: # Flatten jacobian params if not params.code.array.flatten: debug("Generating jacobian C-code, forcing jacobian array "\ "to be flattened.") params.code.array.flatten = True jacobian_declaration_template = jacobian_declaration_template_ if \ jacobian_declaration_template is None \ else jacobian_declaration_template jacobian_declaration = jacobian_declaration_template.format(\ num_states = ode.num_full_states, args=args, args_doc=args_doc, jac_name=params.functions.jacobian.result_name, jacobian_function_name=params.functions.jacobian.function_name, ) if monitored and params.functions.monitored.generate: monitor_declaration = monitor_declaration_template.format(\ num_states = ode.num_full_states, num_monitored = len(monitored), args=args, args_doc=args_doc, monitored_name=params.functions.monitored.result_name, monitored_function_name=params.functions.monitored.function_name, ) pgen = PythonCodeGenerator(python_params) cgen = CCodeGenerator(params) pcode = "\n\n".join(\ list(pgen.code_dict(ode, monitored=monitored).values())) ccode = "\n\n".join(list(cgen.code_dict(ode, monitored=monitored, include_init=False, include_index_map=False).values())) # push_log_level(INFO) info("Calling GOTRAN just-in-time (JIT) compiler, this may take some "\ "time...") sys.stdout.flush() # Configure instant and add additional system headers instant_kwargs = configure_instant() declaration_form = dict(\ num_states = ode.num_full_states, num_parameters = ode.num_parameters, num_monitored = len(monitored), python_code = pcode, args=args, args_doc=args_doc, jacobian_declaration=jacobian_declaration, rhs_name=params.functions.rhs.result_name, rhs_function_name=params.functions.rhs.function_name, states_name=params.code.states.array_name, parameters_name=params.code.parameters.array_name, monitor_declaration=monitor_declaration, ) additional_declarations = additional_declarations_ if \ additional_declarations is None \ else additional_declarations # Compile extension module with instant compiled_module = instant.build_module(\ code = ccode, additional_declarations = additional_declarations.format(\ **declaration_form), signature = modulename, **instant_kwargs) info(" done") # pop_log_level() sys.stdout.flush() return compiled_module def configure_instant(): """ Check system requirements Returns a dict with kwargs that can be passed to instant.build_module. """ instant_kwargs = {} swig_include_dirs = [] instant_kwargs['swig_include_dirs'] = swig_include_dirs instant_kwargs['include_dirs'] = [numpy.get_include()] instant_kwargs['system_headers'] = ["numpy/arrayobject.h", "math.h"]#, "complex.h"] instant_kwargs['swigargs'] =['-O -c++'] instant_kwargs['cppargs'] = ['-O2'] return instant_kwargs