00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "job.h"
00023 #include "job_p.h"
00024 #include "dbusconnectionpool.h"
00025 #include <QTime>
00026 #include "imapparser_p.h"
00027 #include "session.h"
00028 #include "session_p.h"
00029
00030 #include <kdebug.h>
00031 #include <klocale.h>
00032
00033 #include <QtCore/QEventLoop>
00034 #include <QtCore/QTimer>
00035 #include <QtCore/QTextStream>
00036 #include <QtNetwork/QHostAddress>
00037 #include <QtNetwork/QTcpSocket>
00038 #include <QtDBus/QDBusInterface>
00039 #include <QtDBus/QDBusConnectionInterface>
00040
00041 using namespace Akonadi;
00042
00043 static QDBusAbstractInterface *s_jobtracker = 0;
00044
00045
00046 void JobPrivate::handleResponse( const QByteArray & tag, const QByteArray & data )
00047 {
00048 Q_Q( Job );
00049
00050 if ( mCurrentSubJob ) {
00051 mCurrentSubJob->d_ptr->handleResponse( tag, data );
00052 return;
00053 }
00054
00055 if ( tag == mTag ) {
00056 if ( data.startsWith( "NO " ) || data.startsWith( "BAD " ) ) {
00057 QString msg = QString::fromUtf8( data );
00058
00059 msg.remove( 0, msg.startsWith( QLatin1String( "NO " ) ) ? 3 : 4 );
00060
00061 if ( msg.endsWith( QLatin1String( "\r\n" ) ) )
00062 msg.chop( 2 );
00063
00064 q->setError( Job::Unknown );
00065 q->setErrorText( msg );
00066 q->emitResult();
00067 return;
00068 } else if ( data.startsWith( "OK" ) ) {
00069
00070
00071
00072
00073
00074
00075 QTimer::singleShot( 0, q, SLOT( delayedEmitResult() ) );
00076 return;
00077 }
00078 }
00079
00080 q->doHandleResponse( tag, data );
00081 }
00082
00083 void JobPrivate::init( QObject *parent )
00084 {
00085 Q_Q( Job );
00086
00087 mParentJob = dynamic_cast<Job*>( parent );
00088 mSession = dynamic_cast<Session*>( parent );
00089
00090 if ( !mSession ) {
00091 if ( !mParentJob )
00092 mSession = Session::defaultSession();
00093 else
00094 mSession = mParentJob->d_ptr->mSession;
00095 }
00096
00097 if ( !mParentJob )
00098 mSession->d->addJob( q );
00099 else
00100 mParentJob->addSubjob( q );
00101
00102
00103 if ( !s_jobtracker ) {
00104
00105
00106 static QTime s_lastTime;
00107 if ( s_lastTime.isNull() || s_lastTime.elapsed() > 3000 ) {
00108 if ( s_lastTime.isNull() )
00109 s_lastTime.start();
00110 if ( DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String( "org.kde.akonadiconsole" ) ) ) {
00111 s_jobtracker = new QDBusInterface( QLatin1String( "org.kde.akonadiconsole" ),
00112 QLatin1String( "/jobtracker" ),
00113 QLatin1String( "org.freedesktop.Akonadi.JobTracker" ),
00114 DBusConnectionPool::threadConnection(), 0 );
00115 } else {
00116 s_lastTime.restart();
00117 }
00118 }
00119
00120
00121 }
00122 QMetaObject::invokeMethod( q, "signalCreationToJobTracker", Qt::QueuedConnection );
00123 }
00124
00125 void JobPrivate::signalCreationToJobTracker()
00126 {
00127 Q_Q( Job );
00128 if ( s_jobtracker ) {
00129
00130
00131
00132 QList<QVariant> argumentList;
00133 argumentList << QLatin1String( mSession->sessionId() )
00134 << QString::number(reinterpret_cast<unsigned long>( q ), 16)
00135 << ( mParentJob ? QString::number( reinterpret_cast<unsigned long>( mParentJob ), 16) : QString() )
00136 << QString::fromLatin1( q->metaObject()->className() );
00137 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String( "jobCreated" ), argumentList);
00138 }
00139 }
00140
00141 void JobPrivate::delayedEmitResult()
00142 {
00143 Q_Q( Job );
00144 q->emitResult();
00145 }
00146
00147 void JobPrivate::startQueued()
00148 {
00149 Q_Q( Job );
00150 mStarted = true;
00151
00152 emit q->aboutToStart( q );
00153 q->doStart();
00154 QTimer::singleShot( 0, q, SLOT( startNext() ) );
00155
00156
00157 if ( s_jobtracker ) {
00158 QList<QVariant> argumentList;
00159 argumentList << QString::number(reinterpret_cast<unsigned long>( q ), 16);
00160 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String( "jobStarted" ), argumentList);
00161 }
00162 }
00163
00164 void JobPrivate::lostConnection()
00165 {
00166 Q_Q( Job );
00167
00168 if ( mCurrentSubJob ) {
00169 mCurrentSubJob->d_ptr->lostConnection();
00170 } else {
00171 q->setError( Job::ConnectionFailed );
00172 q->kill( KJob::EmitResult );
00173 }
00174 }
00175
00176 void JobPrivate::slotSubJobAboutToStart( Job * job )
00177 {
00178 Q_ASSERT( mCurrentSubJob == 0 );
00179 mCurrentSubJob = job;
00180 }
00181
00182 void JobPrivate::startNext()
00183 {
00184 Q_Q( Job );
00185
00186 if ( mStarted && !mCurrentSubJob && q->hasSubjobs() ) {
00187 Job *job = dynamic_cast<Akonadi::Job*>( q->subjobs().first() );
00188 Q_ASSERT( job );
00189 job->d_ptr->startQueued();
00190 }
00191 }
00192
00193 QByteArray JobPrivate::newTag( )
00194 {
00195 if ( mParentJob )
00196 mTag = mParentJob->d_ptr->newTag();
00197 else
00198 mTag = QByteArray::number( mSession->d->nextTag() );
00199 return mTag;
00200 }
00201
00202 QByteArray JobPrivate::tag() const
00203 {
00204 return mTag;
00205 }
00206
00207 void JobPrivate::writeData( const QByteArray & data )
00208 {
00209 Q_ASSERT_X( !mWriteFinished, "Job::writeData()", "Calling writeData() after emitting writeFinished()" );
00210 mSession->d->writeData( data );
00211 }
00212
00213 void JobPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
00214 {
00215 mSession->d->itemRevisionChanged( itemId, oldRevision, newRevision );
00216 }
00217
00218 void JobPrivate::updateItemRevision( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
00219 {
00220 Q_Q( Job );
00221 foreach ( KJob *j, q->subjobs() ) {
00222 Akonadi::Job *job = qobject_cast<Akonadi::Job*>( j );
00223 if ( job )
00224 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
00225 }
00226 doUpdateItemRevision( itemId, oldRevision, newRevision );
00227 }
00228
00229 void JobPrivate::doUpdateItemRevision( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
00230 {
00231 Q_UNUSED( itemId );
00232 Q_UNUSED( oldRevision );
00233 Q_UNUSED( newRevision );
00234 }
00235
00236
00237
00238 Job::Job( QObject *parent )
00239 : KCompositeJob( parent ),
00240 d_ptr( new JobPrivate( this ) )
00241 {
00242 d_ptr->init( parent );
00243 }
00244
00245 Job::Job( JobPrivate *dd, QObject *parent )
00246 : KCompositeJob( parent ),
00247 d_ptr( dd )
00248 {
00249 d_ptr->init( parent );
00250 }
00251
00252 Job::~Job()
00253 {
00254 delete d_ptr;
00255
00256
00257 if ( s_jobtracker ) {
00258 QList<QVariant> argumentList;
00259 argumentList << QString::number(reinterpret_cast<unsigned long>( this ), 16)
00260 << errorString();
00261 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String( "jobEnded" ), argumentList);
00262 }
00263 }
00264
00265 void Job::start()
00266 {
00267 }
00268
00269 bool Job::doKill()
00270 {
00271 Q_D( Job );
00272 d->mStarted = false;
00273 return true;
00274 }
00275
00276 QString Job::errorString() const
00277 {
00278 QString str;
00279 switch ( error() ) {
00280 case NoError:
00281 break;
00282 case ConnectionFailed:
00283 str = i18n( "Cannot connect to the Akonadi service." );
00284 break;
00285 case ProtocolVersionMismatch:
00286 str = i18n( "The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed." );
00287 break;
00288 case UserCanceled:
00289 str = i18n( "User canceled operation." );
00290 break;
00291 case Unknown:
00292 default:
00293 str = i18n( "Unknown error." );
00294 break;
00295 }
00296 if ( !errorText().isEmpty() ) {
00297 str += QString::fromLatin1( " (%1)" ).arg( errorText() );
00298 }
00299 return str;
00300 }
00301
00302 bool Job::addSubjob( KJob * job )
00303 {
00304 bool rv = KCompositeJob::addSubjob( job );
00305 if ( rv ) {
00306 connect( job, SIGNAL( aboutToStart( Akonadi::Job* ) ), SLOT( slotSubJobAboutToStart( Akonadi::Job* ) ) );
00307 QTimer::singleShot( 0, this, SLOT( startNext() ) );
00308 }
00309 return rv;
00310 }
00311
00312 bool Job::removeSubjob(KJob * job)
00313 {
00314 bool rv = KCompositeJob::removeSubjob( job );
00315 if ( job == d_ptr->mCurrentSubJob ) {
00316 d_ptr->mCurrentSubJob = 0;
00317 QTimer::singleShot( 0, this, SLOT( startNext() ) );
00318 }
00319 return rv;
00320 }
00321
00322 void Job::doHandleResponse(const QByteArray & tag, const QByteArray & data)
00323 {
00324 kDebug() << "Unhandled response: " << tag << data;
00325 }
00326
00327 void Job::slotResult(KJob * job)
00328 {
00329 if ( d_ptr->mCurrentSubJob == job ) {
00330
00331 d_ptr->mCurrentSubJob = 0;
00332 KCompositeJob::slotResult( job );
00333 if ( !job->error() )
00334 QTimer::singleShot( 0, this, SLOT( startNext() ) );
00335 } else {
00336
00337
00338
00339 KCompositeJob::removeSubjob( job );
00340 }
00341 }
00342
00343 void Job::emitWriteFinished()
00344 {
00345 d_ptr->mWriteFinished = true;
00346 emit writeFinished( this );
00347 }
00348
00349 #include "job.moc"