• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.8.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

resourcescheduler.cpp
00001 /*
00002     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00003 
00004     This library is free software; you can redistribute it and/or modify it
00005     under the terms of the GNU Library General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or (at your
00007     option) any later version.
00008 
00009     This library is distributed in the hope that it will be useful, but WITHOUT
00010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00012     License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to the
00016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00017     02110-1301, USA.
00018 */
00019 
00020 #include "resourcescheduler_p.h"
00021 
00022 #include "dbusconnectionpool.h"
00023 
00024 #include <kdebug.h>
00025 #include <klocale.h>
00026 
00027 #include <QtCore/QTimer>
00028 #include <QtDBus/QDBusInterface>
00029 #include <QtDBus/QDBusConnectionInterface>
00030 #include <boost/graph/graph_concepts.hpp>
00031 
00032 using namespace Akonadi;
00033 
00034 qint64 ResourceScheduler::Task::latestSerial = 0;
00035 static QDBusAbstractInterface *s_resourcetracker = 0;
00036 
00037 //@cond PRIVATE
00038 
00039 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00040     QObject( parent ),
00041     mCurrentTasksQueue( -1 ),
00042     mOnline( false )
00043 {
00044 }
00045 
00046 void ResourceScheduler::scheduleFullSync()
00047 {
00048   Task t;
00049   t.type = SyncAll;
00050   TaskList& queue = queueForTaskType( t.type );
00051   if ( queue.contains( t ) || mCurrentTask == t )
00052     return;
00053   queue << t;
00054   signalTaskToTracker( t, "SyncAll" );
00055   scheduleNext();
00056 }
00057 
00058 void ResourceScheduler::scheduleCollectionTreeSync()
00059 {
00060   Task t;
00061   t.type = SyncCollectionTree;
00062   TaskList& queue = queueForTaskType( t.type );
00063   if ( queue.contains( t ) || mCurrentTask == t )
00064     return;
00065   queue << t;
00066   signalTaskToTracker( t, "SyncCollectionTree" );
00067   scheduleNext();
00068 }
00069 
00070 void ResourceScheduler::scheduleSync(const Collection & col)
00071 {
00072   Task t;
00073   t.type = SyncCollection;
00074   t.collection = col;
00075   TaskList& queue = queueForTaskType( t.type );
00076   if ( queue.contains( t ) || mCurrentTask == t )
00077     return;
00078   queue << t;
00079   signalTaskToTracker( t, "SyncCollection" );
00080   scheduleNext();
00081 }
00082 
00083 void ResourceScheduler::scheduleAttributesSync( const Collection &collection )
00084 {
00085   Task t;
00086   t.type = SyncCollectionAttributes;
00087   t.collection = collection;
00088 
00089   TaskList& queue = queueForTaskType( t.type );
00090   if ( queue.contains( t ) || mCurrentTask == t )
00091     return;
00092   queue << t;
00093   signalTaskToTracker( t, "SyncCollectionAttributes" );
00094   scheduleNext();
00095 }
00096 
00097 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00098 {
00099   Task t;
00100   t.type = FetchItem;
00101   t.item = item;
00102   t.itemParts = parts;
00103 
00104   // if the current task does already fetch the requested item, break here but
00105   // keep the dbus message, so we can send the reply later on
00106   if ( mCurrentTask == t ) {
00107     mCurrentTask.dbusMsgs << msg;
00108     return;
00109   }
00110 
00111   // If this task is already in the queue, merge with it.
00112   TaskList& queue = queueForTaskType( t.type );
00113   const int idx = queue.indexOf( t );
00114   if ( idx != -1 ) {
00115     queue[ idx ].dbusMsgs << msg;
00116     return;
00117   }
00118 
00119   t.dbusMsgs << msg;
00120   queue << t;
00121   signalTaskToTracker( t, "FetchItem" );
00122   scheduleNext();
00123 }
00124 
00125 void ResourceScheduler::scheduleResourceCollectionDeletion()
00126 {
00127   Task t;
00128   t.type = DeleteResourceCollection;
00129   TaskList& queue = queueForTaskType( t.type );
00130   if ( queue.contains( t ) || mCurrentTask == t )
00131     return;
00132   queue << t;
00133   signalTaskToTracker( t, "DeleteResourceCollection" );
00134   scheduleNext();
00135 }
00136 
00137 void ResourceScheduler::scheduleCacheInvalidation( const Collection &collection )
00138 {
00139   Task t;
00140   t.type = InvalideCacheForCollection;
00141   t.collection = collection;
00142   TaskList& queue = queueForTaskType( t.type );
00143   if ( queue.contains( t ) || mCurrentTask == t )
00144     return;
00145   queue << t;
00146   signalTaskToTracker( t, "InvalideCacheForCollection" );
00147   scheduleNext();
00148 }
00149 
00150 void ResourceScheduler::scheduleChangeReplay()
00151 {
00152   Task t;
00153   t.type = ChangeReplay;
00154   TaskList& queue = queueForTaskType( t.type );
00155   // see ResourceBase::changeProcessed() for why we do not check for mCurrentTask == t here like in the other tasks
00156   if ( queue.contains( t ) )
00157     return;
00158   queue << t;
00159   signalTaskToTracker( t, "ChangeReplay" );
00160   scheduleNext();
00161 }
00162 
00163 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00164 {
00165   Task t;
00166   t.type = SyncAllDone;
00167   TaskList& queue = queueForTaskType( t.type );
00168   // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
00169   queue << t;
00170   signalTaskToTracker( t, "SyncAllDone" );
00171   scheduleNext();
00172 }
00173 
00174 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
00175 {
00176   Task t;
00177   t.type = SyncCollectionTreeDone;
00178   TaskList& queue = queueForTaskType( t.type );
00179   // no compression here, all this does is emitting a D-Bus signal anyway, and compression can trigger races on the receiver side with the signal being lost
00180   queue << t;
00181   signalTaskToTracker( t, "SyncCollectionTreeDone" );
00182   scheduleNext();
00183 }
00184 
00185 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00186 {
00187   Task t;
00188   t.type = Custom;
00189   t.receiver = receiver;
00190   t.methodName = methodName;
00191   t.argument = argument;
00192   QueueType queueType = GenericTaskQueue;
00193   if ( priority == ResourceBase::AfterChangeReplay )
00194     queueType = AfterChangeReplayQueue;
00195   else if ( priority == ResourceBase::Prepend )
00196     queueType = PrependTaskQueue;
00197   TaskList& queue = mTaskList[ queueType ];
00198 
00199   if ( queue.contains( t ) )
00200     return;
00201 
00202   switch (priority) {
00203   case ResourceBase::Prepend:
00204     queue.prepend( t );
00205     break;
00206   default:
00207     queue.append(t);
00208     break;
00209   }
00210 
00211   signalTaskToTracker( t, "Custom-" + t.methodName );
00212   scheduleNext();
00213 }
00214 
00215 void ResourceScheduler::taskDone()
00216 {
00217   if ( isEmpty() )
00218     emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00219 
00220   if ( s_resourcetracker ) {
00221     QList<QVariant> argumentList;
00222     argumentList << QString::number( mCurrentTask.serial )
00223                  << QString();
00224     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00225   }
00226 
00227   mCurrentTask = Task();
00228   mCurrentTasksQueue = -1;
00229   scheduleNext();
00230 }
00231 
00232 void ResourceScheduler::deferTask()
00233 {
00234   if ( mCurrentTask.type == Invalid )
00235       return;
00236 
00237   if ( s_resourcetracker ) {
00238     QList<QVariant> argumentList;
00239     argumentList << QString::number( mCurrentTask.serial )
00240                  << QString();
00241     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00242   }
00243 
00244   Task t = mCurrentTask;
00245   mCurrentTask = Task();
00246 
00247   Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
00248   mTaskList[mCurrentTasksQueue].prepend( t );
00249   mCurrentTasksQueue = -1;
00250 
00251   signalTaskToTracker( t, "DeferedTask" );
00252 
00253   scheduleNext();
00254 }
00255 
00256 bool ResourceScheduler::isEmpty()
00257 {
00258   for ( int i = 0; i < NQueueCount; ++i ) {
00259     if ( !mTaskList[i].isEmpty() )
00260       return false;
00261   }
00262   return true;
00263 }
00264 
00265 void ResourceScheduler::scheduleNext()
00266 {
00267   if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00268     return;
00269   QTimer::singleShot( 0, this, SLOT(executeNext()) );
00270 }
00271 
00272 void ResourceScheduler::executeNext()
00273 {
00274   if ( mCurrentTask.type != Invalid || isEmpty() )
00275     return;
00276 
00277   for ( int i = 0; i < NQueueCount; ++i ) {
00278     if ( !mTaskList[ i ].isEmpty() ) {
00279       mCurrentTask = mTaskList[ i ].takeFirst();
00280       mCurrentTasksQueue = i;
00281       break;
00282     }
00283   }
00284 
00285   if ( s_resourcetracker ) {
00286     QList<QVariant> argumentList;
00287     argumentList << QString::number( mCurrentTask.serial );
00288     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00289   }
00290 
00291   switch ( mCurrentTask.type ) {
00292     case SyncAll:
00293       emit executeFullSync();
00294       break;
00295     case SyncCollectionTree:
00296       emit executeCollectionTreeSync();
00297       break;
00298     case SyncCollection:
00299       emit executeCollectionSync( mCurrentTask.collection );
00300       break;
00301     case SyncCollectionAttributes:
00302       emit executeCollectionAttributesSync( mCurrentTask.collection );
00303       break;
00304     case FetchItem:
00305       emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00306       break;
00307     case DeleteResourceCollection:
00308       emit executeResourceCollectionDeletion();
00309       break;
00310     case InvalideCacheForCollection:
00311       emit executeCacheInvalidation( mCurrentTask.collection );
00312       break;
00313     case ChangeReplay:
00314       emit executeChangeReplay();
00315       break;
00316     case SyncAllDone:
00317       emit fullSyncComplete();
00318       break;
00319     case SyncCollectionTreeDone:
00320       emit collectionTreeSyncComplete();
00321       break;
00322     case Custom:
00323     {
00324       const QByteArray methodSig = mCurrentTask.methodName + "(QVariant)";
00325       const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
00326       bool success = false;
00327       if ( hasSlotWithVariant ) {
00328         success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00329         Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" );
00330       }
00331       if ( !success )
00332         success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00333 
00334       if ( !success )
00335         kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00336       break;
00337     }
00338     default: {
00339       kError() << "Unhandled task type" << mCurrentTask.type;
00340       dump();
00341       Q_ASSERT( false );
00342     }
00343   }
00344 }
00345 
00346 ResourceScheduler::Task ResourceScheduler::currentTask() const
00347 {
00348   return mCurrentTask;
00349 }
00350 
00351 void ResourceScheduler::setOnline(bool state)
00352 {
00353   if ( mOnline == state )
00354     return;
00355   mOnline = state;
00356   if ( mOnline ) {
00357     scheduleNext();
00358   } else {
00359     if ( mCurrentTask.type != Invalid ) {
00360       // abort running task
00361       queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00362       mCurrentTask = Task();
00363       mCurrentTasksQueue = -1;
00364     }
00365     // abort pending synchronous tasks, might take longer until the resource goes online again
00366     TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00367     for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00368       if ( (*it).type == FetchItem ) {
00369         (*it).sendDBusReplies( false );
00370         it = itemFetchQueue.erase( it );
00371         if ( s_resourcetracker ) {
00372           QList<QVariant> argumentList;
00373           argumentList << QString::number( mCurrentTask.serial )
00374                        << QLatin1String( "Job canceled." );
00375           s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00376         }
00377       } else {
00378         ++it;
00379       }
00380     }
00381   }
00382 }
00383 
00384 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00385 {
00386   // if there's a job tracer running, tell it about the new job
00387   if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00388     s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00389                                        QLatin1String( "/resourcesJobtracker" ),
00390                                        QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00391                                        DBusConnectionPool::threadConnection(), 0 );
00392   }
00393 
00394   if ( s_resourcetracker ) {
00395     QList<QVariant> argumentList;
00396     argumentList << static_cast<AgentBase*>(  parent() )->identifier()
00397                  << QString::number( task.serial )
00398                  << QString()
00399                  << QString::fromLatin1( taskType );
00400     s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00401   }
00402 }
00403 
00404 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00405 {
00406   if ( !collection.isValid() ) // should not happen, but you never know...
00407     return;
00408   TaskList& queue = queueForTaskType( SyncCollection );
00409   for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00410     if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00411       it = queue.erase( it );
00412       kDebug() << " erasing";
00413     } else
00414       ++it;
00415   }
00416 }
00417 
00418 void ResourceScheduler::Task::sendDBusReplies( bool success )
00419 {
00420   Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00421     QDBusMessage reply( msg );
00422     reply << success;
00423     DBusConnectionPool::threadConnection().send( reply );
00424   }
00425 }
00426 
00427 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00428 {
00429   switch( type ) {
00430   case ChangeReplay:
00431     return ChangeReplayQueue;
00432   case FetchItem:
00433     return ItemFetchQueue;
00434   default:
00435     return GenericTaskQueue;
00436   }
00437 }
00438 
00439 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00440 {
00441   const QueueType qt = queueTypeForTaskType( type );
00442   return mTaskList[ qt ];
00443 }
00444 
00445 void ResourceScheduler::dump()
00446 {
00447   kDebug() << dumpToString();
00448 }
00449 
00450 QString ResourceScheduler::dumpToString() const
00451 {
00452   QString ret;
00453   QTextStream str( &ret );
00454   str << "ResourceScheduler: " << (mOnline?"Online":"Offline") << endl;
00455   str << " current task: " << mCurrentTask << endl;
00456   for ( int i = 0; i < NQueueCount; ++i ) {
00457     const TaskList& queue = mTaskList[i];
00458     if (queue.isEmpty()) {
00459       str << " queue " << i << " is empty" << endl;
00460     } else {
00461       str << " queue " << i << " " << queue.size() << " tasks:" << endl;
00462       for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00463         str << "  " << (*it) << endl;
00464       }
00465     }
00466   }
00467   return ret;
00468 }
00469 
00470 void ResourceScheduler::clear()
00471 {
00472   kDebug() << "Clearing ResourceScheduler queues:";
00473   for ( int i = 0; i < NQueueCount; ++i ) {
00474     TaskList& queue = mTaskList[i];
00475     queue.clear();
00476   }
00477   mCurrentTask = Task();
00478   mCurrentTasksQueue = -1;
00479 }
00480 
00481 void Akonadi::ResourceScheduler::cancelQueues()
00482 {
00483   for ( int i = 0; i < NQueueCount; ++i ) {
00484     TaskList& queue = mTaskList[i];
00485     if ( s_resourcetracker ) {
00486       foreach ( const Task &t, queue ) {
00487         QList<QVariant> argumentList;
00488         argumentList << QString::number( t.serial ) << QString();
00489         s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00490       }
00491     }
00492     queue.clear();
00493   }
00494 }
00495 
00496 static const char s_taskTypes[][27] = {
00497       "Invalid (no task)",
00498       "SyncAll",
00499       "SyncCollectionTree",
00500       "SyncCollection",
00501       "SyncCollectionAttributes",
00502       "FetchItem",
00503       "ChangeReplay",
00504       "DeleteResourceCollection",
00505       "InvalideCacheForCollection",
00506       "SyncAllDone",
00507       "SyncCollectionTreeDone",
00508       "Custom"
00509 };
00510 
00511 QTextStream& Akonadi::operator<<( QTextStream& d, const ResourceScheduler::Task& task )
00512 {
00513   d << task.serial << " " << s_taskTypes[task.type] << " ";
00514   if ( task.type != ResourceScheduler::Invalid ) {
00515     if ( task.collection.isValid() )
00516       d << "collection " << task.collection.id() << " ";
00517     if ( task.item.id() != -1 )
00518       d << "item " << task.item.id() << " ";
00519     if ( !task.methodName.isEmpty() )
00520       d << task.methodName << " " << task.argument.toString();
00521   }
00522   return d;
00523 }
00524 
00525 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00526 {
00527   QString s;
00528   QTextStream str( &s );
00529   str << task;
00530   d << s;
00531   return d;
00532 }
00533 
00534 //@endcond
00535 
00536 #include "resourcescheduler_p.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2012 The KDE developers.
Generated on Thu May 10 2012 22:18:35 by doxygen 1.8.0 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.8.3 API Reference

Skip menu "kdepimlibs-4.8.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal