r"""Module for generating a Python state for ParaView.
This module uses paraview.smtrace to generate a trace for a selected set of
proxies by mimicking the creation of various pipeline components in sequence.
Typical usage of this module is as follows::

    from paraview import smstate
    state = smstate.get_state()
    print(state)

Note, this cannot be called when Python tracing is active.
"""
from paraview import servermanager as sm
from paraview import smtrace
from paraview import simple
class supported_proxies(object):
    """Filter object used to hide proxies that are currently not supported by
    the state saving mechanism or those that are generally skipped in state,
    e.g. animation proxies and the time keeper.

    Instances are callable: ``supported_proxies()(proxy)`` is truthy when the
    proxy should be kept.
    """

    def __call__(self, proxy):
        """Return a truthy value if `proxy` should be included in the state.

        A falsy `proxy` (e.g. None) is returned unchanged; otherwise the
        result is a bool that is False for proxies whose XML group is
        "animation" or whose XML name is "TimeKeeper".
        """
        if not proxy:
            # mirror `proxy and ...`: hand back the falsy object itself.
            return proxy
        return (proxy.GetXMLGroup() != "animation"
                and proxy.GetXMLName() != "TimeKeeper")
class visible_representations(object):
    """Filter object to skip hidden representations from being saved in the
    state file.

    Instances are callable: ``visible_representations()(proxy)`` is truthy
    when the proxy should be kept.
    """

    def __call__(self, proxy):
        """Return whether `proxy` should be included in the state.

        Proxies rejected by `supported_proxies` yield False. For the rest,
        the proxy's ``Visibility`` attribute is returned when it exists;
        proxies without that attribute are always kept (True).
        """
        if not supported_proxies()(proxy):
            return False
        try:
            return proxy.Visibility
        except AttributeError:
            # no Visibility attribute on this proxy: nothing to hide, keep it.
            return True
def __toposort(input_set):
    """Implementation of Tarjan topological sort to sort proxies using
    consumer dependencies as graph edges.

    `input_set` is the set of proxies to order. Returns a list containing
    every member of `input_set`, arranged so that a proxy appears before the
    proxies that consume it. Raises RuntimeError (from the visit helper) if a
    cycle is found.
    """
    result = []
    marked_set = set()
    # keep starting depth-first visits from any not-yet-marked proxy until
    # every proxy in input_set has been marked.
    while marked_set != input_set:
        unmarked_node = (input_set - marked_set).pop()
        __toposort_visit(result, unmarked_node, input_set, marked_set)
    # the visit appends a proxy only after its consumers, so reverse the list
    # to put each proxy ahead of its consumers.
    result.reverse()
    return result
def __toposort_visit(result, proxy, input_set, marked_set, t_marked_set=None):
    """Depth-first visit step used by `__toposort`.

    result       -- list that visited proxies are appended to; a proxy is
                    appended after all of its consumers.
    proxy        -- the proxy to visit.
    input_set    -- only consumers that are members of this set are followed.
    marked_set   -- proxies that have been completely processed (updated in
                    place).
    t_marked_set -- proxies on the current visit path; a fresh set is created
                    when None.

    Raises RuntimeError if `proxy` is already on the current visit path, i.e.
    the consumer graph has a cycle.
    """
    temporarily_marked_set = set() if t_marked_set is None else t_marked_set
    if proxy in temporarily_marked_set:
        raise RuntimeError ("Cycle detected in pipeline! %r" % proxy)
    if proxy in marked_set:
        # already fully processed by an earlier visit.
        return

    temporarily_marked_set.add(proxy)
    consumers = set()
    # direct consumers only; the recursion below walks deeper levels.
    get_consumers(proxy, lambda x: x in input_set, consumer_set=consumers, recursive=False)
    for x in consumers:
        __toposort_visit(result, x, input_set, marked_set, temporarily_marked_set)
    marked_set.add(proxy)
    temporarily_marked_set.discard(proxy)
    result.append(proxy)
def get_consumers(proxy, filter, consumer_set, recursive=True):
    """Collect the consumers of a proxy into `consumer_set`.

    proxy        -- proxy whose consumers are looked up.
    filter       -- callable used to cull consumers; a consumer is kept only
                    when ``filter(consumer)`` is truthy. If None, no culling
                    is done.
    consumer_set -- set that accepted consumers are added to (modified in
                    place); consumers already in it are skipped.
    recursive    -- when True (default), the consumers of every accepted
                    consumer are collected as well.

    Returns None; results are reported through `consumer_set`.
    """
    # range (not xrange) so the loop works on both Python 2 and Python 3.
    for i in range(proxy.GetNumberOfConsumers()):
        consumer = proxy.GetConsumerProxy(i)
        consumer = consumer.GetTrueParentProxy() if consumer else None
        consumer = sm._getPyProxy(consumer)
        if not consumer or consumer.IsPrototype() or consumer in consumer_set:
            continue
        if filter is None or filter(consumer):
            consumer_set.add(consumer)
            if recursive:
                get_consumers(consumer, filter, consumer_set)
