• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.13.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • akonadi
itemsync.cpp
1 /*
2  Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
3  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
4 
5  This library is free software; you can redistribute it and/or modify it
6  under the terms of the GNU Library General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or (at your
8  option) any later version.
9 
10  This library is distributed in the hope that it will be useful, but WITHOUT
11  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
13  License for more details.
14 
15  You should have received a copy of the GNU Library General Public License
16  along with this library; see the file COPYING.LIB. If not, write to the
17  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18  02110-1301, USA.
19 */
20 
21 #include "itemsync.h"
22 
23 #include "job_p.h"
24 #include "collection.h"
25 #include "item.h"
26 #include "item_p.h"
27 #include "itemcreatejob.h"
28 #include "itemdeletejob.h"
29 #include "itemfetchjob.h"
30 #include "itemmodifyjob.h"
31 #include "transactionsequence.h"
32 #include "itemfetchscope.h"
33 
34 #include <kdebug.h>
35 
36 #include <QtCore/QStringList>
37 
38 using namespace Akonadi;
39 
43 class Akonadi::ItemSyncPrivate : public JobPrivate
44 {
45  public:
46  ItemSyncPrivate( ItemSync *parent ) :
47  JobPrivate( parent ),
48  mTransactionMode( ItemSync::SingleTransaction ),
49  mCurrentTransaction( 0 ),
50  mTransactionJobs( 0 ),
51  mPendingJobs( 0 ),
52  mProgress( 0 ),
53  mTotalItems( -1 ),
54  mTotalItemsProcessed( 0 ),
55  mStreaming( false ),
56  mIncremental( false ),
57  mLocalListDone( false ),
58  mDeliveryDone( false ),
59  mFinished( false )
60  {
61  // we want to fetch all data by default
62  mFetchScope.fetchFullPayload();
63  mFetchScope.fetchAllAttributes();
64  }
65 
66  void createLocalItem( const Item &item );
67  void checkDone();
68  void slotLocalListDone( KJob* );
69  void slotLocalDeleteDone( KJob* );
70  void slotLocalChangeDone( KJob* );
71  void execute();
72  void processItems();
73  void deleteItems( const Item::List &items );
74  void slotTransactionResult( KJob *job );
75  Job* subjobParent() const;
76  QString jobDebuggingString() const /*Q_DECL_OVERRIDE*/;
77 
78  Q_DECLARE_PUBLIC( ItemSync )
79  Collection mSyncCollection;
80  QHash<Item::Id, Akonadi::Item> mLocalItemsById;
81  QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
82  QSet<Akonadi::Item> mUnprocessedLocalItems;
83 
84  ItemSync::TransactionMode mTransactionMode;
85  TransactionSequence *mCurrentTransaction;
86  int mTransactionJobs;
87 
88  // fetch scope for initial item listing
89  ItemFetchScope mFetchScope;
90 
91  // remote items
92  Akonadi::Item::List mRemoteItems;
93 
94  // removed remote items
95  Item::List mRemovedRemoteItems;
96 
97  // create counter
98  int mPendingJobs;
99  int mProgress;
100  int mTotalItems;
101  int mTotalItemsProcessed;
102 
103  bool mStreaming;
104  bool mIncremental;
105  bool mLocalListDone;
106  bool mDeliveryDone;
107  bool mFinished;
108 };
109 
110 void ItemSyncPrivate::createLocalItem( const Item & item )
111 {
112  Q_Q( ItemSync );
113  // don't try to do anything in error state
114  if ( q->error() )
115  return;
116  mPendingJobs++;
117  ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
118  q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
119 }
120 
121 void ItemSyncPrivate::checkDone()
122 {
123  Q_Q( ItemSync );
124  q->setProcessedAmount( KJob::Bytes, mProgress );
125  if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
126  return;
127 
128  if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place
129  mFinished = true;
130  q->emitResult();
131  }
132 }
133 
134 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
135  Job( new ItemSyncPrivate( this ), parent )
136 {
137  Q_D( ItemSync );
138  d->mSyncCollection = collection;
139 }
140 
141 ItemSync::~ItemSync()
142 {
143 }
144 
145 void ItemSync::setFullSyncItems( const Item::List &items )
146 {
147  Q_D( ItemSync );
148  Q_ASSERT( !d->mIncremental );
149  if ( !d->mStreaming )
150  d->mDeliveryDone = true;
151  d->mRemoteItems += items;
152  d->mTotalItemsProcessed += items.count();
153  kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
154  setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
155  if ( d->mTotalItemsProcessed == d->mTotalItems )
156  d->mDeliveryDone = true;
157  d->execute();
158 }
159 
160 void ItemSync::setTotalItems( int amount )
161 {
162  Q_D( ItemSync );
163  Q_ASSERT( !d->mIncremental );
164  Q_ASSERT( amount >= 0 );
165  setStreamingEnabled( true );
166  kDebug() << amount;
167  d->mTotalItems = amount;
168  setTotalAmount( KJob::Bytes, amount );
169  if ( d->mTotalItems == 0 ) {
170  d->mDeliveryDone = true;
171  d->execute();
172  }
173 }
174 
175 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
176 {
177  Q_D( ItemSync );
178  d->mIncremental = true;
179  if ( !d->mStreaming )
180  d->mDeliveryDone = true;
181  d->mRemoteItems += changedItems;
182  d->mRemovedRemoteItems += removedItems;
183  d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
184  setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
185  if ( d->mTotalItemsProcessed == d->mTotalItems )
186  d->mDeliveryDone = true;
187  d->execute();
188 }
189 
190 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
191 {
192  Q_D( ItemSync );
193  d->mFetchScope = fetchScope;
194 }
195 
196 ItemFetchScope &ItemSync::fetchScope()
197 {
198  Q_D( ItemSync );
199  return d->mFetchScope;
200 }
201 
202 void ItemSync::doStart()
203 {
204  Q_D( ItemSync );
205  ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
206  job->setFetchScope( d->mFetchScope );
207 
208  // we only can fetch parts already in the cache, otherwise this will deadlock
209  job->fetchScope().setCacheOnly( true );
210 
211  connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) );
212 }
213 
214 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
215 {
216  Q_D( ItemSync );
217  // we are in error state, better not change anything at all anymore
218  if ( error() )
219  return false;
220 
221  /*
222  * We know that this item has changed (as it is part of the
223  * incremental changed list), so we just put it into the
224  * storage.
225  */
226  if ( d->mIncremental )
227  return true;
228 
229  if ( newItem.d_func()->mClearPayload )
230  return true;
231 
232  // Check whether the remote revisions differ
233  if ( storedItem.remoteRevision() != newItem.remoteRevision() )
234  return true;
235 
236  // Check whether the flags differ
237  if ( storedItem.flags() != newItem.flags() ) {
238  kDebug() << "Stored flags " << storedItem.flags()
239  << "new flags " << newItem.flags();
240  return true;
241  }
242 
243  // Check whether the new item contains unknown parts
244  QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
245  missingParts.subtract( storedItem.loadedPayloadParts() );
246  if ( !missingParts.isEmpty() )
247  return true;
248 
249  // ### FIXME SLOW!!!
250  // If the available part identifiers don't differ, check
251  // whether the content of the payload differs
252  if ( newItem.hasPayload()
253  && storedItem.payloadData() != newItem.payloadData() )
254  return true;
255 
256  // check if remote attributes have been changed
257  foreach ( Attribute* attr, newItem.attributes() ) {
258  if ( !storedItem.hasAttribute( attr->type() ) )
259  return true;
260  if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
261  return true;
262  }
263 
264  return false;
265 }
266 
267 void ItemSyncPrivate::slotLocalListDone( KJob * job )
268 {
269  if ( !job->error() ) {
270  const Item::List list = static_cast<ItemFetchJob*>( job )->items();
271  foreach ( const Item &item, list ) {
272  if ( item.remoteId().isEmpty() )
273  continue;
274  mLocalItemsById.insert( item.id(), item );
275  mLocalItemsByRemoteId.insert( item.remoteId(), item );
276  mUnprocessedLocalItems.insert( item );
277  }
278  }
279 
280  mLocalListDone = true;
281  execute();
282 }
283 
284 QString ItemSyncPrivate::jobDebuggingString() const /*Q_DECL_OVERRIDE*/
285 {
286  // TODO: also print out mIncremental and mTotalItemsProcessed, but they are set after the job
287  // started, so this requires passing jobDebuggingString to jobEnded().
288  return QString::fromLatin1("Collection %1 (%2)").arg( mSyncCollection.id() ).arg( mSyncCollection.name() );
289 }
290 
291 void ItemSyncPrivate::execute()
292 {
293  Q_Q( ItemSync );
294  if ( !mLocalListDone )
295  return;
296 
297  // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode
298  // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed
299  if ( !mDeliveryDone && mRemoteItems.isEmpty() )
300  return;
301 
302  if ( (mTransactionMode == ItemSync::SingleTransaction && !mCurrentTransaction) || mTransactionMode == ItemSync::MultipleTransactions) {
303  ++mTransactionJobs;
304  mCurrentTransaction = new TransactionSequence( q );
305  mCurrentTransaction->setAutomaticCommittingEnabled( false );
306  QObject::connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
307  }
308 
309  processItems();
310  if ( !mDeliveryDone ) {
311  if ( mTransactionMode == ItemSync::MultipleTransactions && mCurrentTransaction ) {
312  mCurrentTransaction->commit();
313  mCurrentTransaction = 0;
314  }
315  return;
316  }
317 
318  // removed
319  if ( !mIncremental ) {
320  mRemovedRemoteItems = mUnprocessedLocalItems.toList();
321  mUnprocessedLocalItems.clear();
322  }
323 
324  deleteItems( mRemovedRemoteItems );
325  mLocalItemsById.clear();
326  mLocalItemsByRemoteId.clear();
327  mRemovedRemoteItems.clear();
328 
329  if ( mCurrentTransaction ) {
330  mCurrentTransaction->commit();
331  mCurrentTransaction = 0;
332  }
333 
334  checkDone();
335 }
336 
337 void ItemSyncPrivate::processItems()
338 {
339  Q_Q( ItemSync );
340  // added / updated
341  foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
342 #ifndef NDEBUG
343  if ( remoteItem.remoteId().isEmpty() ) {
344  kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
345  }
346 #endif
347 
348  Item localItem = mLocalItemsById.value( remoteItem.id() );
349  if ( !localItem.isValid() )
350  localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
351  mUnprocessedLocalItems.remove( localItem );
352  // missing locally
353  if ( !localItem.isValid() ) {
354  createLocalItem( remoteItem );
355  continue;
356  }
357 
358  if ( q->updateItem( localItem, remoteItem ) ) {
359  mPendingJobs++;
360 
361  remoteItem.setId( localItem.id() );
362  remoteItem.setRevision( localItem.revision() );
363  remoteItem.setSize( localItem.size() );
364  remoteItem.setRemoteId( localItem.remoteId() ); // in case someone clears remoteId by accident
365  ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
366  mod->disableRevisionCheck();
367  q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
368  } else {
369  mProgress++;
370  }
371  }
372  mRemoteItems.clear();
373 }
374 
375 void ItemSyncPrivate::deleteItems( const Item::List &items )
376 {
377  Q_Q( ItemSync );
378  // if in error state, better not change anything anymore
379  if ( q->error() )
380  return;
381 
382  Item::List itemsToDelete;
383  foreach ( const Item &item, items ) {
384  Item delItem( item );
385  if ( !item.isValid() ) {
386  delItem = mLocalItemsByRemoteId.value( item.remoteId() );
387  }
388 
389  if ( !delItem.isValid() ) {
390 #ifndef NDEBUG
391  kWarning() << "Delete item (remoteeId=" << item.remoteId()
392  << "mimeType=" << item.mimeType()
393  << ") does not have a valid UID and no item with that remote ID exists either";
394 #endif
395  continue;
396  }
397 
398  if ( delItem.remoteId().isEmpty() ) {
399  // don't attempt to remove items that never were written to the backend
400  continue;
401  }
402 
403  itemsToDelete.append ( delItem );
404  }
405 
406  if ( !itemsToDelete.isEmpty() ) {
407  mPendingJobs++;
408  ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
409  q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) );
410 
411  // It can happen that the groupware servers report us deleted items
412  // twice, in this case this item delete job will fail on the second try.
413  // To avoid a rollback of the complete transaction we gracefully allow the job
414  // to fail :)
415  TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
416  if ( transaction )
417  transaction->setIgnoreJobFailure( job );
418  }
419 }
420 
421 void ItemSyncPrivate::slotLocalDeleteDone( KJob* )
422 {
423  mPendingJobs--;
424  mProgress++;
425 
426  checkDone();
427 }
428 
429 void ItemSyncPrivate::slotLocalChangeDone( KJob * job )
430 {
431  Q_UNUSED( job );
432  mPendingJobs--;
433  mProgress++;
434 
435  checkDone();
436 }
437 
438 void ItemSyncPrivate::slotTransactionResult( KJob *job )
439 {
440  --mTransactionJobs;
441  if ( mCurrentTransaction == job )
442  mCurrentTransaction = 0;
443 
444  checkDone();
445 }
446 
447 Job * ItemSyncPrivate::subjobParent() const
448 {
449  Q_Q( const ItemSync );
450  if ( mCurrentTransaction && mTransactionMode != ItemSync::NoTransaction )
451  return mCurrentTransaction;
452  return const_cast<ItemSync *>( q );
453 }
454 
455 void ItemSync::setStreamingEnabled(bool enable)
456 {
457  Q_D( ItemSync );
458  d->mStreaming = enable;
459 }
460 
461 void ItemSync::deliveryDone()
462 {
463  Q_D( ItemSync );
464  Q_ASSERT( d->mStreaming );
465  d->mDeliveryDone = true;
466  d->execute();
467 }
468 
469 void ItemSync::slotResult(KJob* job)
470 {
471  if ( job->error() ) {
472  // pretent there were no errors
473  Akonadi::Job::removeSubjob( job );
474  // propagate the first error we got but continue, we might still be fed with stuff from a resource
475  if ( !error() ) {
476  setError( job->error() );
477  setErrorText( job->errorText() );
478  }
479  } else {
480  Akonadi::Job::slotResult( job );
481  }
482 }
483 
484 void ItemSync::rollback()
485 {
486  Q_D( ItemSync );
487  setError( UserCanceled );
488  if ( d->mCurrentTransaction )
489  d->mCurrentTransaction->rollback();
490  d->mDeliveryDone = true; // user wont deliver more data
491  d->execute(); // end this in an ordered way, since we have an error set no real change will be done
492 }
493 
494 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
495 {
496  Q_D( ItemSync );
497  d->mTransactionMode = mode;
498 }
499 
500 #include "moc_itemsync.cpp"
Akonadi::ItemSync::doStart
void doStart()
This method must be reimplemented in the concrete jobs.
Definition: itemsync.cpp:202
Akonadi::ItemModifyJob::disableRevisionCheck
void disableRevisionCheck()
Disables the check of the revision number.
Definition: itemmodifyjob.cpp:398
Akonadi::ItemSync::updateItem
virtual bool updateItem(const Item &storedItem, Item &newItem)
Reimplement this method to customize the synchronization algorithm.
Definition: itemsync.cpp:214
Akonadi::ItemSync::setFullSyncItems
void setFullSyncItems(const Item::List &items)
Sets the full item list for the collection.
Definition: itemsync.cpp:145
Akonadi::ItemSync::NoTransaction
Use no transaction at all, provides highest responsiveness (might therefore feel faster even when act...
Definition: itemsync.h:162
Akonadi::ItemSync::setIncrementalSyncItems
void setIncrementalSyncItems(const Item::List &changedItems, const Item::List &removedItems)
Sets the item lists for incrementally syncing the collection.
Definition: itemsync.cpp:175
Akonadi::Collection
Represents a collection of PIM items.
Definition: collection.h:75
Akonadi::ItemSync::SingleTransaction
Use a single transaction for the entire sync process (default), provides maximum consistency ("all or...
Definition: itemsync.h:160
Akonadi::Job
Base class for all actions in the Akonadi storage.
Definition: job.h:86
Akonadi::ItemSync::fetchScope
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Definition: itemsync.cpp:196
Akonadi::Attribute
Provides interface for custom attributes for Entity.
Definition: attribute.h:138
Akonadi::TransactionSequence::setAutomaticCommittingEnabled
void setAutomaticCommittingEnabled(bool enable)
Disable automatic committing.
Definition: transactionsequence.cpp:197
Akonadi::ItemSync::~ItemSync
~ItemSync()
Destroys the item synchronizer.
Definition: itemsync.cpp:141
Akonadi::ItemSync
Syncs between items known to a client (usually a resource) and the Akonadi storage.
Definition: itemsync.h:54
Akonadi::ItemDeleteJob
Job that deletes items from the Akonadi storage.
Definition: itemdeletejob.h:62
Akonadi::ItemFetchJob::fetchScope
ItemFetchScope & fetchScope()
Returns the item fetch scope.
Definition: itemfetchjob.cpp:273
Akonadi::ItemSync::setFetchScope
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
Definition: itemsync.cpp:190
Akonadi::ItemFetchJob::setFetchScope
void setFetchScope(ItemFetchScope &fetchScope)
Sets the item fetch scope.
Definition: itemfetchjob.cpp:259
Akonadi::TransactionSequence::setIgnoreJobFailure
void setIgnoreJobFailure(KJob *job)
Sets which job of the sequence might fail without rolling back the complete transaction.
Definition: transactionsequence.cpp:175
Akonadi::ItemSync::deliveryDone
void deliveryDone()
Notify ItemSync that all remote items have been delivered.
Definition: itemsync.cpp:461
Akonadi::ItemSync::ItemSync
ItemSync(const Collection &collection, QObject *parent=0)
Creates a new item synchronizer.
Definition: itemsync.cpp:134
Akonadi::TransactionSequence::commit
void commit()
Commits the transaction as soon as all pending sub-jobs finished successfully.
Definition: transactionsequence.cpp:144
Akonadi::ItemCreateJob
Job that creates a new item in the Akonadi storage.
Definition: itemcreatejob.h:73
Akonadi::ItemFetchScope
Specifies which parts of an item should be fetched from the Akonadi storage.
Definition: itemfetchscope.h:68
Akonadi::ItemSync::MultipleTransactions
Use one transaction per chunk of delivered items, good compromise between the other two when using st...
Definition: itemsync.h:161
Akonadi::ItemSync::setTotalItems
void setTotalItems(int amount)
Set the amount of items which you are going to return in total by using the setFullSyncItems() method...
Definition: itemsync.cpp:160
Akonadi::ItemSync::setStreamingEnabled
void setStreamingEnabled(bool enable)
Enable item streaming.
Definition: itemsync.cpp:455
Akonadi::ItemSync::TransactionMode
TransactionMode
Transaction mode used by ItemSync.
Definition: itemsync.h:159
Akonadi::TransactionSequence
Base class for jobs that need to run a sequence of sub-jobs in a transaction.
Definition: transactionsequence.h:69
Akonadi::ItemSync::rollback
void rollback()
Aborts the sync process and rolls back all not yet committed transactions.
Definition: itemsync.cpp:484
Akonadi::ItemModifyJob
Job that modifies an existing item in the Akonadi storage.
Definition: itemmodifyjob.h:97
Akonadi::ItemFetchJob
Job that fetches items from the Akonadi storage.
Definition: itemfetchjob.h:82
Akonadi::JobPrivate
Definition: job_p.h:31
Akonadi::Job::removeSubjob
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
Definition: job.cpp:332
Akonadi::Attribute::serialized
virtual QByteArray serialized() const =0
Returns a QByteArray representation of the attribute which will be stored.
Akonadi::Job::UserCanceled
The user canceled this job.
Definition: job.h:108
Akonadi::ItemSync::setTransactionMode
void setTransactionMode(TransactionMode mode)
Set the transaction mode to use for this sync.
Definition: itemsync.cpp:494
Akonadi::Attribute::type
virtual QByteArray type() const =0
Returns the type of the attribute.
Akonadi::ItemFetchScope::setCacheOnly
void setCacheOnly(bool cacheOnly)
Sets whether payload data should be requested from remote sources or just from the local cache...
Definition: itemfetchscope.cpp:106
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Mon Jul 21 2014 08:03:54 by doxygen 1.8.6 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.13.3 API Reference

Skip menu "kdepimlibs-4.13.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal