r"""This module is used to by the CoProcessingScriptGenerator plugin to aid in
capturing ParaView state as CoProcessing python script.
This can capture the ParaView state in a Pipeline object that can then be used
in CoProcessing scripts. The entry point into this module is the function
DumpPipeline() which returns the Python trace script. Most of the other
functions can be considered internal.
Also refer to paraview.cpexport Module which is used to generate a complete
Python CoProcessing script that can be used with in a vtkCPPythonScriptPipeline.
"""
from paraview import smtrace, smstate, servermanager
[docs]class cpstate_globals: pass
[docs]def reset_cpstate_globals():
cpstate_globals.write_frequencies = {}
cpstate_globals.simulation_input_map = {}
cpstate_globals.view_proxies = []
cpstate_globals.screenshot_info = {}
cpstate_globals.export_rendering = False
cpstate_globals.cinema_tracks = {}
cpstate_globals.cinema_arrays = {}
reset_cpstate_globals()
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
[docs]class ProducerAccessor(smtrace.RealProxyAccessor):
"""This accessor is created instead of the standard one for proxies that
have been marked as simulation inputs. This accessor override the
trace_ctor() method to trace the constructor as the CreateProducer() call,
since the proxy is a dummy, in this case.
"""
def __init__(self, varname, proxy, simname):
self.SimulationInputName = simname
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
# this cpSimulationInput attribute is used to locate the proxy later on.
proxy.SMProxy.cpSimulationInput = simname
self.varname = varname
[docs] def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
trace = smtrace.TraceOutput()
trace.append("# create a producer from a simulation input")
trace.append("%s = coprocessor.CreateProducer(datadescription, '%s')" % \
(self, self.SimulationInputName))
if self.varname in cpstate_globals.cinema_arrays:
arrays = cpstate_globals.cinema_arrays[self.varname]
trace.append_separated(["# define target arrays of filters."])
trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
trace.append_separator()
return trace.raw_data()
# -----------------------------------------------------------------------------
[docs]class SliceAccessor(smtrace.RealProxyAccessor):
"""
augments traces of slice filters with information to explore the
parameter space for cinema playback (if enabled)
"""
def __init__(self, varname, proxy):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
self.varname = varname
[docs] def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
original_trace = smtrace.RealProxyAccessor.trace_ctor(\
self, ctor, filter, ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
if cpstate_globals.cinema_tracks and self.varname in cpstate_globals.cinema_tracks:
valrange = cpstate_globals.cinema_tracks[self.varname]
trace.append_separated(["# register the filter with the coprocessor's cinema generator"])
trace.append(["coprocessor.RegisterCinemaTrack('slice', %s, 'SliceOffsetValues', %s)" % (self, valrange)])
if self.varname in cpstate_globals.cinema_arrays:
arrays = cpstate_globals.cinema_arrays[self.varname]
trace.append_separated(["# define target arrays of filters."])
trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
trace.append_separator()
return trace.raw_data()
# -----------------------------------------------------------------------------
[docs]class ContourAccessor(smtrace.RealProxyAccessor):
"""
augments traces of contour filters with information to explore the
parameter space for cinema playback (if enabled)
"""
def __init__(self, varname, proxy):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
self.varname = varname
[docs] def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
original_trace = smtrace.RealProxyAccessor.trace_ctor(\
self, ctor, filter, ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
if cpstate_globals.cinema_tracks and self.varname in cpstate_globals.cinema_tracks:
valrange = cpstate_globals.cinema_tracks[self.varname]
trace.append_separated(["# register the filter with the coprocessor's cinema generator"])
trace.append(["coprocessor.RegisterCinemaTrack('contour', %s, 'Isosurfaces', %s)" % (self, valrange)])
if self.varname in cpstate_globals.cinema_arrays:
arrays = cpstate_globals.cinema_arrays[self.varname]
trace.append_separated(["# define target arrays of filters."])
trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
trace.append_separator()
return trace.raw_data()
# -----------------------------------------------------------------------------
[docs]class ClipAccessor(smtrace.RealProxyAccessor):
"""
augments traces of clip filters with information to explore the
parameter space for cinema playback (if enabled)
"""
def __init__(self, varname, proxy):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
self.varname = varname
[docs] def trace_ctor(self, ctor, filter, ctor_args = None, skip_assignment = False):
original_trace = smtrace.RealProxyAccessor.trace_ctor( \
self, ctor, filter, ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
if cpstate_globals.cinema_tracks and self.varname in cpstate_globals.cinema_tracks:
valrange = cpstate_globals.cinema_tracks[self.varname]
trace.append_separated(["# register the filter with the coprocessor's cinema generator"])
trace.append(["coprocessor.RegisterCinemaTrack('clip', %s, 'OffsetValues', %s)" % (self, valrange)])
if self.varname in cpstate_globals.cinema_arrays:
arrays = cpstate_globals.cinema_arrays[self.varname]
trace.append_separated(["# define target arrays of filters."])
trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
trace.append_separator()
return trace.raw_data()
# TODO: Make Slice, Contour & Clip Accessors to share an interface to reduce code
# duplication. ArrayAccessor could be used as this interface.
# -----------------------------------------------------------------------------
[docs]class ArrayAccessor(smtrace.RealProxyAccessor):
''' Augments traces of filters by defining names of arrays to be explored.'''
def __init__(self, varname, proxy):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
self.varname = varname
[docs] def trace_ctor(self, ctor, filter, ctor_args = None, skip_assignment = False):
original_trace = smtrace.RealProxyAccessor.trace_ctor( \
self, ctor, filter, ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
if self.varname in cpstate_globals.cinema_arrays:
arrays = cpstate_globals.cinema_arrays[self.varname]
trace.append_separated(["# define target arrays of filters."])
trace.append(["coprocessor.AddArraysToCinemaTrack(%s, 'arraySelection', %s)" % (self, arrays)])
trace.append_separator()
return trace.raw_data()
# -----------------------------------------------------------------------------
[docs]class ViewAccessor(smtrace.RealProxyAccessor):
"""Accessor for views. Overrides trace_ctor() to trace registering of the
view with the coprocessor. (I wonder if this registering should be moved to
the end of the state for better readability of the generated state files.
"""
def __init__(self, varname, proxy, proxyname):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
self.ProxyName = proxyname
[docs] def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
original_trace = smtrace.RealProxyAccessor.trace_ctor(\
self, ctor, filter, ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
if self.ProxyName in cpstate_globals.screenshot_info:
trace.append_separated(["# register the view with coprocessor",
"# and provide it with information such as the filename to use,",
"# how frequently to write the images, etc."])
params = cpstate_globals.screenshot_info[self.ProxyName]
assert len(params) == 7
trace.append([
"coprocessor.RegisterView(%s," % self,
" filename='%s', freq=%s, fittoscreen=%s, magnification=%s, width=%s, height=%s, cinema=%s)" %\
(params[0], params[1], params[2], params[3], params[4], params[5], params[6]),
"%s.ViewTime = datadescription.GetTime()" % self])
trace.append_separator()
return trace.raw_data()
# -----------------------------------------------------------------------------
[docs]class WriterFilter(smtrace.PipelineProxyFilter):
[docs] def should_never_trace(self, prop):
"""overridden to never trace 'WriteFrequency' and 'FileName' properties
on writers."""
if prop.get_property_name() in ["WriteFrequency", "FileName"]: return True
return super(WriterFilter, self).should_never_trace(prop)
# -----------------------------------------------------------------------------
[docs]class WriterAccessor(smtrace.RealProxyAccessor):
"""Accessor for writers. Overrides trace_ctor() to use the actual writer
proxy name instead of the dummy-writer proxy's name. Also updates the
write_frequencies maintained in cpstate_globals with the write frequencies
for the writer.
"""
def __init__(self, varname, proxy):
smtrace.RealProxyAccessor.__init__(self, varname, proxy)
write_frequency = proxy.GetProperty("WriteFrequency").GetElement(0)
# Locate which simulation input this write is connected to, if any. If so,
# we update the write_frequencies datastructure accordingly.
sim_inputs = locate_simulation_inputs(proxy)
for sim_input_name in sim_inputs:
if not write_frequency in cpstate_globals.write_frequencies[sim_input_name]:
cpstate_globals.write_frequencies[sim_input_name].append(write_frequency)
cpstate_globals.write_frequencies[sim_input_name].sort()
[docs] def get_proxy_label(self, xmlgroup, xmlname):
pxm = servermanager.ProxyManager()
prototype = pxm.GetPrototypeProxy(xmlgroup, xmlname)
if not prototype:
# a bit of a hack but we assume that there's a stub of some
# writer that's not available in this build but is available
# with the build used by the simulation code (probably through a plugin)
# this stub must have the proper name in the coprocessing hints
print ("WARNING: Could not find", xmlname, "writer in", xmlgroup, \
"XML group. This is not a problem as long as the writer is available with " \
"the ParaView build used by the simulation code.")
ctor = servermanager._make_name_valid(xmlname)
else:
ctor = servermanager._make_name_valid(prototype.GetXMLLabel())
# TODO: use servermanager.ProxyManager().NewProxy() instead
# we create the writer proxy such that it is not registered with the
# ParaViewPipelineController, so its state is not sent to ParaView Live.
return "servermanager.%s.%s" % (xmlgroup, ctor)
[docs] def trace_ctor(self, ctor, filter, ctor_args=None, skip_assignment=False):
xmlElement = self.get_object().GetHints().FindNestedElementByName("WriterProxy")
xmlgroup = xmlElement.GetAttribute("group")
xmlname = xmlElement.GetAttribute("name")
write_frequency = self.get_object().GetProperty("WriteFrequency").GetElement(0)
filename = self.get_object().GetProperty("FileName").GetElement(0)
ctor = self.get_proxy_label(xmlgroup, xmlname)
original_trace = smtrace.RealProxyAccessor.trace_ctor(\
self, ctor, WriterFilter(), ctor_args, skip_assignment)
trace = smtrace.TraceOutput(original_trace)
trace.append_separated(["# register the writer with coprocessor",
"# and provide it with information such as the filename to use,",
"# how frequently to write the data, etc."])
trace.append("coprocessor.RegisterWriter(%s, filename='%s', freq=%s)" % \
(self, filename, write_frequency))
trace.append_separator()
return trace.raw_data()
[docs]def cp_hook(varname, proxy):
"""callback to create our special accessors instead of the standard ones."""
pname = smtrace.Trace.get_registered_name(proxy, "sources")
if pname:
if pname in cpstate_globals.simulation_input_map:
return ProducerAccessor(varname, proxy, cpstate_globals.simulation_input_map[pname])
elif proxy.GetHints() and proxy.GetHints().FindNestedElementByName("WriterProxy"):
return WriterAccessor(varname, proxy)
elif ("servermanager.Slice" in proxy.__class__().__str__() and
"Plane object" in proxy.__getattribute__("SliceType").__str__()):
return SliceAccessor(varname, proxy)
elif ("servermanager.Clip" in proxy.__class__().__str__() and
"Plane object" in proxy.__getattribute__("ClipType").__str__()):
return ClipAccessor(varname, proxy)
elif "servermanager.Contour" in proxy.__class__().__str__():
return ContourAccessor(varname, proxy)
else:
return ArrayAccessor(varname, proxy)
pname = smtrace.Trace.get_registered_name(proxy, "views")
if pname:
cpstate_globals.view_proxies.append(proxy)
return ViewAccessor(varname, proxy, pname)
raise NotImplementedError
[docs]class cpstate_filter_proxies_to_serialize(object):
"""filter used to skip views and representations a when export_rendering is
disabled."""
def __call__(self, proxy):
if not smstate.visible_representations()(proxy): return False
if (not cpstate_globals.export_rendering) and \
(proxy.GetXMLGroup() in ["views", "representations"]): return False
return True
# -----------------------------------------------------------------------------
[docs]def DumpPipeline(export_rendering, simulation_input_map, screenshot_info,
cinema_tracks, cinema_arrays):
"""
Method that will dump the current pipeline and return it as a string trace
- export_rendering : boolean telling if we want to export rendering
- simulation_input_map: string->string map with key being the proxyname
while value being the simulation input name.
- screenshot_info : map with information about screenshots
key -> view proxy name
value -> [filename, writefreq, fitToScreen,
magnification, width, height,
cinemacamera options]
- cinema_tracks : map with information about cinema tracks to record
key -> proxy name
value -> argument ranges
- cinema_arrays : map with information about value arrays to be exported
key -> proxy name
value -> list of array names
"""
# reset the global variables.
reset_cpstate_globals()
cpstate_globals.export_rendering = export_rendering
cpstate_globals.simulation_input_map = simulation_input_map
cpstate_globals.screenshot_info = screenshot_info
cpstate_globals.cinema_tracks = cinema_tracks
cpstate_globals.cinema_arrays = cinema_arrays
# Initialize the write frequency map
for key in cpstate_globals.simulation_input_map.values():
cpstate_globals.write_frequencies[key] = []
# Start trace
filter = cpstate_filter_proxies_to_serialize()
smtrace.RealProxyAccessor.register_create_callback(cp_hook)
state = smstate.get_state(filter=filter, raw=True)
smtrace.RealProxyAccessor.unregister_create_callback(cp_hook)
# iterate over all views that were saved in state and update write requencies
if export_rendering:
pxm = servermanager.ProxyManager()
for key, vtuple in screenshot_info.iteritems():
view = pxm.GetProxy("views", key)
if not view: continue
image_write_frequency = int(vtuple[1])
# Locate which simulation input this write is connected to, if any. If so,
# we update the write_frequencies datastructure accordingly.
sim_inputs = locate_simulation_inputs_for_view(view)
for sim_input_name in sim_inputs:
if not image_write_frequency in cpstate_globals.write_frequencies:
cpstate_globals.write_frequencies[sim_input_name].append(image_write_frequency)
cpstate_globals.write_frequencies[sim_input_name].sort()
# Create global fields values
pipelineClassDef = "\n"
pipelineClassDef += "# ----------------------- CoProcessor definition -----------------------\n\n"
# Create the resulting string that will contains the pipeline definition
pipelineClassDef += "def CreateCoProcessor():\n"
pipelineClassDef += " def _CreatePipeline(coprocessor, datadescription):\n"
pipelineClassDef += " class Pipeline:\n";
# add the traced code.
for original_line in state:
for line in original_line.split("\n"):
if line.find("import *") != -1 or \
line.find("#### import the simple") != -1:
continue
if line:
pipelineClassDef += " " + line + "\n"
else:
pipelineClassDef += "\n"
pipelineClassDef += " return Pipeline()\n";
pipelineClassDef += "\n"
pipelineClassDef += " class CoProcessor(coprocessing.CoProcessor):\n"
pipelineClassDef += " def CreatePipeline(self, datadescription):\n"
pipelineClassDef += " self.Pipeline = _CreatePipeline(self, datadescription)\n"
pipelineClassDef += "\n"
pipelineClassDef += " coprocessor = CoProcessor()\n";
pipelineClassDef += " # these are the frequencies at which the coprocessor updates.\n"
pipelineClassDef += " freqs = " + str(cpstate_globals.write_frequencies) + "\n"
pipelineClassDef += " coprocessor.SetUpdateFrequencies(freqs)\n"
pipelineClassDef += " return coprocessor\n"
return pipelineClassDef
#------------------------------------------------------------------------------
[docs]def run(filename=None):
"""Create a dummy pipeline and save the coprocessing state in the filename
specified, if any, else dumps it out on stdout."""
from paraview import simple, servermanager
simple.LoadDistributedPlugin("CatalystScriptGeneratorPlugin")
wavelet = simple.Wavelet(registrationName="Wavelet1")
contour = simple.Contour()
display = simple.Show()
view = simple.Render()
# create a new 'Parallel PolyData Writer'
parallelPolyDataWriter0 = simple.ParallelPolyDataWriter()
viewname = servermanager.ProxyManager().GetProxyName("views", view.SMProxy)
script = DumpPipeline(export_rendering=True,
simulation_input_map={"Wavelet1" : "input"},
screenshot_info={viewname : [ 'image.png', '1', '1', '2', '400', '400']},
cinema_tracks = {},
cinema_arrays = {})
if filename:
f = open(filename, "w")
f.write(script)
f.close()
else:
print ("# *** Generated Script Begin ***")
print (script)
print ("# *** Generated Script End ***")
if __name__ == "__main__":
run()