def get_producers(proxy, filter, producer_set):
    """Collect the producers of a proxy, recursively, into `producer_set`.

    proxy        -- proxy whose producers are looked up.
    filter       -- callable used to cull producers; a producer is kept only
                    when ``filter(producer)`` is truthy. If None, no culling
                    is done.
    producer_set -- set that accepted producers are added to (modified in
                    place); producers already in it are skipped.

    Besides the producers reported by the proxy itself, the proxy's
    ``LookupTable`` and ``ScalarOpacityFunction`` attributes (when present
    and accepted by the filter) are added and followed too.

    Returns None; results are reported through `producer_set`.
    """
    # range (not xrange) so the loop works on both Python 2 and Python 3.
    for i in range(proxy.GetNumberOfProducers()):
        producer = proxy.GetProducerProxy(i)
        producer = producer.GetTrueParentProxy() if producer else None
        producer = sm._getPyProxy(producer)
        if not producer or producer.IsPrototype() or producer in producer_set:
            continue
        if filter is None or filter(producer):
            producer_set.add(producer)
            get_producers(producer, filter, producer_set)

    # FIXME: LookupTable is missed :/, darn subproxies!
    # Pick these two up explicitly. Proxies lacking the attribute raise
    # AttributeError, which is deliberately ignored (best effort).
    for attr_name in ("LookupTable", "ScalarOpacityFunction"):
        try:
            helper = getattr(proxy, attr_name)
            if helper and (filter is None or filter(helper)):
                producer_set.add(helper)
                get_producers(helper, filter, producer_set)
        except AttributeError:
            pass
