23 #include "collection.h"
26 #include "itemcreatejob.h"
27 #include "itemdeletejob.h"
28 #include "itemfetchjob.h"
29 #include "itemmodifyjob.h"
30 #include "transactionsequence.h"
31 #include "itemfetchscope.h"
35 #include <QtCore/QStringList>
37 using namespace Akonadi;
42 class ItemSync::Private
45 Private( ItemSync *parent ) :
47 mTransactionMode( SingleTransaction ),
48 mCurrentTransaction( 0 ),
49 mTransactionJobs( 0 ),
53 mTotalItemsProcessed( 0 ),
55 mIncremental( false ),
56 mLocalListDone( false ),
57 mDeliveryDone( false ),
61 mFetchScope.fetchFullPayload();
62 mFetchScope.fetchAllAttributes();
65 void createLocalItem(
const Item &item );
67 void slotLocalListDone( KJob* );
68 void slotLocalDeleteDone( KJob* );
69 void slotLocalChangeDone( KJob* );
72 void deleteItems(
const Item::List &items );
73 void slotTransactionResult( KJob *job );
74 Job* subjobParent()
const;
77 Collection mSyncCollection;
78 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
79 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
80 QSet<Akonadi::Item> mUnprocessedLocalItems;
82 ItemSync::TransactionMode mTransactionMode;
83 TransactionSequence *mCurrentTransaction;
87 ItemFetchScope mFetchScope;
93 Item::List mRemovedRemoteItems;
99 int mTotalItemsProcessed;
108 void ItemSync::Private::createLocalItem(
const Item & item )
114 ItemCreateJob *create =
new ItemCreateJob( item, mSyncCollection, subjobParent() );
115 q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
118 void ItemSync::Private::checkDone()
120 q->setProcessedAmount( KJob::Bytes, mProgress );
121 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
130 ItemSync::ItemSync(
const Collection &collection, QObject *parent ) :
132 d( new Private( this ) )
134 d->mSyncCollection = collection;
137 ItemSync::~ItemSync()
142 void ItemSync::setFullSyncItems(
const Item::List &items )
144 Q_ASSERT( !d->mIncremental );
145 if ( !d->mStreaming )
146 d->mDeliveryDone =
true;
147 d->mRemoteItems += items;
148 d->mTotalItemsProcessed += items.count();
149 kDebug() <<
"Received: " << items.count() <<
"In total: " << d->mTotalItemsProcessed <<
" Wanted: " << d->mTotalItems;
150 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
151 if ( d->mTotalItemsProcessed == d->mTotalItems )
152 d->mDeliveryDone =
true;
156 void ItemSync::setTotalItems(
int amount )
158 Q_ASSERT( !d->mIncremental );
159 Q_ASSERT( amount >= 0 );
160 setStreamingEnabled(
true );
162 d->mTotalItems = amount;
163 setTotalAmount( KJob::Bytes, amount );
164 if ( d->mTotalItems == 0 ) {
165 d->mDeliveryDone =
true;
170 void ItemSync::setIncrementalSyncItems(
const Item::List &changedItems,
const Item::List &removedItems )
172 d->mIncremental =
true;
173 if ( !d->mStreaming )
174 d->mDeliveryDone =
true;
175 d->mRemoteItems += changedItems;
176 d->mRemovedRemoteItems += removedItems;
177 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
178 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
179 if ( d->mTotalItemsProcessed == d->mTotalItems )
180 d->mDeliveryDone =
true;
184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
186 d->mFetchScope = fetchScope;
189 ItemFetchScope &ItemSync::fetchScope()
191 return d->mFetchScope;
194 void ItemSync::doStart()
196 ItemFetchJob* job =
new ItemFetchJob( d->mSyncCollection,
this );
197 job->setFetchScope( d->mFetchScope );
200 job->fetchScope().setCacheOnly(
true );
202 connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) );
205 bool ItemSync::updateItem(
const Item &storedItem, Item &newItem )
216 if ( d->mIncremental )
219 if ( newItem.d_func()->mClearPayload )
223 if ( storedItem.remoteRevision() != newItem.remoteRevision() )
227 if ( storedItem.flags() != newItem.flags() ) {
228 kDebug() <<
"Stored flags " << storedItem.flags()
229 <<
"new flags " << newItem.flags();
234 QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
235 missingParts.subtract( storedItem.loadedPayloadParts() );
236 if ( !missingParts.isEmpty() )
242 if ( newItem.hasPayload()
243 && storedItem.payloadData() != newItem.payloadData() )
247 foreach (
Attribute* attr, newItem.attributes() ) {
248 if ( !storedItem.hasAttribute( attr->
type() ) )
250 if ( attr->
serialized() != storedItem.attribute( attr->
type() )->serialized() )
257 void ItemSync::Private::slotLocalListDone( KJob * job )
259 if ( !job->error() ) {
260 const Item::List list =
static_cast<ItemFetchJob*
>( job )->items();
261 foreach (
const Item &item, list ) {
262 if ( item.remoteId().isEmpty() )
264 mLocalItemsById.insert( item.id(), item );
265 mLocalItemsByRemoteId.insert( item.remoteId(), item );
266 mUnprocessedLocalItems.insert( item );
270 mLocalListDone =
true;
274 void ItemSync::Private::execute()
276 if ( !mLocalListDone )
281 if ( !mDeliveryDone && mRemoteItems.isEmpty() )
284 if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
286 mCurrentTransaction =
new TransactionSequence( q );
287 mCurrentTransaction->setAutomaticCommittingEnabled(
false );
288 connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
292 if ( !mDeliveryDone ) {
293 if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
294 mCurrentTransaction->commit();
295 mCurrentTransaction = 0;
301 if ( !mIncremental ) {
302 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
303 mUnprocessedLocalItems.clear();
306 deleteItems( mRemovedRemoteItems );
307 mLocalItemsById.clear();
308 mLocalItemsByRemoteId.clear();
309 mRemovedRemoteItems.clear();
311 if ( mCurrentTransaction ) {
312 mCurrentTransaction->commit();
313 mCurrentTransaction = 0;
319 void ItemSync::Private::processItems()
322 foreach ( Item remoteItem, mRemoteItems ) {
324 if ( remoteItem.remoteId().isEmpty() ) {
325 kWarning() <<
"Item " << remoteItem.id() <<
" does not have a remote identifier";
329 Item localItem = mLocalItemsById.value( remoteItem.id() );
330 if ( !localItem.isValid() )
331 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
332 mUnprocessedLocalItems.remove( localItem );
334 if ( !localItem.isValid() ) {
335 createLocalItem( remoteItem );
339 if ( q->updateItem( localItem, remoteItem ) ) {
342 remoteItem.setId( localItem.id() );
343 remoteItem.setRevision( localItem.revision() );
344 remoteItem.setSize( localItem.size() );
345 remoteItem.setRemoteId( localItem.remoteId() );
346 ItemModifyJob *mod =
new ItemModifyJob( remoteItem, subjobParent() );
347 mod->disableRevisionCheck();
348 q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
353 mRemoteItems.clear();
356 void ItemSync::Private::deleteItems(
const Item::List &items )
362 Item::List itemsToDelete;
363 foreach (
const Item &item, items ) {
364 Item delItem( item );
365 if ( !item.isValid() ) {
366 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
369 if ( !delItem.isValid() ) {
371 kWarning() <<
"Delete item (remoteeId=" << item.remoteId()
372 <<
"mimeType=" << item.mimeType()
373 <<
") does not have a valid UID and no item with that remote ID exists either";
378 if ( delItem.remoteId().isEmpty() ) {
383 itemsToDelete.append ( delItem );
386 if ( !itemsToDelete.isEmpty() ) {
388 ItemDeleteJob *job =
new ItemDeleteJob( itemsToDelete, subjobParent() );
389 q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) );
395 TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
397 transaction->setIgnoreJobFailure( job );
401 void ItemSync::Private::slotLocalDeleteDone( KJob* )
409 void ItemSync::Private::slotLocalChangeDone( KJob * job )
418 void ItemSync::Private::slotTransactionResult( KJob *job )
421 if ( mCurrentTransaction == job )
422 mCurrentTransaction = 0;
427 Job * ItemSync::Private::subjobParent()
const
429 if ( mCurrentTransaction && mTransactionMode != NoTransaction )
430 return mCurrentTransaction;
434 void ItemSync::setStreamingEnabled(
bool enable)
436 d->mStreaming = enable;
439 void ItemSync::deliveryDone()
441 Q_ASSERT( d->mStreaming );
442 d->mDeliveryDone =
true;
446 void ItemSync::slotResult(KJob* job)
448 if ( job->error() ) {
453 setError( job->error() );
454 setErrorText( job->errorText() );
457 Akonadi::Job::slotResult( job );
461 void ItemSync::rollback()
463 setError( UserCanceled );
464 if ( d->mCurrentTransaction )
465 d->mCurrentTransaction->rollback();
466 d->mDeliveryDone =
true;
470 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
472 d->mTransactionMode = mode;
476 #include "itemsync.moc"