00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "resourcebase.h"
00022 #include "agentbase_p.h"
00023
00024 #include "resourceadaptor.h"
00025 #include "collectiondeletejob.h"
00026 #include "collectionsync_p.h"
00027 #include "dbusconnectionpool.h"
00028 #include "itemsync.h"
00029 #include "resourcescheduler_p.h"
00030 #include "tracerinterface.h"
00031 #include "xdgbasedirs_p.h"
00032
00033 #include "changerecorder.h"
00034 #include "collectionfetchjob.h"
00035 #include "collectionfetchscope.h"
00036 #include "collectionmodifyjob.h"
00037 #include "itemfetchjob.h"
00038 #include "itemfetchscope.h"
00039 #include "itemmodifyjob.h"
00040 #include "itemmodifyjob_p.h"
00041 #include "session.h"
00042 #include "resourceselectjob_p.h"
00043 #include "monitor_p.h"
00044 #include "servermanager_p.h"
00045
00046 #include <kaboutdata.h>
00047 #include <kcmdlineargs.h>
00048 #include <kdebug.h>
00049 #include <klocale.h>
00050
00051 #include <QtCore/QDebug>
00052 #include <QtCore/QDir>
00053 #include <QtCore/QHash>
00054 #include <QtCore/QSettings>
00055 #include <QtCore/QTimer>
00056 #include <QtGui/QApplication>
00057 #include <QtDBus/QtDBus>
00058
00059 using namespace Akonadi;
00060
00061 class Akonadi::ResourceBasePrivate : public AgentBasePrivate
00062 {
00063 Q_OBJECT
00064 Q_CLASSINFO( "D-Bus Interface", "org.kde.dfaure" )
00065
00066 public:
00067 ResourceBasePrivate( ResourceBase *parent )
00068 : AgentBasePrivate( parent ),
00069 scheduler( 0 ),
00070 mItemSyncer( 0 ),
00071 mItemSyncFetchScope( 0 ),
00072 mItemTransactionMode( ItemSync::SingleTransaction ),
00073 mCollectionSyncer( 0 ),
00074 mHierarchicalRid( false ),
00075 mUnemittedProgress( 0 )
00076 {
00077 Internal::setClientType( Internal::Resource );
00078 mStatusMessage = defaultReadyMessage();
00079 mProgressEmissionCompressor.setInterval( 1000 );
00080 mProgressEmissionCompressor.setSingleShot( true );
00081 }
00082
00083 ~ResourceBasePrivate()
00084 {
00085 delete mItemSyncFetchScope;
00086 }
00087
00088 Q_DECLARE_PUBLIC( ResourceBase )
00089
00090 void delayedInit()
00091 {
00092 if ( !DBusConnectionPool::threadConnection().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) {
00093 QString reason = DBusConnectionPool::threadConnection().lastError().message();
00094 if ( reason.isEmpty() ) {
00095 reason = QString::fromLatin1( "this service is probably running already." );
00096 }
00097 kError() << "Unable to register service at D-Bus: " << reason;
00098
00099 if ( QThread::currentThread() == QCoreApplication::instance()->thread() )
00100 QCoreApplication::instance()->exit(1);
00101
00102 } else {
00103 AgentBasePrivate::delayedInit();
00104 }
00105 }
00106
00107 virtual void changeProcessed()
00108 {
00109 mChangeRecorder->changeProcessed();
00110 if ( !mChangeRecorder->isEmpty() )
00111 scheduler->scheduleChangeReplay();
00112 scheduler->taskDone();
00113 }
00114
00115 void slotAbortRequested();
00116
00117 void slotDeliveryDone( KJob* job );
00118 void slotCollectionSyncDone( KJob *job );
00119 void slotLocalListDone( KJob *job );
00120 void slotSynchronizeCollection( const Collection &col );
00121 void slotCollectionListDone( KJob *job );
00122 void slotSynchronizeCollectionAttributes( const Collection &col );
00123 void slotCollectionListForAttributesDone( KJob *job );
00124 void slotCollectionAttributesSyncDone( KJob *job );
00125
00126 void slotItemSyncDone( KJob *job );
00127
00128 void slotPercent( KJob* job, unsigned long percent );
00129 void slotDelayedEmitProgress();
00130 void slotDeleteResourceCollection();
00131 void slotDeleteResourceCollectionDone( KJob *job );
00132 void slotCollectionDeletionDone( KJob *job );
00133
00134 void slotPrepareItemRetrieval( const Akonadi::Item &item );
00135 void slotPrepareItemRetrievalResult( KJob* job );
00136
00137 void changeCommittedResult( KJob* job );
00138
00139 void slotSessionReconnected()
00140 {
00141 Q_Q( ResourceBase );
00142
00143 new ResourceSelectJob( q->identifier() );
00144 }
00145
00146 void createItemSyncInstanceIfMissing()
00147 {
00148 Q_Q( ResourceBase );
00149 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::SyncCollection,
00150 "createItemSyncInstance", "Calling items retrieval methods although no item retrieval is in progress" );
00151 if ( !mItemSyncer ) {
00152 mItemSyncer = new ItemSync( q->currentCollection() );
00153 mItemSyncer->setTransactionMode( mItemTransactionMode );
00154 if ( mItemSyncFetchScope )
00155 mItemSyncer->setFetchScope( *mItemSyncFetchScope );
00156 mItemSyncer->setProperty( "collection", QVariant::fromValue( q->currentCollection() ) );
00157 connect( mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), q, SLOT( slotPercent( KJob*, unsigned long ) ) );
00158 connect( mItemSyncer, SIGNAL( result( KJob* ) ), q, SLOT( slotItemSyncDone( KJob* ) ) );
00159 }
00160 Q_ASSERT( mItemSyncer );
00161 }
00162
00163 public Q_SLOTS:
00164 Q_SCRIPTABLE void dump()
00165 {
00166 scheduler->dump();
00167 }
00168
00169 Q_SCRIPTABLE void clear()
00170 {
00171 scheduler->clear();
00172 }
00173
00174 public:
00175
00176 Collection currentCollection;
00177
00178 ResourceScheduler *scheduler;
00179 ItemSync *mItemSyncer;
00180 ItemFetchScope *mItemSyncFetchScope;
00181 ItemSync::TransactionMode mItemTransactionMode;
00182 CollectionSync *mCollectionSyncer;
00183 bool mHierarchicalRid;
00184 QTimer mProgressEmissionCompressor;
00185 int mUnemittedProgress;
00186 QMap<Akonadi::Collection::Id, QVariantMap> mUnemittedAdvancedStatus;
00187 };
00188
00189 ResourceBase::ResourceBase( const QString & id )
00190 : AgentBase( new ResourceBasePrivate( this ), id )
00191 {
00192 Q_D( ResourceBase );
00193
00194 new Akonadi__ResourceAdaptor( this );
00195
00196 d->scheduler = new ResourceScheduler( this );
00197
00198 d->mChangeRecorder->setChangeRecordingEnabled( true );
00199 connect( d->mChangeRecorder, SIGNAL( changesAdded() ),
00200 d->scheduler, SLOT( scheduleChangeReplay() ) );
00201
00202 d->mChangeRecorder->setResourceMonitored( d->mId.toLatin1() );
00203
00204 connect( d->scheduler, SIGNAL( executeFullSync() ),
00205 SLOT( retrieveCollections() ) );
00206 connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ),
00207 SLOT( retrieveCollections() ) );
00208 connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ),
00209 SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) );
00210 connect( d->scheduler, SIGNAL( executeCollectionAttributesSync( const Akonadi::Collection& ) ),
00211 SLOT( slotSynchronizeCollectionAttributes(Akonadi::Collection)) );
00212 connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet<QByteArray>& ) ),
00213 SLOT( slotPrepareItemRetrieval(Akonadi::Item)) );
00214 connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ),
00215 SLOT( slotDeleteResourceCollection() ) );
00216 connect( d->scheduler, SIGNAL( status( int, const QString& ) ),
00217 SIGNAL( status( int, const QString& ) ) );
00218 connect( d->scheduler, SIGNAL( executeChangeReplay() ),
00219 d->mChangeRecorder, SLOT( replayNext() ) );
00220 connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) );
00221 connect( d->mChangeRecorder, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) );
00222 connect( d->mChangeRecorder, SIGNAL( collectionRemoved( const Akonadi::Collection& ) ),
00223 d->scheduler, SLOT( collectionRemoved( const Akonadi::Collection& ) ) );
00224 connect( this, SIGNAL( abortRequested() ), this, SLOT( slotAbortRequested() ) );
00225 connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) );
00226 connect( this, SIGNAL( agentNameChanged( const QString& ) ),
00227 this, SIGNAL( nameChanged( const QString& ) ) );
00228
00229 connect( &d->mProgressEmissionCompressor, SIGNAL( timeout() ),
00230 this, SLOT( slotDelayedEmitProgress() ) );
00231
00232 d->scheduler->setOnline( d->mOnline );
00233 if ( !d->mChangeRecorder->isEmpty() )
00234 d->scheduler->scheduleChangeReplay();
00235
00236 DBusConnectionPool::threadConnection().registerObject( QLatin1String( "/Debug" ), d, QDBusConnection::ExportScriptableSlots );
00237
00238 new ResourceSelectJob( identifier() );
00239
00240 connect( d->mChangeRecorder->session(), SIGNAL( reconnected() ), SLOT( slotSessionReconnected() ) );
00241 }
00242
00243 ResourceBase::~ResourceBase()
00244 {
00245 }
00246
00247 void ResourceBase::synchronize()
00248 {
00249 d_func()->scheduler->scheduleFullSync();
00250 }
00251
00252 void ResourceBase::setName( const QString &name )
00253 {
00254 AgentBase::setAgentName( name );
00255 }
00256
00257 QString ResourceBase::name() const
00258 {
00259 return AgentBase::agentName();
00260 }
00261
00262 QString ResourceBase::parseArguments( int argc, char **argv )
00263 {
00264 QString identifier;
00265 if ( argc < 3 ) {
00266 kDebug() << "Not enough arguments passed...";
00267 exit( 1 );
00268 }
00269
00270 for ( int i = 1; i < argc - 1; ++i ) {
00271 if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) )
00272 identifier = QLatin1String( argv[ i + 1 ] );
00273 }
00274
00275 if ( identifier.isEmpty() ) {
00276 kDebug() << "Identifier argument missing";
00277 exit( 1 );
00278 }
00279
00280 QByteArray catalog;
00281 char *p = strrchr( argv[0], '/' );
00282 if ( p )
00283 catalog = QByteArray( p + 1 );
00284 else
00285 catalog = QByteArray( argv[0] );
00286
00287 KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog,
00288 ki18nc( "@title application name", "Akonadi Resource" ), "0.1",
00289 ki18nc( "@title application description", "Akonadi Resource" ) );
00290
00291 KCmdLineOptions options;
00292 options.add( "identifier <argument>",
00293 ki18nc( "@label commandline option", "Resource identifier" ) );
00294 KCmdLineArgs::addCmdLineOptions( options );
00295
00296 return identifier;
00297 }
00298
00299 int ResourceBase::init( ResourceBase *r )
00300 {
00301 QApplication::setQuitOnLastWindowClosed( false );
00302 int rv = kapp->exec();
00303 delete r;
00304 return rv;
00305 }
00306
00307 void ResourceBasePrivate::slotAbortRequested()
00308 {
00309 Q_Q( ResourceBase );
00310
00311 scheduler->cancelQueues();
00312 QMetaObject::invokeMethod( q, "abortActivity" );
00313 }
00314
00315 void ResourceBase::itemRetrieved( const Item &item )
00316 {
00317 Q_D( ResourceBase );
00318 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem );
00319 if ( !item.isValid() ) {
00320 d->scheduler->currentTask().sendDBusReplies( false );
00321 d->scheduler->taskDone();
00322 return;
00323 }
00324
00325 Item i( item );
00326 QSet<QByteArray> requestedParts = d->scheduler->currentTask().itemParts;
00327 foreach ( const QByteArray &part, requestedParts ) {
00328 if ( !item.loadedPayloadParts().contains( part ) ) {
00329 kWarning() << "Item does not provide part" << part;
00330 }
00331 }
00332
00333 ItemModifyJob *job = new ItemModifyJob( i );
00334
00335 job->disableRevisionCheck();
00336 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) );
00337 }
00338
00339 void ResourceBasePrivate::slotDeliveryDone(KJob * job)
00340 {
00341 Q_Q( ResourceBase );
00342 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem );
00343 if ( job->error() ) {
00344 emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() );
00345 }
00346 scheduler->currentTask().sendDBusReplies( !job->error() );
00347 scheduler->taskDone();
00348 }
00349
00350 void ResourceBase::collectionAttributesRetrieved( const Collection &collection )
00351 {
00352 Q_D( ResourceBase );
00353 Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes );
00354 if ( !collection.isValid() ) {
00355 emit attributesSynchronized( d->scheduler->currentTask().collection.id() );
00356 d->scheduler->taskDone();
00357 return;
00358 }
00359
00360 CollectionModifyJob *job = new CollectionModifyJob( collection );
00361 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionAttributesSyncDone( KJob* ) ) );
00362 }
00363
00364 void ResourceBasePrivate::slotCollectionAttributesSyncDone(KJob * job)
00365 {
00366 Q_Q( ResourceBase );
00367 Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::SyncCollectionAttributes );
00368 if ( job->error() ) {
00369 emit q->error( QLatin1String( "Error while updating collection: " ) + job->errorString() );
00370 }
00371 emit q->attributesSynchronized( scheduler->currentTask().collection.id() );
00372 scheduler->taskDone();
00373 }
00374
00375 void ResourceBasePrivate::slotDeleteResourceCollection()
00376 {
00377 Q_Q( ResourceBase );
00378
00379 CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel );
00380 job->fetchScope().setResource( q->identifier() );
00381 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) );
00382 }
00383
00384 void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job )
00385 {
00386 Q_Q( ResourceBase );
00387 if ( job->error() ) {
00388 emit q->error( job->errorString() );
00389 scheduler->taskDone();
00390 } else {
00391 const CollectionFetchJob *fetchJob = static_cast<const CollectionFetchJob*>( job );
00392
00393 if ( !fetchJob->collections().isEmpty() ) {
00394 CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() );
00395 connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) );
00396 } else {
00397
00398 scheduler->taskDone();
00399 }
00400 }
00401 }
00402
00403 void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job )
00404 {
00405 Q_Q( ResourceBase );
00406 if ( job->error() ) {
00407 emit q->error( job->errorString() );
00408 }
00409
00410 scheduler->taskDone();
00411 }
00412
00413 void ResourceBase::changeCommitted( const Item& item )
00414 {
00415 Q_D( ResourceBase );
00416 ItemModifyJob *job = new ItemModifyJob( item );
00417 job->d_func()->setClean();
00418 job->disableRevisionCheck();
00419 job->setIgnorePayload( true );
00420 d->changeProcessed();
00421 }
00422
00423 void ResourceBase::changeCommitted( const Collection &collection )
00424 {
00425 CollectionModifyJob *job = new CollectionModifyJob( collection );
00426 connect( job, SIGNAL( result( KJob* ) ), SLOT( changeCommittedResult( KJob* ) ) );
00427 }
00428
00429 void ResourceBasePrivate::changeCommittedResult( KJob *job )
00430 {
00431 Q_Q( ResourceBase );
00432 if ( job->error() )
00433 emit q->error( i18nc( "@info", "Updating local collection failed: %1.", job->errorText() ) );
00434 mChangeRecorder->d_ptr->invalidateCache( static_cast<CollectionModifyJob*>( job )->collection() );
00435 changeProcessed();
00436 }
00437
00438 bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId,
00439 const QString &mimeType, const QStringList &_parts )
00440 {
00441 Q_D( ResourceBase );
00442 if ( !isOnline() ) {
00443 emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) );
00444 return false;
00445 }
00446
00447 setDelayedReply( true );
00448
00449 Item item( uid );
00450 item.setMimeType( mimeType );
00451 item.setRemoteId( remoteId );
00452
00453 QSet<QByteArray> parts;
00454 Q_FOREACH( const QString &str, _parts )
00455 parts.insert( str.toLatin1() );
00456
00457 d->scheduler->scheduleItemFetch( item, parts, message().createReply() );
00458
00459 return true;
00460 }
00461
00462 void ResourceBase::collectionsRetrieved( const Collection::List & collections )
00463 {
00464 Q_D( ResourceBase );
00465 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00466 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00467 "ResourceBase::collectionsRetrieved()",
00468 "Calling collectionsRetrieved() although no collection retrieval is in progress" );
00469 if ( !d->mCollectionSyncer ) {
00470 d->mCollectionSyncer = new CollectionSync( identifier() );
00471 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00472 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00473 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00474 }
00475 d->mCollectionSyncer->setRemoteCollections( collections );
00476 }
00477
00478 void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections,
00479 const Collection::List & removedCollections )
00480 {
00481 Q_D( ResourceBase );
00482 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00483 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00484 "ResourceBase::collectionsRetrievedIncremental()",
00485 "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" );
00486 if ( !d->mCollectionSyncer ) {
00487 d->mCollectionSyncer = new CollectionSync( identifier() );
00488 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00489 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00490 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00491 }
00492 d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections );
00493 }
00494
00495 void ResourceBase::setCollectionStreamingEnabled( bool enable )
00496 {
00497 Q_D( ResourceBase );
00498 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00499 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00500 "ResourceBase::setCollectionStreamingEnabled()",
00501 "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" );
00502 if ( !d->mCollectionSyncer ) {
00503 d->mCollectionSyncer = new CollectionSync( identifier() );
00504 d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid );
00505 connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) );
00506 connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) );
00507 }
00508 d->mCollectionSyncer->setStreamingEnabled( enable );
00509 }
00510
00511 void ResourceBase::collectionsRetrievalDone()
00512 {
00513 Q_D( ResourceBase );
00514 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree ||
00515 d->scheduler->currentTask().type == ResourceScheduler::SyncAll,
00516 "ResourceBase::collectionsRetrievalDone()",
00517 "Calling collectionsRetrievalDone() although no collection retrieval is in progress" );
00518
00519 if ( d->mCollectionSyncer ) {
00520 d->mCollectionSyncer->retrievalDone();
00521 }
00522
00523 else {
00524
00525 d->scheduler->taskDone();
00526 }
00527 }
00528
00529 void ResourceBasePrivate::slotCollectionSyncDone( KJob * job )
00530 {
00531 Q_Q( ResourceBase );
00532 mCollectionSyncer = 0;
00533 if ( job->error() ) {
00534 if ( job->error() != Job::UserCanceled )
00535 emit q->error( job->errorString() );
00536 } else {
00537 if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) {
00538 CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive );
00539 list->setFetchScope( q->changeRecorder()->collectionFetchScope() );
00540 list->fetchScope().setResource( mId );
00541 q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) );
00542 return;
00543 }
00544 }
00545 scheduler->taskDone();
00546 }
00547
00548 void ResourceBasePrivate::slotLocalListDone( KJob * job )
00549 {
00550 Q_Q( ResourceBase );
00551 if ( job->error() ) {
00552 emit q->error( job->errorString() );
00553 } else {
00554 Collection::List cols = static_cast<CollectionFetchJob*>( job )->collections();
00555 foreach ( const Collection &col, cols ) {
00556 scheduler->scheduleSync( col );
00557 }
00558 scheduler->scheduleFullSyncCompletion();
00559 }
00560 scheduler->taskDone();
00561 }
00562
00563 void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col )
00564 {
00565 Q_Q( ResourceBase );
00566 currentCollection = col;
00567
00568 QStringList contentTypes = currentCollection.contentMimeTypes();
00569 contentTypes.removeAll( Collection::mimeType() );
00570 if ( !contentTypes.isEmpty() || (col.rights() & (Collection::CanLinkItem)) ) {
00571 emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) );
00572 q->retrieveItems( currentCollection );
00573 return;
00574 }
00575 scheduler->taskDone();
00576 }
00577
00578 void ResourceBasePrivate::slotSynchronizeCollectionAttributes( const Collection &col )
00579 {
00580 Q_Q( ResourceBase );
00581 QMetaObject::invokeMethod( q, "retrieveCollectionAttributes", Q_ARG( Akonadi::Collection, col ) );
00582 }
00583
00584 void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item )
00585 {
00586 Q_Q( ResourceBase );
00587 ItemFetchJob *fetch = new ItemFetchJob( item, this );
00588 fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() );
00589 fetch->fetchScope().setCacheOnly( true );
00590
00591
00592 const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
00593 foreach ( const QByteArray &attribute, attributes )
00594 fetch->fetchScope().fetchAttribute( attribute );
00595
00596 q->connect( fetch, SIGNAL( result( KJob* ) ), SLOT( slotPrepareItemRetrievalResult( KJob* ) ) );
00597 }
00598
00599 void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job )
00600 {
00601 Q_Q( ResourceBase );
00602 Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem,
00603 "ResourceBasePrivate::slotPrepareItemRetrievalResult()",
00604 "Preparing item retrieval although no item retrieval is in progress" );
00605 if ( job->error() ) {
00606 q->cancelTask( job->errorText() );
00607 return;
00608 }
00609 ItemFetchJob *fetch = qobject_cast<ItemFetchJob*>( job );
00610 if ( fetch->items().count() != 1 ) {
00611 q->cancelTask( i18n( "The requested item no longer exists" ) );
00612 return;
00613 }
00614 const Item item = fetch->items().first();
00615 const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
00616 if ( !q->retrieveItem( item, parts ) )
00617 q->cancelTask();
00618 }
00619
00620 void ResourceBase::itemsRetrievalDone()
00621 {
00622 Q_D( ResourceBase );
00623
00624 if ( d->mItemSyncer ) {
00625 d->mItemSyncer->deliveryDone();
00626 }
00627
00628 else {
00629 d->scheduler->taskDone();
00630 }
00631 }
00632
00633 void ResourceBase::clearCache()
00634 {
00635 Q_D( ResourceBase );
00636 d->scheduler->scheduleResourceCollectionDeletion();
00637 }
00638
00639 Collection ResourceBase::currentCollection() const
00640 {
00641 Q_D( const ResourceBase );
00642 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection ,
00643 "ResourceBase::currentCollection()",
00644 "Trying to access current collection although no item retrieval is in progress" );
00645 return d->currentCollection;
00646 }
00647
00648 Item ResourceBase::currentItem() const
00649 {
00650 Q_D( const ResourceBase );
00651 Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ,
00652 "ResourceBase::currentItem()",
00653 "Trying to access current item although no item retrieval is in progress" );
00654 return d->scheduler->currentTask().item;
00655 }
00656
00657 void ResourceBase::synchronizeCollectionTree()
00658 {
00659 d_func()->scheduler->scheduleCollectionTreeSync();
00660 }
00661
00662 void ResourceBase::cancelTask()
00663 {
00664 Q_D( ResourceBase );
00665 switch ( d->scheduler->currentTask().type ) {
00666 case ResourceScheduler::FetchItem:
00667 itemRetrieved( Item() );
00668 break;
00669 case ResourceScheduler::ChangeReplay:
00670 d->changeProcessed();
00671 break;
00672 case ResourceScheduler::SyncCollectionTree:
00673 case ResourceScheduler::SyncAll:
00674 if ( d->mCollectionSyncer )
00675 d->mCollectionSyncer->rollback();
00676 else
00677 d->scheduler->taskDone();
00678 break;
00679 case ResourceScheduler::SyncCollection:
00680 if ( d->mItemSyncer )
00681 d->mItemSyncer->rollback();
00682 else
00683 d->scheduler->taskDone();
00684 break;
00685 default:
00686 d->scheduler->taskDone();
00687 }
00688 }
00689
00690 void ResourceBase::cancelTask( const QString &msg )
00691 {
00692 cancelTask();
00693
00694 emit error( msg );
00695 }
00696
00697 void ResourceBase::deferTask()
00698 {
00699 Q_D( ResourceBase );
00700 d->scheduler->deferTask();
00701 }
00702
00703 void ResourceBase::doSetOnline( bool state )
00704 {
00705 d_func()->scheduler->setOnline( state );
00706 }
00707
00708 void ResourceBase::synchronizeCollection( qint64 collectionId )
00709 {
00710 synchronizeCollection( collectionId, false );
00711 }
00712
00713 void ResourceBase::synchronizeCollection( qint64 collectionId, bool recursive )
00714 {
00715 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), recursive ? CollectionFetchJob::Recursive : CollectionFetchJob::Base );
00716 job->setFetchScope( changeRecorder()->collectionFetchScope() );
00717 job->fetchScope().setResource( identifier() );
00718 job->setProperty( "recursive", recursive );
00719 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) );
00720 }
00721
00722 void ResourceBasePrivate::slotCollectionListDone( KJob *job )
00723 {
00724 if ( !job->error() ) {
00725 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00726 if ( !list.isEmpty() ) {
00727 if ( job->property( "recursive" ).toBool() ) {
00728 Q_FOREACH ( const Collection &collection, list ) {
00729 scheduler->scheduleSync( collection );
00730 }
00731 } else {
00732 scheduler->scheduleSync( list.first() );
00733 }
00734 }
00735 }
00736
00737 }
00738
00739 void ResourceBase::synchronizeCollectionAttributes( qint64 collectionId )
00740 {
00741 CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base );
00742 job->setFetchScope( changeRecorder()->collectionFetchScope() );
00743 job->fetchScope().setResource( identifier() );
00744 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListForAttributesDone( KJob* ) ) );
00745 }
00746
00747 void ResourceBasePrivate::slotCollectionListForAttributesDone( KJob *job )
00748 {
00749 if ( !job->error() ) {
00750 Collection::List list = static_cast<CollectionFetchJob*>( job )->collections();
00751 if ( !list.isEmpty() ) {
00752 Collection col = list.first();
00753 scheduler->scheduleAttributesSync( col );
00754 }
00755 }
00756
00757 }
00758
00759 void ResourceBase::setTotalItems( int amount )
00760 {
00761 kDebug() << amount;
00762 Q_D( ResourceBase );
00763 setItemStreamingEnabled( true );
00764 d->mItemSyncer->setTotalItems( amount );
00765 }
00766
00767 void ResourceBase::setItemStreamingEnabled( bool enable )
00768 {
00769 Q_D( ResourceBase );
00770 d->createItemSyncInstanceIfMissing();
00771 d->mItemSyncer->setStreamingEnabled( enable );
00772 }
00773
00774 void ResourceBase::itemsRetrieved( const Item::List &items )
00775 {
00776 Q_D( ResourceBase );
00777 d->createItemSyncInstanceIfMissing();
00778 d->mItemSyncer->setFullSyncItems( items );
00779 }
00780
00781 void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems )
00782 {
00783 Q_D( ResourceBase );
00784 d->createItemSyncInstanceIfMissing();
00785 d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems );
00786 }
00787
00788 void ResourceBasePrivate::slotItemSyncDone( KJob *job )
00789 {
00790 mItemSyncer = 0;
00791 Q_Q( ResourceBase );
00792 if ( job->error() && job->error() != Job::UserCanceled ) {
00793 emit q->error( job->errorString() );
00794 }
00795 scheduler->taskDone();
00796 }
00797
00798
00799 void ResourceBasePrivate::slotDelayedEmitProgress()
00800 {
00801 Q_Q( ResourceBase );
00802 emit q->percent( mUnemittedProgress );
00803
00804 Q_FOREACH( const QVariantMap &statusMap, mUnemittedAdvancedStatus ) {
00805 emit q->advancedStatus( statusMap );
00806 }
00807 mUnemittedProgress = 0;
00808 mUnemittedAdvancedStatus.clear();
00809 }
00810
00811 void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent )
00812 {
00813 Q_Q( ResourceBase );
00814
00815 mUnemittedProgress = percent;
00816
00817 const Collection collection = job->property( "collection" ).value<Collection>();
00818 if ( collection.isValid() ) {
00819 QVariantMap statusMap;
00820 statusMap.insert( QLatin1String( "key" ), QString::fromLatin1( "collectionSyncProgress" ) );
00821 statusMap.insert( QLatin1String( "collectionId" ), collection.id() );
00822 statusMap.insert( QLatin1String( "percent" ), static_cast<unsigned int>( percent ) );
00823
00824 mUnemittedAdvancedStatus[collection.id()] = statusMap;
00825 }
00826
00827 if ( percent == 100 ) {
00828 mProgressEmissionCompressor.stop();
00829 slotDelayedEmitProgress();
00830 } else if ( !mProgressEmissionCompressor.isActive() ) {
00831 mProgressEmissionCompressor.start();
00832 }
00833 }
00834
00835 void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable )
00836 {
00837 Q_D( ResourceBase );
00838 d->mHierarchicalRid = enable;
00839 }
00840
00841 void ResourceBase::scheduleCustomTask( QObject *receiver, const char *method, const QVariant &argument, SchedulePriority priority )
00842 {
00843 Q_D( ResourceBase );
00844 d->scheduler->scheduleCustomTask( receiver, method, argument, priority );
00845 }
00846
00847 void ResourceBase::taskDone()
00848 {
00849 Q_D( ResourceBase );
00850 d->scheduler->taskDone();
00851 }
00852
00853 void ResourceBase::retrieveCollectionAttributes( const Collection &collection )
00854 {
00855 collectionAttributesRetrieved( collection );
00856 }
00857
00858 void Akonadi::ResourceBase::abortActivity()
00859 {
00860
00861 }
00862
00863 void ResourceBase::setItemTransactionMode(ItemSync::TransactionMode mode)
00864 {
00865 Q_D( ResourceBase );
00866 d->mItemTransactionMode = mode;
00867 }
00868
00869 void ResourceBase::setItemSynchronizationFetchScope(const ItemFetchScope& fetchScope)
00870 {
00871 Q_D( ResourceBase );
00872 if ( !d->mItemSyncFetchScope )
00873 d->mItemSyncFetchScope = new ItemFetchScope;
00874 *(d->mItemSyncFetchScope) = fetchScope;
00875 }
00876
00877 #include "resourcebase.moc"
00878 #include "moc_resourcebase.cpp"