• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdelibs-4.11.5 API Reference
  • KDE Home
  • Contact Us
 

ThreadWeaver

  • threadweaver
  • Weaver
WeaverImpl.cpp
Go to the documentation of this file.
1 /* -*- C++ -*-
2 
3 This file implements the WeaverImpl class.
4 
5 
6 $ Author: Mirko Boehm $
7 $ Copyright: (C) 2005-2013 Mirko Boehm $
8 $ Contact: mirko@kde.org
9 http://www.kde.org
10 http://creative-destruction.me $
11 
12  This library is free software; you can redistribute it and/or
13  modify it under the terms of the GNU Library General Public
14  License as published by the Free Software Foundation; either
15  version 2 of the License, or (at your option) any later version.
16 
17  This library is distributed in the hope that it will be useful,
18  but WITHOUT ANY WARRANTY; without even the implied warranty of
19  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20  Library General Public License for more details.
21 
22  You should have received a copy of the GNU Library General Public License
23  along with this library; see the file COPYING.LIB. If not, write to
24  the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
25  Boston, MA 02110-1301, USA.
26 
27 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $
28 
29 */
30 
31 #include "WeaverImpl.h"
32 
33 #include <QtCore/QObject>
34 #include <QtCore/QMutex>
35 #include <QtCore/QDebug>
36 
37 #include "Job.h"
38 #include "State.h"
39 #include "Thread.h"
40 #include "ThreadWeaver.h"
41 #include "DebuggingAids.h"
42 #include "WeaverObserver.h"
43 #include "SuspendedState.h"
44 #include "SuspendingState.h"
45 #include "DestructedState.h"
46 #include "WorkingHardState.h"
47 #include "ShuttingDownState.h"
48 #include "InConstructionState.h"
49 
50 using namespace ThreadWeaver;
51 
52 WeaverImpl::WeaverImpl( QObject* parent )
53  : WeaverInterface(parent)
54  , m_active(0)
55  , m_inventoryMax( qMax(4, 2 * QThread::idealThreadCount() ) )
56  , m_mutex ( new QMutex( QMutex::Recursive ) )
57  , m_finishMutex( new QMutex )
58  , m_jobAvailableMutex ( new QMutex )
59  , m_state (0)
60 {
61  QMutexLocker l(m_mutex); Q_UNUSED(l);
62  // initialize state objects:
63  m_states[InConstruction] = new InConstructionState( this );
64  setState ( InConstruction );
65  m_states[WorkingHard] = new WorkingHardState( this );
66  m_states[Suspending] = new SuspendingState( this );
67  m_states[Suspended] = new SuspendedState( this );
68  m_states[ShuttingDown] = new ShuttingDownState( this );
69  m_states[Destructed] = new DestructedState( this );
70 
71  // FIXME (0.7) this is supposedly unnecessary
72  connect ( this, SIGNAL (asyncThreadSuspended(ThreadWeaver::Thread*)),
73  SIGNAL (threadSuspended(ThreadWeaver::Thread*)),
74  Qt::QueuedConnection );
75  setState( WorkingHard );
76 }
77 
78 WeaverImpl::~WeaverImpl()
79 { // the constructor may only be called from the thread that owns this
80  // object (everything else would be what we professionals call "insane")
81  REQUIRE( QThread::currentThread() == thread() );
82  debug ( 3, "WeaverImpl dtor: destroying inventory.\n" );
83  setState ( ShuttingDown );
84 
85  m_jobAvailable.wakeAll();
86 
87  // problem: Some threads might not be asleep yet, just finding
88  // out if a job is available. Those threads will suspend
89  // waiting for their next job (a rare case, but not impossible).
90  // Therefore, if we encounter a thread that has not exited, we
91  // have to wake it again (which we do in the following for
92  // loop).
93 
94  for (;;) {
95  Thread* th = 0;
96  {
97  QMutexLocker l(m_mutex); Q_UNUSED(l);
98  if (m_inventory.isEmpty()) break;
99  th = m_inventory.takeFirst();
100  }
101  if ( !th->isFinished() )
102  {
103  for ( ;; )
104  {
105  m_jobAvailable.wakeAll();
106  if ( th->wait( 100 ) ) break;
107  debug ( 1, "WeaverImpl::~WeaverImpl: thread %i did not exit as expected, "
108  "retrying.\n", th->id() );
109  }
110  }
111  emit ( threadExited ( th ) );
112  delete th;
113  }
114  Q_ASSERT(m_inventory.isEmpty());
115  delete m_finishMutex;
116  delete m_jobAvailableMutex;
117  debug ( 3, "WeaverImpl dtor: done\n" );
118  setState ( Destructed ); // m_state = Halted;
119  // FIXME: delete state objects. what sense does DestructedState make then?
120  // FIXME: make state objects static, since they are
121  delete m_mutex;
122 }
123 
124 void WeaverImpl::setState ( StateId id )
125 {
126  QMutexLocker l(m_mutex); Q_UNUSED(l);
127  if ( m_state==0 || m_state->stateId() != id )
128  {
129  m_state = m_states[id];
130  debug ( 2, "WeaverImpl::setState: state changed to \"%s\".\n",
131  m_state->stateName().toLatin1().constData() );
132  if ( id == Suspended )
133  {
134  emit ( suspended() );
135  }
136 
137  m_state->activated();
138 
139  emit ( stateChanged ( m_state ) );
140  }
141 }
142 
143 const State& WeaverImpl::state() const
144 {
145  QMutexLocker l(m_mutex); Q_UNUSED(l);
146  return *m_state;
147 }
148 
149 void WeaverImpl::setMaximumNumberOfThreads( int cap )
150 {
151  Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." );
152  QMutexLocker l (m_mutex);
153  m_inventoryMax = cap;
154 }
155 
156 int WeaverImpl::maximumNumberOfThreads() const
157 {
158  QMutexLocker l(m_mutex); Q_UNUSED(l);
159  return m_inventoryMax;
160 }
161 
162 int WeaverImpl::currentNumberOfThreads () const
163 {
164  QMutexLocker l(m_mutex); Q_UNUSED(l);
165  return m_inventory.count ();
166 }
167 
168 void WeaverImpl::registerObserver ( WeaverObserver *ext )
169 {
170  connect ( this, SIGNAL (stateChanged(ThreadWeaver::State*)),
171  ext, SIGNAL (weaverStateChanged(ThreadWeaver::State*)) );
172  connect ( this, SIGNAL (threadStarted(ThreadWeaver::Thread*)),
173  ext, SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
174  connect ( this, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
175  ext, SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
176  connect ( this, SIGNAL (threadSuspended(ThreadWeaver::Thread*)),
177  ext, SIGNAL (threadSuspended(ThreadWeaver::Thread*)) );
178  connect ( this, SIGNAL (threadExited(ThreadWeaver::Thread*)) ,
179  ext, SIGNAL (threadExited(ThreadWeaver::Thread*)) );
180 }
181 
182 void WeaverImpl::enqueue(Job* job)
183 {
184  if (job) {
185  adjustInventory ( 1 );
186  debug ( 3, "WeaverImpl::enqueue: queueing job %p of type %s.\n",
187  (void*)job, job->metaObject()->className() );
188  QMutexLocker l (m_mutex); Q_UNUSED(l);
189  job->aboutToBeQueued ( this );
190  // find position for insertion:;
191  // FIXME (after 0.6) optimize: factor out queue management into own class,
192  // and use binary search for insertion (not done yet because
193  // refactoring already planned):
194  int i = m_assignments.size();
195  if (i > 0) {
196  while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i;
197  m_assignments.insert( i, (job) );
198  } else {
199  m_assignments.append (job);
200  }
201  assignJobs();
202  }
203 }
204 
205 void WeaverImpl::adjustInventory ( int numberOfNewJobs )
206 {
207  QMutexLocker l(m_mutex); Q_UNUSED(l);
208 
209  // no of threads that can be created:
210  const int reserve = m_inventoryMax - m_inventory.count();
211 
212  if ( reserve > 0 )
213  {
214  for ( int i = 0; i < qMin ( reserve, numberOfNewJobs ); ++i )
215  {
216  Thread *th = createThread();
217  th->moveToThread( th ); // be sane from the start
218  m_inventory.append(th);
219  connect ( th, SIGNAL (jobStarted(ThreadWeaver::Thread*,ThreadWeaver::Job*)),
220  SIGNAL (threadBusy(ThreadWeaver::Thread*,ThreadWeaver::Job*)) );
221  connect ( th, SIGNAL (jobDone(ThreadWeaver::Job*)),
222  SIGNAL (jobDone(ThreadWeaver::Job*)) );
223  connect ( th, SIGNAL (started(ThreadWeaver::Thread*)),
224  SIGNAL (threadStarted(ThreadWeaver::Thread*)) );
225 
226  th->start ();
227  debug ( 2, "WeaverImpl::adjustInventory: thread created, "
228  "%i threads in inventory.\n", currentNumberOfThreads() );
229  }
230  }
231 }
232 
233 Thread* WeaverImpl::createThread()
234 {
235  return new Thread( this );
236 }
237 
238 bool WeaverImpl::dequeue ( Job* job )
239 {
240  bool result;
241  {
242  QMutexLocker l (m_mutex);
243 
244  int i = m_assignments.indexOf ( job );
245  if ( i != -1 )
246  {
247  job->aboutToBeDequeued( this );
248 
249  m_assignments.removeAt( i );
250  result = true;
251  debug( 3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n",
252  (void*)job, m_assignments.size() );
253  } else {
254  debug( 3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void*)job );
255  result = false;
256  }
257  }
258 
259  // from the queues point of view, a job is just as finished if
260  // it gets dequeued:
261  m_jobFinished.wakeOne();
262  return result;
263 }
264 
265 void WeaverImpl::dequeue ()
266 {
267  debug( 3, "WeaverImpl::dequeue: dequeueing all jobs.\n" );
268  QMutexLocker l (m_mutex);
269  for ( int index = 0; index < m_assignments.size(); ++index )
270  {
271  m_assignments.at( index )->aboutToBeDequeued( this );
272  }
273  m_assignments.clear();
274 
275  ENSURE ( m_assignments.isEmpty() );
276 }
277 
278 void WeaverImpl::suspend ()
279 {
280  m_state->suspend();
281 }
282 
283 void WeaverImpl::resume ( )
284 {
285  m_state->resume();
286 }
287 
288 void WeaverImpl::assignJobs()
289 {
290  m_jobAvailable.wakeAll();
291 }
292 
293 bool WeaverImpl::isEmpty() const
294 {
295  QMutexLocker l(m_mutex); Q_UNUSED(l);
296  return m_assignments.isEmpty();
297 }
298 
299 
300 void WeaverImpl::incActiveThreadCount()
301 {
302  adjustActiveThreadCount ( 1 );
303 }
304 
305 void WeaverImpl::decActiveThreadCount()
306 {
307  adjustActiveThreadCount ( -1 );
308  // the done job could have freed another set of jobs, and we do not know how
309  // many - therefore we need to wake all threads:
310  m_jobFinished.wakeAll();
311 }
312 
313 void WeaverImpl::adjustActiveThreadCount( int diff )
314 {
315  QMutexLocker l (m_mutex); Q_UNUSED(l);
316  m_active += diff;
317  debug ( 4, "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs"
318  " in queue).\n", m_active, queueLength() );
319 
320  if ( m_assignments.isEmpty() && m_active == 0)
321  {
322  P_ASSERT ( diff < 0 ); // cannot reach Zero otherwise
323  emit ( finished() );
324  }
325 }
326 
327 int WeaverImpl::activeThreadCount()
328 {
329  QMutexLocker l(m_mutex); Q_UNUSED(l);
330  return m_active;
331 }
332 
333 Job* WeaverImpl::takeFirstAvailableJob(Job *previous)
334 {
335  QMutexLocker l (m_mutex); Q_UNUSED(l);
336  if (previous) {
337  // cleanup and send events:
338  decActiveThreadCount();
339  }
340  Job *next = 0;
341  for (int index = 0; index < m_assignments.size(); ++index) {
342  if ( m_assignments.at(index)->canBeExecuted() ) {
343  next = m_assignments.at(index);
344  m_assignments.removeAt (index);
345  break;
346  }
347  }
348  if (next) {
349  incActiveThreadCount();
350  }
351  return next;
352 }
353 
354 Job* WeaverImpl::applyForWork(Thread *th, Job* previous)
355 {
356  return m_state->applyForWork ( th, previous );
357 }
358 
359 void WeaverImpl::waitForAvailableJob(Thread* th)
360 {
361  m_state->waitForAvailableJob ( th );
362 }
363 
364 void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th )
365 { // th is the thread that calls this method:
366  Q_UNUSED ( th );
367  debug ( 4, "WeaverImpl::blockThread...: thread %i blocked.\n", th->id());
368  emit asyncThreadSuspended ( th );
369  QMutexLocker l( m_jobAvailableMutex );
370  m_jobAvailable.wait( m_jobAvailableMutex );
371  debug ( 4, "WeaverImpl::blockThread...: thread %i resumed.\n", th->id());
372 }
373 
374 int WeaverImpl::queueLength() const
375 {
376  QMutexLocker l(m_mutex); Q_UNUSED(l);
377  return m_assignments.count();
378 }
379 
380 bool WeaverImpl::isIdle () const
381 {
382  QMutexLocker l(m_mutex); Q_UNUSED(l);
383  return isEmpty() && m_active == 0;
384 }
385 
386 void WeaverImpl::finish()
387 {
388 #ifdef QT_NO_DEBUG
389  const int MaxWaitMilliSeconds = 50;
390 #else
391  const int MaxWaitMilliSeconds = 500;
392 #endif
393  while ( !isIdle() ) {
394  Q_ASSERT(state().stateId() == WorkingHard);
395  debug (2, "WeaverImpl::finish: not done, waiting.\n" );
396  QMutexLocker l( m_finishMutex );
397  if ( m_jobFinished.wait( l.mutex(), MaxWaitMilliSeconds ) == false ) {
398  debug ( 2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n",
399  queueLength() );
400  m_jobAvailable.wakeAll();
401  }
402  }
403  debug (2, "WeaverImpl::finish: done.\n\n\n" );
404 }
405 
406 void WeaverImpl::requestAbort()
407 {
408  QMutexLocker l(m_mutex); Q_UNUSED(l);
409  for ( int i = 0; i<m_inventory.size(); ++i ) {
410  m_inventory[i]->requestAbort();
411  }
412 }
413 
414 void WeaverImpl::dumpJobs()
415 {
416  QMutexLocker l(m_mutex); Q_UNUSED(l);
417  debug( 0, "WeaverImpl::dumpJobs: current jobs:\n" );
418  for ( int index = 0; index < m_assignments.size(); ++index ) {
419  debug( 0, "--> %4i: %p %s (priority %i, can be executed: %s)\n", index, (void*)m_assignments.at( index ),
420  m_assignments.at( index )->metaObject()->className(),
421  m_assignments.at(index)->priority(),
422  m_assignments.at(index)->canBeExecuted() ? "yes" : "no");
423  }
424 }
425 
426 #include "WeaverImpl.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Tue Sep 23 2014 09:57:13 by doxygen 1.8.3.1 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

ThreadWeaver

Skip menu "ThreadWeaver"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • File Members
  • Related Pages

kdelibs-4.11.5 API Reference

Skip menu "kdelibs-4.11.5 API Reference"
  • DNSSD
  • Interfaces
  •   KHexEdit
  •   KMediaPlayer
  •   KSpeech
  •   KTextEditor
  • kconf_update
  • KDE3Support
  •   KUnitTest
  • KDECore
  • KDED
  • KDEsu
  • KDEUI
  • KDEWebKit
  • KDocTools
  • KFile
  • KHTML
  • KImgIO
  • KInit
  • kio
  • KIOSlave
  • KJS
  •   KJS-API
  •   WTF
  • kjsembed
  • KNewStuff
  • KParts
  • KPty
  • Kross
  • KUnitConversion
  • KUtils
  • Nepomuk
  • Plasma
  • Solid
  • Sonnet
  • ThreadWeaver
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal