• Skip to content
  • Skip to link menu
KDE 4.6 API Reference
  • KDE API Reference
  • KDE-PIM Libraries
  • KDE Home
  • Contact Us
 

akonadi

itemsync.cpp

00001 /*
00002     Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "itemsync.h"
00022 
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031 
00032 #include <kdebug.h>
00033 
00034 #include <QtCore/QStringList>
00035 
00036 using namespace Akonadi;
00037 
00041 class ItemSync::Private
00042 {
00043   public:
00044     Private( ItemSync *parent ) :
00045       q( parent ),
00046       mTransactionMode( SingleTransaction ),
00047       mCurrentTransaction( 0 ),
00048       mTransactionJobs( 0 ),
00049       mPendingJobs( 0 ),
00050       mProgress( 0 ),
00051       mTotalItems( -1 ),
00052       mTotalItemsProcessed( 0 ),
00053       mStreaming( false ),
00054       mIncremental( false ),
00055       mLocalListDone( false ),
00056       mDeliveryDone( false ),
00057       mFinished( false )
00058     {
00059       // we want to fetch all data by default
00060       mFetchScope.fetchFullPayload();
00061       mFetchScope.fetchAllAttributes();
00062     }
00063 
00064     void createLocalItem( const Item &item );
00065     void checkDone();
00066     void slotLocalListDone( KJob* );
00067     void slotLocalDeleteDone( KJob* );
00068     void slotLocalChangeDone( KJob* );
00069     void execute();
00070     void processItems();
00071     void deleteItems( const Item::List &items );
00072     void slotTransactionResult( KJob *job );
00073     Job* subjobParent() const;
00074 
00075     ItemSync *q;
00076     Collection mSyncCollection;
00077     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00078     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00079     QSet<Akonadi::Item> mUnprocessedLocalItems;
00080 
00081     ItemSync::TransactionMode mTransactionMode;
00082     TransactionSequence *mCurrentTransaction;
00083     int mTransactionJobs;
00084 
00085     // fetch scope for initial item listing
00086     ItemFetchScope mFetchScope;
00087 
00088     // remote items
00089     Akonadi::Item::List mRemoteItems;
00090 
00091     // removed remote items
00092     Item::List mRemovedRemoteItems;
00093 
00094     // create counter
00095     int mPendingJobs;
00096     int mProgress;
00097     int mTotalItems;
00098     int mTotalItemsProcessed;
00099 
00100     bool mStreaming;
00101     bool mIncremental;
00102     bool mLocalListDone;
00103     bool mDeliveryDone;
00104     bool mFinished;
00105 };
00106 
00107 void ItemSync::Private::createLocalItem( const Item & item )
00108 {
00109   // don't try to do anything in error state
00110   if ( q->error() )
00111     return;
00112   mPendingJobs++;
00113   ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00114   q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00115 }
00116 
00117 void ItemSync::Private::checkDone()
00118 {
00119   q->setProcessedAmount( KJob::Bytes, mProgress );
00120   if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00121     return;
00122 
00123   if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place
00124     mFinished = true;
00125     q->emitResult();
00126   }
00127 }
00128 
00129 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00130     Job( parent ),
00131     d( new Private( this ) )
00132 {
00133   d->mSyncCollection = collection;
00134 }
00135 
00136 ItemSync::~ItemSync()
00137 {
00138   delete d;
00139 }
00140 
00141 void ItemSync::setFullSyncItems( const Item::List &items )
00142 {
00143   Q_ASSERT( !d->mIncremental );
00144   if ( !d->mStreaming )
00145     d->mDeliveryDone = true;
00146   d->mRemoteItems += items;
00147   d->mTotalItemsProcessed += items.count();
00148   kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00149   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00150   if ( d->mTotalItemsProcessed == d->mTotalItems )
00151     d->mDeliveryDone = true;
00152   d->execute();
00153 }
00154 
00155 void ItemSync::setTotalItems( int amount )
00156 {
00157   Q_ASSERT( !d->mIncremental );
00158   Q_ASSERT( amount >= 0 );
00159   setStreamingEnabled( true );
00160   kDebug() << amount;
00161   d->mTotalItems = amount;
00162   setTotalAmount( KJob::Bytes, amount );
00163   if ( d->mTotalItems == 0 ) {
00164     d->mDeliveryDone = true;
00165     d->execute();
00166   }
00167 }
00168 
00169 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00170 {
00171   d->mIncremental = true;
00172   if ( !d->mStreaming )
00173     d->mDeliveryDone = true;
00174   d->mRemoteItems += changedItems;
00175   d->mRemovedRemoteItems += removedItems;
00176   d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00177   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00178   if ( d->mTotalItemsProcessed == d->mTotalItems )
00179     d->mDeliveryDone = true;
00180   d->execute();
00181 }
00182 
00183 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00184 {
00185   d->mFetchScope = fetchScope;
00186 }
00187 
00188 ItemFetchScope &ItemSync::fetchScope()
00189 {
00190   return d->mFetchScope;
00191 }
00192 
00193 void ItemSync::doStart()
00194 {
00195   ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00196   job->setFetchScope( d->mFetchScope );
00197 
00198   // we only can fetch parts already in the cache, otherwise this will deadlock
00199   job->fetchScope().setCacheOnly( true );
00200 
00201   connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00202 }
00203 
00204 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00205 {
00206   // we are in error state, better not change anything at all anymore
00207   if ( error() )
00208     return false;
00209 
00210   /*
00211    * We know that this item has changed (as it is part of the
00212    * incremental changed list), so we just put it into the
00213    * storage.
00214    */
00215   if ( d->mIncremental )
00216     return true;
00217 
00218   // Check whether the flags differ
00219   if ( storedItem.flags() != newItem.flags() ) {
00220     kDebug() << "Stored flags "  << storedItem.flags()
00221              << "new flags " << newItem.flags();
00222     return true;
00223   }
00224 
00225   // Check whether the new item contains unknown parts
00226   QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00227   missingParts.subtract( storedItem.loadedPayloadParts() );
00228   if ( !missingParts.isEmpty() )
00229     return true;
00230 
00231   // ### FIXME SLOW!!!
00232   // If the available part identifiers don't differ, check
00233   // whether the content of the payload differs
00234   if ( newItem.hasPayload()
00235     && storedItem.payloadData() != newItem.payloadData() )
00236     return true;
00237 
00238   // check if remote attributes have been changed
00239   foreach ( Attribute* attr, newItem.attributes() ) {
00240     if ( !storedItem.hasAttribute( attr->type() ) )
00241       return true;
00242     if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00243       return true;
00244   }
00245 
00246   return false;
00247 }
00248 
00249 void ItemSync::Private::slotLocalListDone( KJob * job )
00250 {
00251   if ( !job->error() ) {
00252     const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00253     foreach ( const Item &item, list ) {
00254       if ( item.remoteId().isEmpty() )
00255         continue;
00256       mLocalItemsById.insert( item.id(), item );
00257       mLocalItemsByRemoteId.insert( item.remoteId(), item );
00258       mUnprocessedLocalItems.insert( item );
00259     }
00260   }
00261 
00262   mLocalListDone = true;
00263   execute();
00264 }
00265 
00266 void ItemSync::Private::execute()
00267 {
00268   if ( !mLocalListDone )
00269     return;
00270 
00271   // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode
00272   // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed
00273   if ( !mDeliveryDone && mRemoteItems.isEmpty() )
00274     return;
00275 
00276   if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
00277     ++mTransactionJobs;
00278     mCurrentTransaction = new TransactionSequence( q );
00279     mCurrentTransaction->setAutomaticCommittingEnabled( false );
00280     connect( mCurrentTransaction, SIGNAL( result( KJob* ) ), q, SLOT( slotTransactionResult( KJob* ) ) );
00281   }
00282 
00283   processItems();
00284   if ( !mDeliveryDone ) {
00285     if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
00286       mCurrentTransaction->commit();
00287       mCurrentTransaction = 0;
00288     }
00289     return;
00290   }
00291 
00292   // removed
00293   if ( !mIncremental ) {
00294     mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00295     mUnprocessedLocalItems.clear();
00296   }
00297 
00298   deleteItems( mRemovedRemoteItems );
00299   mLocalItemsById.clear();
00300   mLocalItemsByRemoteId.clear();
00301   mRemovedRemoteItems.clear();
00302 
00303   if ( mCurrentTransaction ) {
00304     mCurrentTransaction->commit();
00305     mCurrentTransaction = 0;
00306   }
00307 
00308   checkDone();
00309 }
00310 
00311 void ItemSync::Private::processItems()
00312 {
00313   // added / updated
00314   foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
00315 #ifndef NDEBUG
00316     if ( remoteItem.remoteId().isEmpty() ) {
00317       kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00318     }
00319 #endif
00320 
00321     Item localItem = mLocalItemsById.value( remoteItem.id() );
00322     if ( !localItem.isValid() )
00323       localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00324     mUnprocessedLocalItems.remove( localItem );
00325     // missing locally
00326     if ( !localItem.isValid() ) {
00327       createLocalItem( remoteItem );
00328       continue;
00329     }
00330 
00331     if ( q->updateItem( localItem, remoteItem ) ) {
00332       mPendingJobs++;
00333 
00334       remoteItem.setId( localItem.id() );
00335       remoteItem.setRevision( localItem.revision() );
00336       remoteItem.setSize( localItem.size() );
00337       remoteItem.setRemoteId( localItem.remoteId() );  // in case someone clears remoteId by accident
00338       ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00339       mod->disableRevisionCheck();
00340       q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00341     } else {
00342       mProgress++;
00343     }
00344   }
00345   mRemoteItems.clear();
00346 }
00347 
00348 void ItemSync::Private::deleteItems( const Item::List &items )
00349 {
00350   // if in error state, better not change anything anymore
00351   if ( q->error() )
00352     return;
00353 
00354   Item::List itemsToDelete;
00355   foreach ( const Item &item, items ) {
00356     Item delItem( item );
00357     if ( !item.isValid() ) {
00358       delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00359     }
00360 
00361     if ( !delItem.isValid() ) {
00362 #ifndef NDEBUG
00363       kWarning() << "Delete item (remoteeId=" << item.remoteId()
00364                  << "mimeType=" << item.mimeType()
00365                  << ") does not have a valid UID and no item with that remote ID exists either";
00366 #endif
00367       continue;
00368     }
00369 
00370     if ( delItem.remoteId().isEmpty() ) {
00371       // don't attempt to remove items that never were written to the backend
00372       continue;
00373     }
00374 
00375     itemsToDelete.append ( delItem );
00376   }
00377 
00378   if ( !itemsToDelete.isEmpty() ) {
00379     mPendingJobs++;
00380     ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00381     q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalDeleteDone( KJob* ) ) );
00382 
00383     // It can happen that the groupware servers report us deleted items
00384     // twice, in this case this item delete job will fail on the second try.
00385     // To avoid a rollback of the complete transaction we gracefully allow the job
00386     // to fail :)
00387     TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00388     if ( transaction )
00389       transaction->setIgnoreJobFailure( job );
00390   }
00391 }
00392 
00393 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00394 {
00395   mPendingJobs--;
00396   mProgress++;
00397 
00398   checkDone();
00399 }
00400 
00401 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00402 {
00403   Q_UNUSED( job );
00404   mPendingJobs--;
00405   mProgress++;
00406 
00407   checkDone();
00408 }
00409 
00410 void ItemSync::Private::slotTransactionResult( KJob *job )
00411 {
00412   --mTransactionJobs;
00413   if ( mCurrentTransaction == job )
00414     mCurrentTransaction = 0;
00415 
00416   checkDone();
00417 }
00418 
00419 Job * ItemSync::Private::subjobParent() const
00420 {
00421   if ( mCurrentTransaction && mTransactionMode != NoTransaction )
00422     return mCurrentTransaction;
00423   return q;
00424 }
00425 
00426 void ItemSync::setStreamingEnabled(bool enable)
00427 {
00428   d->mStreaming = enable;
00429 }
00430 
00431 void ItemSync::deliveryDone()
00432 {
00433   Q_ASSERT( d->mStreaming );
00434   d->mDeliveryDone = true;
00435   d->execute();
00436 }
00437 
00438 void ItemSync::slotResult(KJob* job)
00439 {
00440   if ( job->error() ) {
00441     // pretent there were no errors
00442     Akonadi::Job::removeSubjob( job );
00443     // propagate the first error we got but continue, we might still be fed with stuff from a resource
00444     if ( !error() ) {
00445       setError( job->error() );
00446       setErrorText( job->errorText() );
00447     }
00448   } else {
00449     Akonadi::Job::slotResult( job );
00450   }
00451 }
00452 
00453 void ItemSync::rollback()
00454 {
00455   setError( UserCanceled );
00456   if ( d->mCurrentTransaction )
00457     d->mCurrentTransaction->rollback();
00458   d->mDeliveryDone = true; // user wont deliver more data
00459   d->execute(); // end this in an ordered way, since we have an error set no real change will be done
00460 }
00461 
00462 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
00463 {
00464   d->mTransactionMode = mode;
00465 }
00466 
00467 
00468 #include "itemsync.moc"

akonadi

Skip menu "akonadi"
  • Main Page
  • Modules
  • Namespace List
  • Class Hierarchy
  • Alphabetical List
  • Class List
  • File List
  • Namespace Members
  • Class Members
  • Related Pages

KDE-PIM Libraries

Skip menu "KDE-PIM Libraries"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Generated for KDE-PIM Libraries by doxygen 1.7.3
This website is maintained by Adriaan de Groot and Allen Winter.
KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal