Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
backbone
backbone
Commits
8391e9c4
Commit
8391e9c4
authored
Sep 28, 2016
by
Pekka Savolainen
Browse files
Setup class is now in setup.py file. Cleaned up Setup class unittest file.
parent
33e517cf
Changes
6
Hide whitespace changes
Inline
Side-by-side
titan/models.py
View file @
8391e9c4
...
...
@@ -9,7 +9,7 @@ Note: These models have nothing to do with Balmorel, WILMAR or PSS-E.
import
logging
from
PyQt5.QtCore
import
Qt
,
QVariant
,
QAbstractItemModel
,
\
QAbstractListModel
,
QModelIndex
,
QSortFilterProxyModel
from
tool
import
Setup
from
setup
import
Setup
class
SetupModel
(
QAbstractItemModel
):
...
...
titan/setup.py
0 → 100644
View file @
8391e9c4
"""
Setup class.
:authors: Erkka Rinne <erkka.rinne@vtt.fi>, Pekka Savolainen <pekka.t.savolainen@vtt.fi>
:date: 26.10.2015
"""
import
os
import
shutil
import
fnmatch
import
logging
import
json
from
PyQt5.QtCore
import
pyqtSignal
,
pyqtSlot
from
config
import
INPUT_STORAGE_DIR
,
OUTPUT_STORAGE_DIR
from
metaobject
import
MetaObject
from
helpers
import
create_dir
,
find_in_latest_output_folder
class
Setup
(
MetaObject
):
"""Setup class."""
setup_finished_signal
=
pyqtSignal
()
def
__init__
(
self
,
name
,
description
,
project
,
parent
=
None
):
"""Class constructor.
Args:
name (str): Name of tool setup
description (str): Description
project (SceletonProject): The project this setup belongs to
parent (Setup): Parent setup of this setup
"""
super
().
__init__
(
name
,
description
)
self
.
_parent
=
parent
self
.
_children
=
list
()
self
.
is_root
=
False
if
name
==
'root'
:
self
.
is_root
=
True
self
.
project
=
project
self
.
tool
=
None
self
.
cmdline_args
=
""
self
.
tool_instances
=
[]
self
.
is_ready
=
False
self
.
_setup_process
=
None
self
.
_running_tool
=
None
# Create path to setup input directory (except for root)
if
not
self
.
is_root
:
self
.
input_dir
=
os
.
path
.
join
(
project
.
project_dir
,
INPUT_STORAGE_DIR
,
self
.
short_name
)
self
.
output_dir
=
self
.
input_dir
create_dir
(
self
.
input_dir
)
# If not root, add self to parent's children
if
parent
is
not
None
:
parent
.
_add_child
(
self
)
def
_add_child
(
self
,
child
):
"""Add children to Setup. Do not use outside this class!
Args:
child (Setup): Child Setup to add.
"""
self
.
_children
.
append
(
child
)
def
insert_child
(
self
,
position
,
child
):
"""Used to add children for Setup if it was created without parent.
Args:
position (int): Position to insert child Setup
child (Setup): Setup to insert
Returns:
Boolean variable depending on operation's success
"""
n
=
len
(
self
.
_children
)
if
position
<
0
:
# NOTE: Removed or position > n
logging
.
error
(
"Invalid position. Setup:{2} Position:{0} nr of children:{1}"
.
format
(
position
,
n
,
self
.
name
))
return
False
self
.
_children
.
insert
(
position
,
child
)
child
.
_parent
=
self
return
True
def
remove_child
(
self
,
position
):
"""Remove child Setup.
Args:
position (int): Position of removed child
"""
if
position
<
0
or
position
>
len
(
self
.
_children
):
return
False
child
=
self
.
_children
.
pop
(
position
)
child
.
_parent
=
None
return
True
def
child
(
self
,
row
):
"""Returns child Setup on given row.
Args:
row (int): Row number
"""
try
:
ch
=
self
.
_children
[
row
]
except
IndexError
:
ch
=
None
return
ch
def
child_count
(
self
):
"""Returns number of children."""
return
len
(
self
.
_children
)
def
parent
(
self
):
"""Returns the parent of this Setup."""
return
self
.
_parent
def
children
(
self
):
"""Returns the children of this Setup."""
return
self
.
_children
def
row
(
self
):
"""Returns the row on which this Setup is located."""
if
self
.
_parent
is
not
None
:
return
self
.
_parent
.
_children
.
index
(
self
)
return
0
def
log
(
self
,
tab_level
=-
1
):
"""[OBSOLETE] Returns Setup structure as a string.
Args:
tab_level (int): Tab level
"""
output
=
""
tab_level
+=
1
for
i
in
range
(
tab_level
):
output
+=
"
\t
"
output
+=
"|------"
+
self
.
short_name
+
"
\n
"
for
child
in
self
.
_children
:
output
+=
child
.
log
(
tab_level
)
tab_level
-=
1
output
+=
"
\n
"
return
output
def
attach_tool
(
self
,
tool
,
cmdline_args
=
""
):
"""Attach a tool to this Setup.
Args:
tool (Tool): The tool to be used in this process
cmdline_args (str, optional): Extra command line arguments for this tool
Returns:
True if successful, False if not
"""
# Create path for setup output directory
self
.
output_dir
=
os
.
path
.
join
(
self
.
project
.
project_dir
,
OUTPUT_STORAGE_DIR
,
self
.
short_name
)
if
not
create_dir
(
self
.
output_dir
):
logging
.
error
(
"Could not create output directory '{0}' for Setup '{1}'"
.
format
(
self
.
output_dir
,
self
.
name
))
return
False
if
self
.
tool
is
not
None
:
logging
.
info
(
"Replacing tool '{0}' with tool '{1}' in Setup '{2}'"
.
format
(
self
.
tool
.
name
,
tool
.
name
,
self
.
name
))
self
.
tool
=
tool
self
.
cmdline_args
=
cmdline_args
logging
.
debug
(
"Tool '{0}' with cmdline args '{1}' added to Setup '{2}'"
.
format
(
self
.
tool
.
name
,
self
.
cmdline_args
,
self
.
name
))
return
True
def
detach_tool
(
self
):
"""Remove Tool and command line arguments from Setup.
Used when Tool is changed."""
self
.
tool
=
None
self
.
cmdline_args
=
""
# Make input the same as output again so that finding files works correctly
self
.
output_dir
=
self
.
input_dir
# TODO: Add cleanup of output dir
def
get_input_files
(
self
):
"""Get list of all input files in this Setup."""
return
os
.
listdir
(
self
.
input_dir
)
def
get_output_files
(
self
):
"""Get list of all output files in this Setup.
Lists input files if there is no tool."""
return
os
.
listdir
(
self
.
output_dir
)
def
find_input_file
(
self
,
fname
,
is_ancestor
=
False
):
"""Find a given input file in the setup hierarchy.
Args:
fname (str): Input file name or pattern
is_ancestor (bool): Specifies if looking at an ancestor setup
Returns:
Full path to file
"""
if
self
.
is_root
:
return
None
# Look at own input
if
not
is_ancestor
:
if
fname
in
self
.
get_input_files
():
return
os
.
path
.
join
(
self
.
input_dir
,
fname
)
# Look from ancestor's output
# If ancestor has no Tool, then its output directory is the same as its input directory
elif
not
self
.
tool
:
# Same as self.output_dir == self.input_dir:
if
fname
in
self
.
get_output_files
():
return
os
.
path
.
join
(
self
.
output_dir
,
fname
)
# If ancestor has a Tool, then files must be searched from the most recent output directory
else
:
folders
=
self
.
get_output_files
()
# Find file in the output folder with the most recent timestamp
latest_folder_path
=
find_in_latest_output_folder
(
self
.
short_name
,
self
.
output_dir
,
folders
,
fname
)
if
latest_folder_path
:
return
os
.
path
.
join
(
latest_folder_path
,
fname
)
return
self
.
_parent
.
find_input_file
(
fname
,
is_ancestor
=
True
)
def
find_input_files
(
self
,
pattern
,
is_ancestor
=
False
,
used_filenames
=
None
):
"""Find all input files which match a pattern in the setup hierarchy.
Args:
pattern (str): Input file name or pattern
is_ancestor (bool): Specifies if looking at an ancestor setup
used_filenames (set): Set of filenames already used
Returns:
List of full paths to file
"""
if
self
.
is_root
:
return
list
()
filenames
=
set
()
if
not
used_filenames
else
used_filenames
filepaths
=
list
()
# Look in own input
if
not
is_ancestor
:
src_files
=
self
.
get_input_files
()
src_dir
=
self
.
input_dir
# ...or look from ancestor's output
elif
not
self
.
tool
:
# Setup does not have a tool so input and output folders are the same
src_files
=
self
.
get_output_files
()
src_dir
=
self
.
output_dir
else
:
# Setup has a tool so look from the most recent output folder
folders
=
self
.
get_output_files
()
# Get the most recent output folder file in the output folder with the most recent timestamp
src_dir
=
find_in_latest_output_folder
(
self
.
short_name
,
self
.
output_dir
,
folders
)
if
not
src_dir
:
src_dir
=
''
src_files
=
list
()
else
:
src_files
=
os
.
listdir
(
src_dir
)
logging
.
debug
(
"Looking for files matching pattern '{0}' in source dir: '{1}'"
.
format
(
pattern
,
src_dir
))
# Search for files
new_fnames
=
[
f
for
f
in
fnmatch
.
filter
(
src_files
,
pattern
)
if
f
not
in
filenames
]
filenames
.
update
(
new_fnames
)
filepaths
+=
[
os
.
path
.
join
(
src_dir
,
fname
)
for
fname
in
new_fnames
]
# Recourse to parent
filepaths
+=
self
.
_parent
.
find_input_files
(
pattern
,
is_ancestor
=
True
,
used_filenames
=
filenames
)
return
filepaths
def
save
(
self
,
path
=
''
):
"""[OBSOLETE] Save Setup object to disk.
Args:
path (str): File save path (project dir)
"""
if
path
==
''
:
logging
.
error
(
"No path given"
)
return
the_dict
=
{}
if
self
.
_parent
is
not
None
:
the_dict
[
'parent'
]
=
self
.
_parent
.
short_name
if
self
.
tool
:
the_dict
[
'processes'
]
=
[
self
.
tool
.
short_name
]
the_dict
[
'is_ready'
]
=
self
.
is_ready
jsonfile
=
os
.
path
.
join
(
path
,
'{}.json'
.
format
(
self
.
short_name
))
with
open
(
jsonfile
,
'w'
)
as
fp
:
json
.
dump
(
the_dict
,
fp
,
indent
=
4
)
def
execute
(
self
,
ui
):
"""Execute Setup.
Args:
ui (TitanUI): User interface
"""
logging
.
info
(
"Executing Setup '{}'"
.
format
(
self
.
name
))
if
self
.
is_ready
:
logging
.
debug
(
"Setup '{}' ready. Starting next Setup"
.
format
(
self
.
name
))
self
.
setup_finished_signal
.
emit
()
return
# Get Setup tool and command line arguments
if
not
self
.
tool
:
# No tool in setup
self
.
setup_finished
(
0
)
return
try
:
instance
=
self
.
tool
.
create_instance
(
ui
,
self
.
cmdline_args
,
self
.
output_dir
,
self
.
short_name
)
except
OSError
:
logging
.
error
(
"Tool instance creation failed"
)
ui
.
add_msg_signal
.
emit
(
"Creating a Tool instance failed"
,
2
)
return
# Connect instance_finished_signal to setup_finished() method
instance
.
instance_finished_signal
.
connect
(
self
.
setup_finished
)
self
.
tool_instances
.
append
(
instance
)
if
not
self
.
copy_input
(
self
.
tool
,
ui
,
instance
):
ui
.
add_msg_signal
.
emit
(
"Copying input files for Setup '{0}' failed"
.
format
(
self
.
name
),
2
)
return
instance
.
execute
(
ui
)
# Wait for instance_finished_signal to start setup_finished()
@
pyqtSlot
(
int
)
def
setup_finished
(
self
,
ret
):
"""Executed when tool has finished processing.
Args:
ret (int): Return code from sub-process
Returns:
True if tool was executed successfully, False otherwise
"""
if
ret
==
0
:
# logging.debug("Setup <%s> finished successfully. Setting is_ready to True" % self.name)
self
.
is_ready
=
True
else
:
logging
.
debug
(
"Setup <%s> failed"
%
self
.
name
)
self
.
is_ready
=
False
# Run next Setup
self
.
setup_finished_signal
.
emit
()
def
copy_input
(
self
,
tool
,
ui
,
tool_instance
=
None
):
"""Copy input of a tool in this setup to a tool instance.
Args:
tool (Tool): The tool
ui (TitanUI): Titan UI
tool_instance (ToolInstance): Tool instance. If not
none, execution is done in tool directory.
Returns:
ret (bool): Operation success
"""
if
tool
is
None
:
return
True
if
tool_instance
:
input_dir
=
tool_instance
.
basedir
# Run tool in work directory
else
:
input_dir
=
tool
.
path
# Run tool in /tools/ directory
ui
.
add_msg_signal
.
emit
(
"Copying input files for Tool '{}' to work directory"
.
format
(
tool
.
name
),
0
)
logging
.
info
(
"Copying input files for Tool '{}' to work directory"
.
format
(
tool
.
name
))
# Process required and optional input files
for
filepath
in
tool
.
infiles
|
tool
.
infiles_opt
:
prefix
,
filename
=
os
.
path
.
split
(
filepath
)
dst_dir
=
os
.
path
.
join
(
input_dir
,
prefix
)
# Create the destination directory
try
:
os
.
makedirs
(
dst_dir
,
exist_ok
=
True
)
except
OSError
as
e
:
logging
.
error
(
e
)
ui
.
add_msg_signal
.
emit
(
"Creating directory '{0}' failed. Check permissions."
.
format
(
dst_dir
),
2
)
return
False
if
'*'
in
filename
:
# Deal with wildcards
found_files
=
self
.
find_input_files
(
filename
)
else
:
found_file
=
self
.
find_input_file
(
filename
)
# Required file not found
if
filepath
in
tool
.
infiles
and
not
found_file
:
logging
.
error
(
"Could not find required input file '{}'"
.
format
(
filename
))
ui
.
add_msg_signal
.
emit
(
"Required input file '{0}' not found"
.
format
(
filename
),
2
)
return
False
# Construct a list
found_files
=
[
found_file
]
if
found_file
else
[]
# Do copying
for
src_file
in
found_files
:
try
:
ret
=
shutil
.
copy
(
src_file
,
dst_dir
)
logging
.
debug
(
"File '{}' copied to '{}'"
.
format
(
src_file
,
ret
))
except
OSError
:
logging
.
error
(
"Copying file '{}' to directory '{}' failed"
.
format
(
src_file
,
dst_dir
))
ui
.
add_msg_signal
.
emit
(
"Copying file '{0}' to directory '{1}' failed."
" Check directory permissions."
.
format
(
src_file
,
dst_dir
),
2
)
return
False
logging
.
info
(
"Finished copying input files for Tool '{}'"
.
format
(
tool
.
name
))
ui
.
add_msg_signal
.
emit
(
"Done"
,
1
)
return
True
def
cleanup
(
self
):
"""Remove temporary files of the setup. Removes Tool instance directories."""
# TODO: Check if this is needed
for
t
in
self
.
tool_instances
:
t
.
remove
()
titan/test/test_
s
etup.py
→
titan/test/test_
S
etup.py
View file @
8391e9c4
"""
Unit tests for Setup class [Needs updating).
Unit tests for Setup class.
Note: PyCharm did not like filename test_setup.py so this was renamed to test_Setup.py
@author: Pekka Savolainen <pekka.t.savolainen@vtt.fi>
@date: 26.2.2016
...
...
@@ -10,11 +11,11 @@ import os
import
sys
import
shutil
import
logging
as
log
from
tool
import
Setup
from
setup
import
Setup
import
tool
import
ui_main
from
GAMS
import
GAMSModel
from
config
import
APPLICATION_PATH
,
PROJECT_DIR
,
CONFIGURATION_FILE
,
GENERAL_OPTIONS
from
config
import
APPLICATION_PATH
,
CONFIGURATION_FILE
,
GENERAL_OPTIONS
from
helpers
import
copy_files
,
create_dir
,
create_output_dir_timestamp
from
project
import
SceletonProject
from
models
import
SetupModel
...
...
@@ -24,7 +25,6 @@ from configuration import ConfigurationParser
class
TestSetup
(
TestCase
):
# TODO: Mock create_dir so that directories are not actually created
# TODO: Fix or move execution tests
# add_msg_signal = mock.Mock() # Example of a mock signal
...
...
@@ -328,40 +328,6 @@ class TestSetup(TestCase):
retval
=
s_dummy_a
.
copy_input
(
t_magic_operation
,
gui
,
magic_operation_instance
)
self
.
assertTrue
(
retval
)
# # Add model into Setup. Creates model input directories.
# self.child_dummy_a.attach_tool(self.tool)
# # Add input formats to model
# self.tool.add_input_format(GDX_DATA_FMT)
# self.tool.add_input_format(GAMS_INC_FILE)
# # Copy dummy test files to setup model folders
# self.prepare_test_data()
# # DO THE ACTUAL TEST
# self.child_dummy_a.pop_model()
# gdx_count = 1
# inc_count = 1
# for fmt in self.child_dummy_a.running_model.input_formats:
# # get_input_files gives the names of the input files in a list
# filenames = self.child_dummy_a.get_input_files(self.child_dummy_a.running_model, fmt)
# count = 0
# for file in filenames:
# # log.info(".%s file #%d: %s" % (fmt.extension, count, file))
# count += 1
# if fmt.extension == 'gdx':
# gdx_count = count
# elif fmt.extension == 'inc':
# inc_count = count
# log.debug("GDX count:%d INC_count:%d" % (gdx_count, inc_count))
# # Simply check that 4 gdx files and 2 inc files were found
# self.assertEqual(gdx_count, 4)
# self.assertEqual(inc_count, 2)
# # # This is how you can check variables if a test fails
# # try:
# # self.assertTrue(False)
# # except AssertionError as e:
# # print("Assertion error caught")
# # # Do something with variables
# # raise e
@
mock
.
patch
(
'ui_main.TitanUI'
,
autospec
=
True
)
def
test_execute_base_setup
(
self
,
mock_ui
):
"""Execute Base Setup with no Tool."""
...
...
@@ -373,199 +339,6 @@ class TestSetup(TestCase):
base
.
execute
(
gui
)
self
.
assertTrue
(
base
.
is_ready
)
@
skip
(
"Method 'collect_input_files' does not exist"
)
def
test_collect_input_files
(
self
):
# TODO: Test here that changes in child Setup input files have an effect (change1.inc)
pass
# log.info("Testing collect_input_files()")
# # Delete files from test model input folders except .gitignore
# self.delete_files(self.tool.input_dir)
# # Add model into Setup. Creates model input directories.
# self.child_dummy_a.attach_tool(self.tool)
# # Add input formats to model
# self.tool.add_input_format(GDX_DATA_FMT)
# self.tool.add_input_format(GAMS_INC_FILE)
# # Copy dummy test files to setup model folders
# self.prepare_test_data()
# self.child_dummy_a.pop_model()
# # DO THE ACTUAL TEST
# self.child_dummy_a.collect_input_files()
# # Check that created changes.inc is the same as reference changes.inc
# changes_path = os.path.abspath(os.path.join(self.tool.input_dir, 'changes.inc'))
# changes_ref_path = os.path.abspath(os.path.join(APPLICATION_PATH, 'test', 'resources',
# 'test_input', 'changes_reference.inc'))
# # Check that both have the same number of lines
# with open(changes_path, 'r') as changes:
# n_changes = sum(1 for line in changes)
# with open(changes_ref_path, 'r') as ref:
# n_ref = sum(1 for line in ref)
# self.assertEqual(n_changes, n_ref, "Number of lines in changes.inc and reference file does not match")
# # Check files line by line
# mismatch_found = False
# n = 0
# with open(changes_path, 'r') as changes:
# with open(changes_ref_path, 'r') as ref:
# for line in changes:
# n += 1
# ref_line = ref.readline()
# if line == ref_line:
# # log.debug("\nLine #%d:>\n%smatches line:>\n%s" % (n, line, ref_line))
# pass
# else:
# log.debug("\nMismatch on Line #%d:>\n%sand line:>\n%s" % (n, line, ref_line))
# mismatch_found = True
# self.assertFalse(mismatch_found, "There was a mismatch in changes.inc and reference file")
@
skip
(
"Obsolete"
)
def
test_execute_and_model_finished
(
self
):
"""Executes a setup tree with base setup (no model) and one setup (gams model)
with base parent. Skips start_process() and goes straight to model_finished().
"""
log
.
info
(
"Testing execute() and model_finished()"
)
# Add input formats to model
self
.
tool
.
add_input_format
(
GDX_DATA_FMT
)
self
.
tool
.
add_input_format
(
GAMS_INC_FILE
)
self
.
tool
.
add_output_format
(
GDX_DATA_FMT
)
# Create data parameters
g
=
Dimension
(
'g'
,
'generators'
)
param
=
Dimension
(
'param'
,
'parameters'
)
data
=
DataParameter
(
'data'
,
'generation data'
,
'?'
,
[
g
,
param
])
# Add input data parameters to model
self
.
tool
.
add_input
(
data
)
log
.
disable
(
level
=
log
.
ERROR
)
# Add model into Setup. Creates model input & output directories.
self
.
child_dummy_a
.
attach_tool
(
self
.
tool
,
'MIP=CPLEX'
)
log
.
disable
(
level
=
log
.
NOTSET
)
# Copy dummy test files to setup model folders
self
.
prepare_test_data
()
# DO THE ACTUAL TEST
log
.
debug
(
"STARTING EXECUTE"
)
# effects = [self.side_effect_a, Exception("No more side effects left")]
effects
=
[
self
.
child_dummy_a
.
model_finished
,
Exception
(
"No more effects left"
)]
def
side_effect
(
*
args
,
**
kwargs
):
effect
=
effects
.
pop
(
0
)
if
isinstance
(
effect
,
Exception
):
raise
effect
gams_ret_val
=
0
log
.
debug
(
"Calling %s(%d) for '%s'"
%
(
effect
.
__name__
,
gams_ret_val
,
effect
.
__self__
.
name
))
effect
(
gams_ret_val
)
# Calls popped method with argument
# effect(*args, **kwargs) # Calls the method
# THIS WORKS!!!!!
with
mock
.
patch
(
'tool.qsubprocess.QSubProcess.start_process'
,
side_effect
=
side_effect
)
as
mock_start_process
:
self
.
child_dummy_a
.
execute
()
# log.debug("Mock calls: %s" % mock_start_process.mock_calls)
assert
mock_start_process
is
tool
.
qsubprocess
.
QSubProcess
.
start_process
log
.
debug
(
"EXECUTE FINISHED"
)
# Assert that Base and Setup is_ready is True
self
.
assertTrue
(
self
.
base_setup
.
is_ready
)
self
.
assertTrue
(
self
.
child_dummy_a
.
is_ready
)
@
skip
(
"Obsolete"
)
def
test_execute_with_multiple_models_in_setup
(
self
):
"""Testing execute with base setup (no model) and child setup (2 GAMS models)."""
log
.
info
(
"Testing execute() with multiple models in a setup"
)
# Add input formats to model
self
.
tool
.
add_input_format
(
GDX_DATA_FMT
)
self
.
tool
.
add_input_format
(
GAMS_INC_FILE
)
self
.
tool
.
add_output_format
(
GDX_DATA_FMT
)
# Create data parameters
g
=
Dimension
(
'g'
,
'generators'
)
param
=
Dimension
(
'param'
,
'parameters'
)
data
=
DataParameter
(
'data'
,
'generation data'
,
'?'
,
[
g
,
param
])
# Add input data parameters to model
self
.
tool
.
add_input
(
data
)
log
.
disable
(
level
=
log
.
ERROR
)
# Add two models into Setup child_dummy_a. Creates model input & output directories.
self
.
child_dummy_a
.
attach_tool
(
self
.
tool
,
'MIP=CPLEX'
)
self
.
child_dummy_a
.
attach_tool
(
self
.
tool_two
)
log
.
disable
(
level
=
log
.
NOTSET
)
effects
=
[
self
.
child_dummy_a
.
model_finished
,
self
.
child_dummy_a
.
model_finished
,
Exception
(
"No effects left"
)]
def
side_effect
(
*
args
,
**
kwargs
):
effect
=
effects
.
pop
(
0
)
if
isinstance
(
effect
,
Exception
):
raise
effect
gams_ret_val
=
0
log
.
debug
(
"Calling %s(%d) for '%s'"
%
(
effect
.
__name__
,
gams_ret_val
,
effect
.
__self__
.
name
))
effect
(
gams_ret_val
)
# Calls popped method with argument
with
mock
.
patch
(
'tool.qsubprocess.QSubProcess.start_process'
,
side_effect
=
side_effect
)
as
mock_start_process
:
self
.
child_dummy_a
.
execute
()
assert
mock_start_process
is
tool
.
qsubprocess
.
QSubProcess
.
start_process
self
.
assertTrue
(
self
.
base_setup
.
is_ready
)
self
.
assertTrue
(
self
.
child_dummy_a
.
is_ready
)
@
skip
(
"Obsolete"
)
def
test_execute_with_three_setups_and_two_models
(
self
):
"""Test execute with three setups: base -> setup_dummy_a -> setup_dummy_b.
setup_dummy_a and setup_dummy_b have a GAMS model."""
log
.
debug
(
"Testing execute() with three setups and two models"
)
log
.
disable
(
level
=
log
.
ERROR
)
self
.
child_dummy_a
.
attach_tool
(
self
.
tool
,
'MIP=CPLEX'
)
self
.
child_dummy_b
=
Setup
(
'Setup Dummy B'
,
'Setup with parent A'
,
self
.
project
,
self
.
child_dummy_a
)
self
.
child_dummy_b
.
attach_tool
(
self
.
tool_two
)
log
.
disable
(
level
=
log
.
NOTSET
)
effects
=
[
self
.
child_dummy_a
.
model_finished
,
self
.
child_dummy_b
.
model_finished
,
Exception
(
"No effects left"
)]
def
side_effect
(
*
args
,
**
kwargs
):
effect
=
effects
.
pop
(
0
)
if
isinstance
(
effect
,
Exception
):
raise
effect
gams_ret_val
=
0
log
.
debug
(
"Calling %s(%d) for '%s'"
%
(
effect
.
__name__
,
gams_ret_val
,
effect
.
__self__
.
name
))
effect
(
gams_ret_val
)
# Calls popped method with argument