def get_state(propertiesToTraceOnCreate=1, # sm.vtkSMTrace.RECORD_MODIFIED_PROPERTIES,
              skipHiddenRepresentations=True, source_set=[], filter=None, raw=False):
    """Returns the state string.

    propertiesToTraceOnCreate -- accepted for API compatibility; this
        function does not currently reference it.
    skipHiddenRepresentations -- selects the default filter when `filter` is
        None: `visible_representations` if True, else `supported_proxies`.
    source_set -- proxies to start from. When empty, all sources and views
        are used. The default list is never modified.
    filter -- callable deciding which proxies are kept; overrides the
        default chosen via `skipHiddenRepresentations`.
    raw -- when True, return ``trace.raw_data()`` instead of ``str(trace)``.

    Raises RuntimeError if a Python trace is already active. The tracer
    started here is always stopped again, even when generating the state
    fails part-way, so a failure does not block later calls.
    """
    if sm.vtkSMTrace.GetActiveTracer():
        raise RuntimeError ("Cannot generate Python state when tracing is active.")

    if filter is None:
        filter = visible_representations() if skipHiddenRepresentations else supported_proxies()

    # build a set of proxies of interest
    if source_set:
        start_set = source_set
    else:
        # if nothing is specified, we save all views and sources.
        # (list(...) so this also works when values() is a view, as on Python 3)
        start_set = list(simple.GetSources().values()) + list(simple.GetViews())
    start_set = [x for x in start_set if filter(x)]

    # now, locate dependencies for the start_set, pruning irrelevant branches
    consumers = set(start_set)
    for proxy in start_set:
        get_consumers(proxy, filter, consumers)

    producers = set()
    for proxy in consumers:
        get_producers(proxy, filter, producers)

    # proxies_of_interest is set of all proxies that we should trace.
    proxies_of_interest = producers.union(consumers)
    #print ("proxies_of_interest", proxies_of_interest)

    separator = "# ----------------------------------------------------------------"

    trace_config = smtrace.start_trace()
    try:
        # this ensures that lookup tables/scalar bars etc. are fully traced.
        trace_config.SetFullyTraceSupplementalProxies(True)

        trace = smtrace.TraceOutput()
        trace.append("# state file generated using %s" % simple.GetParaViewSourceVersion())

        #----------------------------------------------------------------------
        # First, we trace the views and layouts, if any.
        # TODO: add support for layouts.
        views = [x for x in proxies_of_interest if smtrace.Trace.get_registered_name(x, "views")]
        if views:
            # sort views by their names, so the state has some structure to it.
            views = sorted(views,
                key=lambda x: smtrace.Trace.get_registered_name(x, "views"))
            trace.append_separated([
                separator,
                "# setup views used in the visualization",
                separator])
            for view in views:
                # FIXME: save view camera positions and size.
                traceitem = smtrace.RegisterViewProxy(view)
                traceitem.finalize()
                del traceitem
            trace.append_separated(smtrace.get_current_trace_output_and_reset(raw=True))

        #----------------------------------------------------------------------
        # Next, trace data processing pipelines.
        sorted_proxies_of_interest = __toposort(proxies_of_interest)
        sorted_sources = [x for x in sorted_proxies_of_interest
            if smtrace.Trace.get_registered_name(x, "sources")]
        if sorted_sources:
            trace.append_separated([
                separator,
                "# setup the data processing pipelines",
                separator])
            for source in sorted_sources:
                traceitem = smtrace.RegisterPipelineProxy(source)
                traceitem.finalize()
                del traceitem
            trace.append_separated(smtrace.get_current_trace_output_and_reset(raw=True))

        #----------------------------------------------------------------------
        # Now, trace the transfer functions (color maps and opacity maps) used.
        ctfs = set([x for x in proxies_of_interest
            if smtrace.Trace.get_registered_name(x, "lookup_tables")])
        if ctfs:
            trace.append_separated([
                separator,
                "# setup color maps and opacity mapes used in the visualization",
                "# note: the Get..() functions create a new object, if needed",
                separator])
            for ctf in ctfs:
                smtrace.Trace.get_accessor(ctf)
                if ctf.ScalarOpacityFunction in proxies_of_interest:
                    smtrace.Trace.get_accessor(ctf.ScalarOpacityFunction)
            trace.append_separated(smtrace.get_current_trace_output_and_reset(raw=True))

        #----------------------------------------------------------------------
        # Can't decide if the representations should be saved with the pipeline
        # objects or afterwards, opting for afterwards for now since the
        # topological sort doesn't guarantee that the representations will
        # follow their sources anyways.
        sorted_representations = [x for x in sorted_proxies_of_interest
            if smtrace.Trace.get_registered_name(x, "representations")]
        scalarbar_representations = [x for x in sorted_proxies_of_interest
            if smtrace.Trace.get_registered_name(x, "scalar_bars")]
        # print ("sorted_representations", sorted_representations)
        # print ("scalarbar_representations", scalarbar_representations)
        if sorted_representations or scalarbar_representations:
            for view in views:
                view_representations = [x for x in view.Representations if x in sorted_representations]
                view_scalarbars = [x for x in view.Representations if x in scalarbar_representations]
                if not (view_representations or view_scalarbars):
                    # nothing of interest is shown in this view.
                    continue
                trace.append_separated([
                    separator,
                    "# setup the visualization in view '%s'" % smtrace.Trace.get_accessor(view),
                    separator])
                for rep in view_representations:
                    # best effort: an AttributeError raised while handling a
                    # representation (e.g. no Input) skips the rest of it.
                    try:
                        producer = rep.Input
                        port = rep.Input.Port
                        traceitem = smtrace.Show(producer, port, view, rep,
                            comment="show data from %s" % smtrace.Trace.get_accessor(producer))
                        traceitem.finalize()
                        del traceitem
                        trace.append_separated(smtrace.get_current_trace_output_and_reset(raw=True))

                        if rep.IsScalarBarVisible(view):
                            # FIXME: this will save this multiple times, right now,
                            # if two representations use the same LUT.
                            trace.append_separated([
                                "# show color legend",
                                "%s.SetScalarBarVisibility(%s, True)" % (
                                    smtrace.Trace.get_accessor(rep),
                                    smtrace.Trace.get_accessor(view))])
                    except AttributeError:
                        pass
                # save the scalar bar properties themselves.
                if view_scalarbars:
                    trace.append_separated("# setup the color legend parameters for each legend in this view")
                    for rep in view_scalarbars:
                        smtrace.Trace.get_accessor(rep)
                    trace.append_separated(smtrace.get_current_trace_output_and_reset(raw=True))

        # restore the active source since the order in which the pipeline is
        # created in the state file can end up changing the active source to be
        # different than what it was when the state is being saved.
        trace.append_separated([
            separator,
            "# finally, restore active source",
            "SetActiveSource(%s)" % smtrace.Trace.get_accessor(simple.GetActiveSource()),
            separator])
    finally:
        # always shut the tracer down; otherwise a failure above would leave
        # tracing active and every later get_state() call would be rejected.
        del trace_config
        smtrace.stop_trace()
    #print (trace)
    return str(trace) if not raw else trace.raw_data()
# Smoke test: build a small pipeline (Mandelbrot -> Shrink -> Clip), show the
# result with a scalar bar, then print the generated state.
if __name__ == "__main__":
    print("Running test")
    simple.Mandelbrot()
    simple.Show()
    simple.Hide()
    simple.Shrink().ShrinkFactor = 0.4
    simple.UpdatePipeline()
    simple.Clip().ClipType.Normal[1] = 1
    rep = simple.Show()
    view = simple.Render()
    view.ViewSize = [500, 500]
    rep.SetScalarBarVisibility(view, True)
    simple.Render()
    # rep.SetScalarBarVisibility(view, False)
    print("====================================================================")
    print(get_state())