00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include "itemsync.h"
00022
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "itemcreatejob.h"
00026 #include "itemdeletejob.h"
00027 #include "itemfetchjob.h"
00028 #include "itemmodifyjob.h"
00029 #include "transactionsequence.h"
00030 #include "itemfetchscope.h"
00031
00032 #include <kdebug.h>
00033
00034 #include <QtCore/QStringList>
00035
00036 using namespace Akonadi;
00037
00041 class ItemSync::Private
00042 {
00043 public:
00044 Private( ItemSync *parent ) :
00045 q( parent ),
00046 mTransactionMode( Single ),
00047 mCurrentTransaction( 0 ),
00048 mTransactionJobs( 0 ),
00049 mPendingJobs( 0 ),
00050 mProgress( 0 ),
00051 mTotalItems( -1 ),
00052 mTotalItemsProcessed( 0 ),
00053 mStreaming( false ),
00054 mIncremental( false ),
00055 mLocalListDone( false ),
00056 mDeliveryDone( false )
00057 {
00058
00059 mFetchScope.fetchFullPayload();
00060 mFetchScope.fetchAllAttributes();
00061 }
00062
00063 void createLocalItem( const Item &item );
00064 void checkDone();
00065 void slotLocalListDone( KJob* );
00066 void slotLocalChangeDone( KJob* );
00067 void execute();
00068 void processItems();
00069 void deleteItems( const Item::List &items );
00070 void slotTransactionResult( KJob *job );
00071 Job* subjobParent() const;
00072
00073 ItemSync *q;
00074 Collection mSyncCollection;
00075 QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00076 QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00077 QSet<Akonadi::Item> mUnprocessedLocalItems;
00078
00079
00080 enum TransactionMode {
00081 Single,
00082 Chunkwise,
00083 None
00084 };
00085 TransactionMode mTransactionMode;
00086 TransactionSequence *mCurrentTransaction;
00087 int mTransactionJobs;
00088
00089
00090 ItemFetchScope mFetchScope;
00091
00092
00093 Akonadi::Item::List mRemoteItems;
00094
00095
00096 Item::List mRemovedRemoteItems;
00097
00098
00099 int mPendingJobs;
00100 int mProgress;
00101 int mTotalItems;
00102 int mTotalItemsProcessed;
00103
00104 bool mStreaming;
00105 bool mIncremental;
00106 bool mLocalListDone;
00107 bool mDeliveryDone;
00108 };
00109
00110 void ItemSync::Private::createLocalItem( const Item & item )
00111 {
00112 mPendingJobs++;
00113 ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00114 q->connect( create, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00115 }
00116
00117 void ItemSync::Private::checkDone()
00118 {
00119 q->setProcessedAmount( KJob::Bytes, mProgress );
00120 if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00121 return;
00122
00123 q->emitResult();
00124 }
00125
00126 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00127 Job( parent ),
00128 d( new Private( this ) )
00129 {
00130 d->mSyncCollection = collection;
00131 }
00132
00133 ItemSync::~ItemSync()
00134 {
00135 delete d;
00136 }
00137
00138 void ItemSync::setFullSyncItems( const Item::List &items )
00139 {
00140 Q_ASSERT( !d->mIncremental );
00141 if ( !d->mStreaming )
00142 d->mDeliveryDone = true;
00143 d->mRemoteItems += items;
00144 d->mTotalItemsProcessed += items.count();
00145 kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00146 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00147 if ( d->mTotalItemsProcessed == d->mTotalItems )
00148 d->mDeliveryDone = true;
00149 d->execute();
00150 }
00151
00152 void ItemSync::setTotalItems( int amount )
00153 {
00154 Q_ASSERT( !d->mIncremental );
00155 Q_ASSERT( amount >= 0 );
00156 setStreamingEnabled( true );
00157 kDebug() << amount;
00158 d->mTotalItems = amount;
00159 setTotalAmount( KJob::Bytes, amount );
00160 if ( d->mTotalItems == 0 ) {
00161 d->mDeliveryDone = true;
00162 d->execute();
00163 }
00164 }
00165
00166 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00167 {
00168 d->mIncremental = true;
00169 if ( !d->mStreaming )
00170 d->mDeliveryDone = true;
00171 d->mRemoteItems += changedItems;
00172 d->mRemovedRemoteItems += removedItems;
00173 d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00174 setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00175 if ( d->mTotalItemsProcessed == d->mTotalItems )
00176 d->mDeliveryDone = true;
00177 d->execute();
00178 }
00179
00180 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00181 {
00182 d->mFetchScope = fetchScope;
00183 }
00184
00185 ItemFetchScope &ItemSync::fetchScope()
00186 {
00187 return d->mFetchScope;
00188 }
00189
00190 void ItemSync::doStart()
00191 {
00192 ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00193 job->setFetchScope( d->mFetchScope );
00194
00195
00196 job->fetchScope().setCacheOnly( true );
00197
00198 connect( job, SIGNAL( result( KJob* ) ), SLOT( slotLocalListDone( KJob* ) ) );
00199 }
00200
00201 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00202 {
00203
00204
00205
00206
00207
00208 if ( d->mIncremental )
00209 return true;
00210
00211
00212 if ( storedItem.flags() != newItem.flags() ) {
00213 kDebug( 5250 ) << "Stored flags " << storedItem.flags()
00214 << "new flags " << newItem.flags();
00215 return true;
00216 }
00217
00218
00219 QSet<QByteArray> missingParts = storedItem.loadedPayloadParts();
00220 missingParts.subtract( newItem.loadedPayloadParts() );
00221 if ( !missingParts.isEmpty() )
00222 return true;
00223
00224
00225
00226
00227 if ( storedItem.payloadData() != newItem.payloadData() )
00228 return true;
00229
00230
00231 foreach ( Attribute* attr, newItem.attributes() ) {
00232 if ( !storedItem.hasAttribute( attr->type() ) )
00233 return true;
00234 if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00235 return true;
00236 }
00237
00238 return false;
00239 }
00240
00241 void ItemSync::Private::slotLocalListDone( KJob * job )
00242 {
00243 if ( job->error() )
00244 return;
00245
00246 const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00247 foreach ( const Item &item, list ) {
00248 mLocalItemsById.insert( item.id(), item );
00249 mLocalItemsByRemoteId.insert( item.remoteId(), item );
00250 mUnprocessedLocalItems.insert( item );
00251 }
00252
00253 mLocalListDone = true;
00254 execute();
00255 }
00256
00257 void ItemSync::Private::execute()
00258 {
00259 if ( !mLocalListDone )
00260 return;
00261
00262 if ( (mTransactionMode == Single && !mCurrentTransaction) || mTransactionMode == Chunkwise ) {
00263 ++mTransactionJobs;
00264 mCurrentTransaction = new TransactionSequence( q );
00265 connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
00266 }
00267
00268 processItems();
00269 if ( !mDeliveryDone ) {
00270 if ( mTransactionMode == Chunkwise && mCurrentTransaction ) {
00271 mCurrentTransaction->commit();
00272 mCurrentTransaction = 0;
00273 }
00274 return;
00275 }
00276
00277
00278 if ( !mIncremental ) {
00279 mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00280 mUnprocessedLocalItems.clear();
00281 }
00282
00283 deleteItems( mRemovedRemoteItems );
00284 mLocalItemsById.clear();
00285 mLocalItemsByRemoteId.clear();
00286 mRemovedRemoteItems.clear();
00287
00288 if ( mCurrentTransaction ) {
00289 mCurrentTransaction->commit();
00290 mCurrentTransaction = 0;
00291 }
00292
00293 checkDone();
00294 }
00295
00296 void ItemSync::Private::processItems()
00297 {
00298
00299 foreach ( Item remoteItem, mRemoteItems ) {
00300 #ifndef NDEBUG
00301 if ( remoteItem.remoteId().isEmpty() ) {
00302 kWarning( 5250 ) << "Item " << remoteItem.id() << " does not have a remote identifier";
00303 }
00304 #endif
00305
00306 Item localItem = mLocalItemsById.value( remoteItem.id() );
00307 if ( !localItem.isValid() )
00308 localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00309 mUnprocessedLocalItems.remove( localItem );
00310
00311 if ( !localItem.isValid() ) {
00312 createLocalItem( remoteItem );
00313 continue;
00314 }
00315
00316 if ( q->updateItem( localItem, remoteItem ) ) {
00317 mPendingJobs++;
00318
00319 remoteItem.setId( localItem.id() );
00320 remoteItem.setRevision( localItem.revision() );
00321 remoteItem.setSize( localItem.size() );
00322 remoteItem.setRemoteId( localItem.remoteId() );
00323 ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00324 q->connect( mod, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00325 } else {
00326 mProgress++;
00327 }
00328 }
00329 mRemoteItems.clear();
00330 }
00331
00332 void ItemSync::Private::deleteItems( const Item::List &items )
00333 {
00334 foreach ( const Item &item, items ) {
00335 Item delItem( item );
00336 if ( !item.isValid() ) {
00337 delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00338 }
00339
00340 if ( !delItem.isValid() ) {
00341 #ifndef NDEBUG
00342 kWarning( 5250 ) << "Delete item (remoteeId=" << delItem.remoteId()
00343 << "mimeType=" << delItem.mimeType()
00344 << ") does not have a valid UID and no item with that remote ID exists either";
00345 #endif
00346 continue;
00347 }
00348
00349 mPendingJobs++;
00350 ItemDeleteJob *job = new ItemDeleteJob( delItem, subjobParent() );
00351 q->connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalChangeDone( KJob* ) ) );
00352 }
00353 }
00354
00355 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00356 {
00357 if ( job->error() )
00358 return;
00359
00360 mPendingJobs--;
00361 mProgress++;
00362
00363 checkDone();
00364 }
00365
00366 void ItemSync::Private::slotTransactionResult( KJob *job )
00367 {
00368 if ( job->error() )
00369 return;
00370
00371 --mTransactionJobs;
00372 if ( mCurrentTransaction == job )
00373 mCurrentTransaction = 0;
00374
00375 checkDone();
00376 }
00377
00378 Job * ItemSync::Private::subjobParent() const
00379 {
00380 if ( mCurrentTransaction && mTransactionMode != None )
00381 return mCurrentTransaction;
00382 return q;
00383 }
00384
00385 void ItemSync::setStreamingEnabled(bool enable)
00386 {
00387 d->mStreaming = enable;
00388 }
00389
00390 void ItemSync::deliveryDone()
00391 {
00392 Q_ASSERT( d->mStreaming );
00393 d->mDeliveryDone = true;
00394 d->execute();
00395 }
00396
00397 #include "itemsync.moc"