20 #include "resourcescheduler_p.h"
22 #include "dbusconnectionpool.h"
23 #include "recursivemover_p.h"
28 #include <QtCore/QTimer>
29 #include <QtDBus/QDBusInterface>
30 #include <QtDBus/QDBusConnectionInterface>
31 #include <boost/graph/graph_concepts.hpp>
33 using namespace Akonadi;
35 qint64 ResourceScheduler::Task::latestSerial = 0;
36 static QDBusAbstractInterface *s_resourcetracker = 0;
40 ResourceScheduler::ResourceScheduler( QObject *parent ) :
42 mCurrentTasksQueue( -1 ),
47 void ResourceScheduler::scheduleFullSync()
51 TaskList& queue = queueForTaskType( t.type );
52 if ( queue.contains( t ) || mCurrentTask == t )
55 signalTaskToTracker( t,
"SyncAll" );
59 void ResourceScheduler::scheduleCollectionTreeSync()
62 t.type = SyncCollectionTree;
63 TaskList& queue = queueForTaskType( t.type );
64 if ( queue.contains( t ) || mCurrentTask == t )
67 signalTaskToTracker( t,
"SyncCollectionTree" );
71 void ResourceScheduler::scheduleSync(
const Collection & col)
74 t.type = SyncCollection;
76 TaskList& queue = queueForTaskType( t.type );
77 if ( queue.contains( t ) || mCurrentTask == t )
80 signalTaskToTracker( t,
"SyncCollection" );
84 void ResourceScheduler::scheduleAttributesSync(
const Collection &collection )
87 t.type = SyncCollectionAttributes;
88 t.collection = collection;
90 TaskList& queue = queueForTaskType( t.type );
91 if ( queue.contains( t ) || mCurrentTask == t )
94 signalTaskToTracker( t,
"SyncCollectionAttributes" );
98 void ResourceScheduler::scheduleItemFetch(
const Item & item,
const QSet<QByteArray> &parts,
const QDBusMessage & msg)
107 if ( mCurrentTask == t ) {
108 mCurrentTask.dbusMsgs << msg;
113 TaskList& queue = queueForTaskType( t.type );
114 const int idx = queue.indexOf( t );
116 queue[ idx ].dbusMsgs << msg;
122 signalTaskToTracker( t,
"FetchItem" );
126 void ResourceScheduler::scheduleResourceCollectionDeletion()
129 t.type = DeleteResourceCollection;
130 TaskList& queue = queueForTaskType( t.type );
131 if ( queue.contains( t ) || mCurrentTask == t )
134 signalTaskToTracker( t,
"DeleteResourceCollection" );
138 void ResourceScheduler::scheduleCacheInvalidation(
const Collection &collection )
141 t.type = InvalideCacheForCollection;
142 t.collection = collection;
143 TaskList& queue = queueForTaskType( t.type );
144 if ( queue.contains( t ) || mCurrentTask == t )
147 signalTaskToTracker( t,
"InvalideCacheForCollection" );
151 void ResourceScheduler::scheduleChangeReplay()
154 t.type = ChangeReplay;
155 TaskList& queue = queueForTaskType( t.type );
157 if ( queue.contains( t ) )
160 signalTaskToTracker( t,
"ChangeReplay" );
167 t.type = RecursiveMoveReplay;
168 t.collection = movedCollection;
169 t.argument = QVariant::fromValue( mover );
170 TaskList &queue = queueForTaskType( t.type );
172 if ( queue.contains( t ) || mCurrentTask == t )
176 signalTaskToTracker( t,
"RecursiveMoveReplay" );
180 void Akonadi::ResourceScheduler::scheduleFullSyncCompletion()
183 t.type = SyncAllDone;
184 TaskList& queue = queueForTaskType( t.type );
187 signalTaskToTracker( t,
"SyncAllDone" );
191 void Akonadi::ResourceScheduler::scheduleCollectionTreeSyncCompletion()
194 t.type = SyncCollectionTreeDone;
195 TaskList& queue = queueForTaskType( t.type );
198 signalTaskToTracker( t,
"SyncCollectionTreeDone" );
202 void Akonadi::ResourceScheduler::scheduleCustomTask( QObject *receiver,
const char* methodName,
const QVariant &argument,
ResourceBase::SchedulePriority priority )
206 t.receiver = receiver;
207 t.methodName = methodName;
208 t.argument = argument;
209 QueueType queueType = GenericTaskQueue;
211 queueType = AfterChangeReplayQueue;
213 queueType = PrependTaskQueue;
214 TaskList& queue = mTaskList[ queueType ];
216 if ( queue.contains( t ) )
228 signalTaskToTracker( t,
"Custom-" + t.methodName );
232 void ResourceScheduler::taskDone()
235 emit status(
AgentBase::Idle, i18nc(
"@info:status Application ready for work",
"Ready" ) );
237 if ( s_resourcetracker ) {
238 QList<QVariant> argumentList;
239 argumentList << QString::number( mCurrentTask.serial )
241 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
244 mCurrentTask = Task();
245 mCurrentTasksQueue = -1;
249 void ResourceScheduler::deferTask()
251 if ( mCurrentTask.type == Invalid )
254 if ( s_resourcetracker ) {
255 QList<QVariant> argumentList;
256 argumentList << QString::number( mCurrentTask.serial )
258 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
261 Task t = mCurrentTask;
262 mCurrentTask = Task();
264 Q_ASSERT( mCurrentTasksQueue >= 0 && mCurrentTasksQueue < NQueueCount );
265 mTaskList[mCurrentTasksQueue].prepend( t );
266 mCurrentTasksQueue = -1;
268 signalTaskToTracker( t,
"DeferedTask" );
273 bool ResourceScheduler::isEmpty()
275 for (
int i = 0; i < NQueueCount; ++i ) {
276 if ( !mTaskList[i].isEmpty() )
282 void ResourceScheduler::scheduleNext()
284 if ( mCurrentTask.type != Invalid || isEmpty() || !mOnline )
286 QTimer::singleShot( 0,
this, SLOT(executeNext()) );
289 void ResourceScheduler::executeNext()
291 if ( mCurrentTask.type != Invalid || isEmpty() )
294 for (
int i = 0; i < NQueueCount; ++i ) {
295 if ( !mTaskList[ i ].isEmpty() ) {
296 mCurrentTask = mTaskList[ i ].takeFirst();
297 mCurrentTasksQueue = i;
302 if ( s_resourcetracker ) {
303 QList<QVariant> argumentList;
304 argumentList << QString::number( mCurrentTask.serial );
305 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobStarted" ), argumentList);
308 switch ( mCurrentTask.type ) {
310 emit executeFullSync();
312 case SyncCollectionTree:
313 emit executeCollectionTreeSync();
316 emit executeCollectionSync( mCurrentTask.collection );
318 case SyncCollectionAttributes:
319 emit executeCollectionAttributesSync( mCurrentTask.collection );
322 emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts );
324 case DeleteResourceCollection:
325 emit executeResourceCollectionDeletion();
327 case InvalideCacheForCollection:
328 emit executeCacheInvalidation( mCurrentTask.collection );
331 emit executeChangeReplay();
333 case RecursiveMoveReplay:
334 emit executeRecursiveMoveReplay( mCurrentTask.argument.value<
RecursiveMover*>() );
337 emit fullSyncComplete();
339 case SyncCollectionTreeDone:
340 emit collectionTreeSyncComplete();
344 const QByteArray methodSig = mCurrentTask.methodName +
"(QVariant)";
345 const bool hasSlotWithVariant = mCurrentTask.receiver->metaObject()->indexOfMethod(methodSig) != -1;
346 bool success =
false;
347 if ( hasSlotWithVariant ) {
348 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName, Q_ARG(QVariant, mCurrentTask.argument) );
349 Q_ASSERT_X( success || !mCurrentTask.argument.isValid(),
"ResourceScheduler::executeNext",
"Valid argument was provided but the method wasn't found" );
352 success = QMetaObject::invokeMethod( mCurrentTask.receiver, mCurrentTask.methodName );
355 kError() <<
"Could not invoke slot" << mCurrentTask.methodName <<
"on" << mCurrentTask.receiver <<
"with argument" << mCurrentTask.argument;
359 kError() <<
"Unhandled task type" << mCurrentTask.type;
366 ResourceScheduler::Task ResourceScheduler::currentTask()
const
371 void ResourceScheduler::setOnline(
bool state)
373 if ( mOnline == state )
379 if ( mCurrentTask.type != Invalid ) {
381 queueForTaskType( mCurrentTask.type ).prepend( mCurrentTask );
382 mCurrentTask = Task();
383 mCurrentTasksQueue = -1;
386 TaskList& itemFetchQueue = queueForTaskType( FetchItem );
387 for ( QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end(); ) {
388 if ( (*it).type == FetchItem ) {
389 (*it).sendDBusReplies( i18nc(
"@info",
"Job canceled." ) );
390 it = itemFetchQueue.erase( it );
391 if ( s_resourcetracker ) {
392 QList<QVariant> argumentList;
393 argumentList << QString::number( mCurrentTask.serial )
394 << i18nc(
"@info",
"Job canceled." );
395 s_resourcetracker->asyncCallWithArgumentList( QLatin1String(
"jobEnded" ), argumentList );
404 void ResourceScheduler::signalTaskToTracker(
const Task &task,
const QByteArray &taskType )
407 if ( !s_resourcetracker && DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
408 s_resourcetracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
409 QLatin1String(
"/resourcesJobtracker" ),
410 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
411 DBusConnectionPool::threadConnection(), 0 );
414 if ( s_resourcetracker ) {
415 QList<QVariant> argumentList;
416 argumentList << static_cast<AgentBase*>( parent() )->identifier()
417 << QString::number( task.serial )
419 << QString::fromLatin1( taskType );
420 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobCreated" ), argumentList);
428 TaskList& queue = queueForTaskType( SyncCollection );
429 for ( QList<Task>::iterator it = queue.begin(); it != queue.end(); ) {
430 if ( (*it).type == SyncCollection && (*it).collection == collection ) {
431 it = queue.erase( it );
432 kDebug() <<
" erasing";
438 void ResourceScheduler::Task::sendDBusReplies(
const QString &errorMsg )
440 Q_FOREACH(
const QDBusMessage &msg, dbusMsgs ) {
441 QDBusMessage reply( msg.createReply() );
442 const QString methodName = msg.member();
443 if (methodName == QLatin1String(
"requestItemDelivery")) {
444 reply << errorMsg.isEmpty();
445 }
else if (methodName == QLatin1String(
"requestItemDeliveryV2")) {
447 }
else if (methodName.isEmpty()) {
450 kFatal() <<
"Got unexpected member:" << methodName;
452 DBusConnectionPool::threadConnection().send( reply );
456 ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType( TaskType type )
460 case RecursiveMoveReplay:
461 return ChangeReplayQueue;
463 return ItemFetchQueue;
465 return GenericTaskQueue;
469 ResourceScheduler::TaskList& ResourceScheduler::queueForTaskType( TaskType type )
471 const QueueType qt = queueTypeForTaskType( type );
472 return mTaskList[ qt ];
475 void ResourceScheduler::dump()
477 kDebug() << dumpToString();
480 QString ResourceScheduler::dumpToString()
const
483 QTextStream str( &ret );
484 str <<
"ResourceScheduler: " << (mOnline?
"Online":
"Offline") << endl;
485 str <<
" current task: " << mCurrentTask << endl;
486 for (
int i = 0; i < NQueueCount; ++i ) {
487 const TaskList& queue = mTaskList[i];
488 if (queue.isEmpty()) {
489 str <<
" queue " << i <<
" is empty" << endl;
491 str <<
" queue " << i <<
" " << queue.size() <<
" tasks:" << endl;
492 for ( QList<Task>::const_iterator it = queue.begin(); it != queue.end(); ++it ) {
493 str <<
" " << (*it) << endl;
500 void ResourceScheduler::clear()
502 kDebug() <<
"Clearing ResourceScheduler queues:";
503 for (
int i = 0; i < NQueueCount; ++i ) {
504 TaskList& queue = mTaskList[i];
507 mCurrentTask = Task();
508 mCurrentTasksQueue = -1;
511 void Akonadi::ResourceScheduler::cancelQueues()
513 for (
int i = 0; i < NQueueCount; ++i ) {
514 TaskList& queue = mTaskList[i];
515 if ( s_resourcetracker ) {
516 foreach (
const Task &t, queue ) {
517 QList<QVariant> argumentList;
518 argumentList << QString::number( t.serial ) << QString();
519 s_resourcetracker->asyncCallWithArgumentList(QLatin1String(
"jobEnded" ), argumentList);
526 static const char s_taskTypes[][27] = {
529 "SyncCollectionTree",
531 "SyncCollectionAttributes",
534 "RecursiveMoveReplay",
535 "DeleteResourceCollection",
536 "InvalideCacheForCollection",
538 "SyncCollectionTreeDone",
542 QTextStream& Akonadi::operator<<( QTextStream& d,
const ResourceScheduler::Task& task )
544 d << task.serial <<
" " << s_taskTypes[task.type] <<
" ";
545 if ( task.type != ResourceScheduler::Invalid ) {
546 if ( task.collection.isValid() )
547 d <<
"collection " << task.collection.id() <<
" ";
548 if ( task.item.id() != -1 )
549 d <<
"item " << task.item.id() <<
" ";
550 if ( !task.methodName.isEmpty() )
551 d << task.methodName <<
" " << task.argument.toString();
556 QDebug Akonadi::operator<<( QDebug d,
const ResourceScheduler::Task& task )
559 QTextStream str( &s );
567 #include "moc_resourcescheduler_p.cpp"