00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "resourcescheduler_p.h"
00021
00022 #include "dbusconnectionpool.h"
00023
00024 #include <kdebug.h>
00025 #include <klocale.h>
00026
00027 #include <QtCore/QTimer>
00028 #include <QtDBus/QDBusInterface>
00029 #include <QtDBus/QDBusConnectionInterface>
00030 #include <boost/graph/graph_concepts.hpp>
00031
00032 using namespace Akonadi;
00033
00034 qint64 ResourceScheduler::Task::latestSerial = 0;
00035 static QDBusAbstractInterface *s_resourcetracker = 0;
00036
00037
00038
00039 ResourceScheduler::ResourceScheduler( QObject *parent ) :
00040 QObject( parent ),
00041 mCurrentTasksQueue( -1 ),
00042 mOnline( false )
00043 {
00044 }
00045
00046 void ResourceScheduler::scheduleFullSync()
00047 {
00048 Task t;
00049 t.type = SyncAll;
00050 TaskList& queue = queueForTaskType( t.type );
00051 if ( queue.contains( t ) || mCurrentTask == t )
00052 return;
00053 queue << t;
00054 signalTaskToTracker( t, "SyncAll" );
00055 scheduleNext();
00056 }
00057
00058 void ResourceScheduler::scheduleCollectionTreeSync()
00059 {
00060 Task t;
00061 t.type = SyncCollectionTree;
00062 TaskList& queue = queueForTaskType( t.type );
00063 if ( queue.contains( t ) || mCurrentTask == t )
00064 return;
00065 queue << t;
00066 signalTaskToTracker( t, "SyncCollectionTree" );
00067 scheduleNext();
00068 }
00069
00070 void ResourceScheduler::scheduleSync(const Collection & col)
00071 {
00072 Task t;
00073 t.type = SyncCollection;
00074 t.collection = col;
00075 TaskList& queue = queueForTaskType( t.type );
00076 if ( queue.contains( t ) || mCurrentTask == t )
00077 return;
00078 queue << t;
00079 signalTaskToTracker( t, "SyncCollection" );
00080 scheduleNext();
00081 }
00082
00083 void ResourceScheduler::scheduleAttributesSync( const Collection &collection )
00084 {
00085 Task t;
00086 t.type = SyncCollectionAttributes;
00087 t.collection = collection;
00088
00089 TaskList& queue = queueForTaskType( t.type );
00090 if ( queue.contains( t ) || mCurrentTask == t )
00091 return;
00092 queue << t;
00093 signalTaskToTracker( t, "SyncCollectionAttributes" );
00094 scheduleNext();
00095 }
00096
00097 void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet<QByteArray> &parts, const QDBusMessage & msg)
00098 {
00099 Task t;
00100 t.type = FetchItem;
00101 t.item = item;
00102 t.itemParts = parts;
00103
00104
00105
00106 if ( mCurrentTask == t ) {
00107 mCurrentTask.dbusMsgs << msg;
00108 return;
00109 }
00110
00111
00112 TaskList& queue = queueForTaskType( t.type );
00113 const int idx = queue.indexOf( t );
00114 if ( idx != -1 ) {
00115 queue[ idx ].dbusMsgs << msg;
00116 return;
00117 }
00118
00119 t.dbusMsgs << msg;
00120 queue << t;
00121 signalTaskToTracker( t, "FetchItem" );
00122 scheduleNext();
00123 }
00124
00125 void ResourceScheduler::scheduleResourceCollectionDeletion()
00126 {
00127 Task t;
00128 t.type = DeleteResourceCollection;
00129 TaskList& queue = queueForTaskType( t.type );
00130 if ( queue.contains( t ) || mCurrentTask == t )
00131 return;
00132 queue << t;
00133 signalTaskToTracker( t, "DeleteResourceCollection" );
00134 scheduleNext();
00135 }
00136
00137 void ResourceScheduler::scheduleChangeReplay()
00138 {
00139 Task t;
00140 t.type = ChangeReplay;
00141 TaskList& queue = queueForTaskType( t.type );
00142
00143 if ( queue.contains( t ) )
00144 return;
00145 queue << t;
00146 signalTaskToTracker( t, "ChangeReplay" );
00147 scheduleNext();
00148 }
00149
00150 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
00151 {
00152 Task t;
00153 t.type = SyncAllDone;
00154 TaskList& queue = queueForTaskType( t.type );
00155
00156 queue << t;
00157 signalTaskToTracker( t, "SyncAllDone" );
00158 scheduleNext();
00159 }
00160
00161 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver, const char* methodName, const QVariant &argument, ResourceBase::SchedulePriority priority )
00162 {
00163 Task t;
00164 t.type = Custom;
00165 t.receiver = receiver;
00166 t.methodName = methodName;
00167 t.argument = argument;
00168 QueueType queueType = GenericTaskQueue;
00169 if ( priority == ResourceBase::AfterChangeReplay )
00170 queueType = AfterChangeReplayQueue;
00171 else if ( priority == ResourceBase::Prepend )
00172 queueType = PrependTaskQueue;
00173 TaskList& queue = mTaskList[ queueType ];
00174
00175 if ( queue.contains( t ) )
00176 return;
00177
00178 switch (priority) {
00179 case ResourceBase::Prepend:
00180 queue.prepend( t );
00181 break;
00182 default:
00183 queue.append(t);
00184 break;
00185 }
00186
00187 signalTaskToTracker( t, "Custom-" + t.methodName );
00188 scheduleNext();
00189 }
00190
00191 void ResourceScheduler::taskDone()
00192 {
00193 if ( isEmpty() )
00194 emit status( AgentBase::Idle, i18nc( "@info:status Application ready for work", "Ready" ) );
00195
00196 if ( s_resourcetracker ) {
00197 QList<QVariant> argumentList;
00198 argumentList << QString::number( mCurrentTask.serial )
00199 << QString();
00200 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00201 }
00202
00203 mCurrentTask = Task();
00204 mCurrentTasksQueue = -1;
00205 scheduleNext();
00206 }
00207
00208 void ResourceScheduler::deferTask()
00209 {
00210 if ( mCurrentTask.type == Invalid )
00211 return;
00212
00213 if ( s_resourcetracker ) {
00214 QList<QVariant> argumentList;
00215 argumentList << QString::number( mCurrentTask.serial )
00216 << QString();
00217 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00218 }
00219
00220 Task t = mCurrentTask;
00221 mCurrentTask = Task();
00222
00223 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
00224 mTaskList[mCurrentTasksQueue].prepend( t );
00225 mCurrentTasksQueue = -1;
00226
00227 signalTaskToTracker( t, "DeferedTask" );
00228
00229 scheduleNext();
00230 }
00231
00232 bool ResourceScheduler::isEmpty()
00233 {
00234 for ( int i = 0; i < NQueueCount; ++i ) {
00235 if ( !mTaskList[i].isEmpty() )
00236 return false;
00237 }
00238 return true;
00239 }
00240
00241 void ResourceScheduler::scheduleNext()
00242 {
00243 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
00244 return;
00245 QTimer::singleShot( 0, this, SLOT( executeNext() ) );
00246 }
00247
00248 void ResourceScheduler::executeNext()
00249 {
00250 if ( mCurrentTask.type != Invalid || isEmpty() )
00251 return;
00252
00253 for ( int i = 0; i < NQueueCount; ++i ) {
00254 if ( !mTaskList[ i ].isEmpty() ) {
00255 mCurrentTask = mTaskList[ i ].takeFirst();
00256 mCurrentTasksQueue = i;
00257 break;
00258 }
00259 }
00260
00261 if ( s_resourcetracker ) {
00262 QList<QVariant> argumentList;
00263 argumentList << QString::number( mCurrentTask.serial );
00264 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobStarted" ), argumentList);
00265 }
00266
00267 switch ( mCurrentTask.type ) {
00268 case SyncAll:
00269 emit executeFullSync();
00270 break;
00271 case SyncCollectionTree:
00272 emit executeCollectionTreeSync();
00273 break;
00274 case SyncCollection:
00275 emit executeCollectionSync( mCurrentTask.collection );
00276 break;
00277 case SyncCollectionAttributes:
00278 emit executeCollectionAttributesSync( mCurrentTask.collection );
00279 break;
00280 case FetchItem:
00281 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
00282 break;
00283 case DeleteResourceCollection:
00284 emit executeResourceCollectionDeletion();
00285 break;
00286 case ChangeReplay:
00287 emit executeChangeReplay();
00288 break;
00289 case SyncAllDone:
00290 emit fullSyncComplete();
00291 break;
00292 case Custom:
00293 {
00294 bool success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
00295 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(), "ResourceScheduler::executeNext", "Valid argument was provided but the method wasn't found" );
00296 if ( !success )
00297 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
00298
00299 if ( !success )
00300 kError() << "Could not invoke slot" << mCurrentTask.methodName << "on" << mCurrentTask.receiver << "with argument" << mCurrentTask.argument;
00301 break;
00302 }
00303 default: {
00304 kError() << "Unhandled task type" << mCurrentTask.type;
00305 dump();
00306 Q_ASSERT( false );
00307 }
00308 }
00309 }
00310
00311 ResourceScheduler::Task ResourceScheduler::currentTask() const
00312 {
00313 return mCurrentTask;
00314 }
00315
00316 void ResourceScheduler::setOnline(bool state)
00317 {
00318 if ( mOnline == state )
00319 return;
00320 mOnline = state;
00321 if ( mOnline ) {
00322 scheduleNext();
00323 } else {
00324 if ( mCurrentTask.type != Invalid ) {
00325
00326 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
00327 mCurrentTask = Task();
00328 mCurrentTasksQueue = -1;
00329 }
00330
00331 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
00332 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
00333 if ( (*it).type == FetchItem ) {
00334 (*it).sendDBusReplies( false );
00335 it = itemFetchQueue.erase( it );
00336 if ( s_resourcetracker ) {
00337 QList<QVariant> argumentList;
00338 argumentList << QString::number( mCurrentTask.serial )
00339 << QLatin1String( "Job canceled." );
00340 s_resourcetracker->asyncCallWithArgumentList( QLatin1String( "jobEnded" ), argumentList );
00341 }
00342 } else {
00343 ++it;
00344 }
00345 }
00346 }
00347 }
00348
00349 void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType )
00350 {
00351
00352 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00353 s_resourcetracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00354 QLatin1String( "/resourcesJobtracker" ),
00355 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00356 DBusConnectionPool::threadConnection(), 0 );
00357 }
00358
00359 if ( s_resourcetracker ) {
00360 QList<QVariant> argumentList;
00361 argumentList << static_cast<AgentBase*>( parent() )->identifier()
00362 << QString::number( task.serial )
00363 << QString()
00364 << QString::fromLatin1( taskType );
00365 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobCreated" ), argumentList);
00366 }
00367 }
00368
00369 void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection )
00370 {
00371 if ( !collection.isValid() )
00372 return;
00373 TaskList& queue = queueForTaskType( SyncCollection );
00374 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
00375 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
00376 it = queue.erase( it );
00377 kDebug() << " erasing";
00378 } else
00379 ++it;
00380 }
00381 }
00382
00383 void ResourceScheduler::Task::sendDBusReplies( bool success )
00384 {
00385 Q_FOREACH( const QDBusMessage &msg, dbusMsgs ) {
00386 QDBusMessage reply( msg );
00387 reply << success;
00388 DBusConnectionPool::threadConnection().send( reply );
00389 }
00390 }
00391
00392 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
00393 {
00394 switch( type ) {
00395 case ChangeReplay:
00396 return ChangeReplayQueue;
00397 case FetchItem:
00398 return ItemFetchQueue;
00399 default:
00400 return GenericTaskQueue;
00401 }
00402 }
00403
00404 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
00405 {
00406 const QueueType qt = queueTypeForTaskType( type );
00407 return mTaskList[ qt ];
00408 }
00409
00410 void ResourceScheduler::dump()
00411 {
00412 kDebug() << "ResourceScheduler: Online:" << mOnline;
00413 kDebug() << " current task:" << mCurrentTask;
00414 for ( int i = 0; i < NQueueCount; ++i ) {
00415 const TaskList& queue = mTaskList[i];
00416 kDebug() << " queue" << i << queue.size() << "tasks:";
00417 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
00418 kDebug() << " " << (*it);
00419 }
00420 }
00421 }
00422
00423 void ResourceScheduler::clear()
00424 {
00425 kDebug() << "Clearing ResourceScheduler queues:";
00426 for ( int i = 0; i < NQueueCount; ++i ) {
00427 TaskList& queue = mTaskList[i];
00428 queue.clear();
00429 }
00430 mCurrentTask = Task();
00431 mCurrentTasksQueue = -1;
00432 }
00433
00434 void Akonadi::ResourceScheduler::cancelQueues()
00435 {
00436 for ( int i = 0; i < NQueueCount; ++i ) {
00437 TaskList& queue = mTaskList[i];
00438 if ( s_resourcetracker ) {
00439 foreach ( const Task &t, queue ) {
00440 QList<QVariant> argumentList;
00441 argumentList << QString::number( t.serial ) << QString();
00442 s_resourcetracker->asyncCallWithArgumentList(QLatin1String( "jobEnded" ), argumentList);
00443 }
00444 }
00445 queue.clear();
00446 }
00447 }
00448
00449 static const char s_taskTypes[][25] = {
00450 "Invalid",
00451 "SyncAll",
00452 "SyncCollectionTree",
00453 "SyncCollection",
00454 "FetchItem",
00455 "ChangeReplay",
00456 "DeleteResourceCollection",
00457 "SyncAllDone",
00458 "Custom"
00459 };
00460
00461 QDebug Akonadi::operator<<( QDebug d, const ResourceScheduler::Task& task )
00462 {
00463 d << task.serial << s_taskTypes[task.type];
00464 if ( task.type != ResourceScheduler::Invalid ) {
00465 if ( task.collection.id() != -1 )
00466 d << "collection" << task.collection.id();
00467 if ( task.item.id() != -1 )
00468 d << "item" << task.item.id();
00469 if ( !task.methodName.isEmpty() )
00470 d << task.methodName << task.argument;
00471 }
00472 return d;
00473 }
00474
00475
00476
00477 #include "resourcescheduler_p.moc"