24 #include "dbusconnectionpool.h"
26 #include "imapparser_p.h"
28 #include "session_p.h"
31 #include <klocalizedstring.h>
33 #include <QtCore/QEventLoop>
34 #include <QtCore/QTimer>
35 #include <QtCore/QTextStream>
36 #include <QtDBus/QDBusInterface>
37 #include <QtDBus/QDBusConnectionInterface>
39 using namespace Akonadi;
41 static QDBusAbstractInterface *s_jobtracker = 0;
// Processes one response (tag + payload) received for this job.
// NOTE(review): this listing omits lines (braces, returns, some statements),
// so the exact control flow between the visible steps must be confirmed
// against the complete source.
44 void JobPrivate::handleResponse(
const QByteArray & tag,
const QByteArray & data )
// While a sub-job is current, the response is forwarded to that sub-job.
48 if ( mCurrentSubJob ) {
49 mCurrentSubJob->d_ptr->handleResponse( tag, data );
// Responses starting with "NO " or "BAD " are converted into an error text.
54 if ( data.startsWith(
"NO " ) || data.startsWith(
"BAD " ) ) {
55 QString msg = QString::fromUtf8( data );
// Strip the status prefix: 3 characters for "NO ", otherwise 4 (the "BAD " case).
57 msg.remove( 0, msg.startsWith( QLatin1String(
"NO " ) ) ? 3 : 4 );
// A trailing CRLF is detected here; the statement guarded by this check is
// not visible in this listing.
59 if ( msg.endsWith( QLatin1String(
"\r\n" ) ) )
63 q->setErrorText( msg );
66 }
else if ( data.startsWith(
"OK" ) ) {
// On "OK", delayedEmitResult() is scheduled through a zero-delay timer
// instead of being called directly.
73 QTimer::singleShot( 0, q, SLOT(delayedEmitResult()) );
// Passes the response on to the job's doHandleResponse(); the condition under
// which this line is reached is not visible here.
78 q->doHandleResponse( tag, data );
// Initializes the job from its QObject parent: resolves the parent job and/or
// session, registers the job with the session, and looks up the job tracker
// D-Bus interface if none is known yet.
// NOTE(review): guards and braces between the visible lines are missing from
// this listing; confirm the branch structure against the complete source.
81 void JobPrivate::init( QObject *parent )
// The parent is tried both as a Job and as a Session; each cast yields 0 when
// the parent is not of that type.
85 mParentJob =
dynamic_cast<Job*
>( parent );
86 mSession =
dynamic_cast<Session*
>( parent );
// Takes the session from the parent job (the guarding condition is not
// visible here).
92 mSession = mParentJob->d_ptr->mSession;
// Registers this job with the session.
96 mSession->d->addJob( q );
// Only look for the job tracker while no interface has been created yet.
101 if ( !s_jobtracker ) {
// The lookup is attempted on the first call and afterwards only when more
// than 3000 ms have elapsed since s_lastTime was last restarted.
104 static QTime s_lastTime;
105 if ( s_lastTime.isNull() || s_lastTime.elapsed() > 3000 ) {
106 if ( s_lastTime.isNull() )
// Create the interface only if the "org.kde.akonadiconsole" service is
// registered on this thread's D-Bus connection.
108 if ( DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
109 s_jobtracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
110 QLatin1String(
"/jobtracker" ),
111 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
112 DBusConnectionPool::threadConnection(), 0 );
114 s_lastTime.restart();
// Queued invocation: signalCreationToJobTracker() is not run synchronously
// from init() but posted as an event to the job object.
120 QMetaObject::invokeMethod( q,
"signalCreationToJobTracker", Qt::QueuedConnection );
// Reports the creation of this job to the job tracker over D-Bus.
// Nothing is sent when no tracker interface (s_jobtracker) is available.
123 void JobPrivate::signalCreationToJobTracker()
126 if ( s_jobtracker ) {
// Arguments, in order: session id, this job's address as a hex string, the
// parent job's address as a hex string (empty string when there is no parent
// job), the job's class name, and the job's debugging string.
131 QList<QVariant> argumentList;
132 argumentList << QLatin1String( mSession->
sessionId() )
133 << QString::number(reinterpret_cast<quintptr>( q ), 16)
134 << ( mParentJob ? QString::number( reinterpret_cast<quintptr>( mParentJob ), 16) : QString() )
135 << QString::fromLatin1( q->metaObject()->className() )
136 << jobDebuggingString();
// QDBus::NoBlock: the "jobCreated" call is sent without waiting for a reply.
137 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobCreated" ), argumentList);
// Reports to the job tracker over D-Bus that this job has been started.
// Nothing is sent when no tracker interface (s_jobtracker) is available.
141 void JobPrivate::signalStartedToJobTracker()
144 if ( s_jobtracker ) {
// The only argument is this job's address as a hex string, the same
// identifier format used for the "jobCreated" call.
146 QList<QVariant> argumentList;
147 argumentList << QString::number(reinterpret_cast<quintptr>( q ), 16);
// QDBus::NoBlock: the "jobStarted" call is sent without waiting for a reply.
148 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobStarted" ), argumentList);
157 void JobPrivate::delayedEmitResult()
// Starts this job when its turn in the queue has come.
// NOTE(review): some statements of this function are missing from this
// listing; only the visible steps are described.
164 void JobPrivate::startQueued()
// Announce the start to listeners of aboutToStart().
169 emit q->aboutToStart( q );
// startNext() is scheduled through a zero-delay timer and the tracker
// notification is queued, so neither runs synchronously from here.
171 QTimer::singleShot( 0, q, SLOT(startNext()) );
172 QMetaObject::invokeMethod( q,
"signalStartedToJobTracker", Qt::QueuedConnection );
// Handles loss of the server connection for this job.
// When a sub-job is current, the notification is forwarded to it.
// NOTE(review): the handling for the case without a current sub-job is not
// visible in this listing.
175 void JobPrivate::lostConnection()
179 if ( mCurrentSubJob ) {
180 mCurrentSubJob->d_ptr->lostConnection();
// Records the given sub-job as the currently running one.
// Asserts that no other sub-job is current at that point.
187 void JobPrivate::slotSubJobAboutToStart(
Job * job )
189 Q_ASSERT( mCurrentSubJob == 0 );
190 mCurrentSubJob = job;
// Starts the next pending sub-job, if any.
// A sub-job is only started when this job has itself been started, no sub-job
// is currently running, and at least one sub-job exists.
// NOTE(review): the statement that selects `job` is missing from this listing.
193 void JobPrivate::startNext()
197 if ( mStarted && !mCurrentSubJob && q->hasSubjobs() ) {
200 job->d_ptr->startQueued();
207 mTag = mParentJob->d_ptr->newTag();
209 mTag = QByteArray::number( mSession->d->nextTag() );
220 Q_ASSERT_X( !mWriteFinished,
"Job::writeData()",
"Calling writeData() after emitting writeFinished()" );
221 mSession->d->writeData( data );
226 mSession->d->itemRevisionChanged( itemId, oldRevision, newRevision );
232 foreach ( KJob *j, q->subjobs() ) {
235 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
243 Q_UNUSED( oldRevision );
244 Q_UNUSED( newRevision );
// Returns the protocol version stored in the private data of the session this
// job belongs to.
247 int JobPrivate::protocolVersion()
const
249 return mSession->d->protocolVersion;
254 : KCompositeJob( parent ),
257 d_ptr->init( parent );
261 : KCompositeJob( parent ),
264 d_ptr->init( parent );
272 if ( s_jobtracker ) {
273 QList<QVariant> argumentList;
274 argumentList << QString::number(reinterpret_cast<quintptr>( this ), 16)
276 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobEnded" ), argumentList);
289 d->mSession->d->forceReconnect();
302 str = i18n(
"Cannot connect to the Akonadi service." );
305 str = i18n(
"The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed." );
308 str = i18n(
"User canceled operation." );
313 str = i18n(
"Unknown error." );
316 if ( !errorText().isEmpty() ) {
317 str += QString::fromLatin1(
" (%1)" ).arg( errorText() );
324 bool rv = KCompositeJob::addSubjob( job );
327 QTimer::singleShot( 0,
this, SLOT(startNext()) );
334 bool rv = KCompositeJob::removeSubjob( job );
335 if ( job == d_ptr->mCurrentSubJob ) {
336 d_ptr->mCurrentSubJob = 0;
337 QTimer::singleShot( 0,
this, SLOT(startNext()) );
344 kDebug() <<
"Unhandled response: " << tag << data;
// Reacts to the result of a sub-job.
// NOTE(review): the else-branch structure is missing from this listing; the
// removeSubjob() call at listing line 359 presumably belongs to the case where
// `job` is not the current sub-job — confirm against the complete source.
347 void Job::slotResult(KJob * job)
// The finished job is the current sub-job: clear the marker, let
// KCompositeJob process the result, then schedule startNext() through a
// zero-delay timer.
349 if ( d_ptr->mCurrentSubJob == job ) {
351 d_ptr->mCurrentSubJob = 0;
352 KCompositeJob::slotResult( job );
354 QTimer::singleShot( 0,
this, SLOT(startNext()) );
359 KCompositeJob::removeSubjob( job );
365 d_ptr->mWriteFinished =
true;
369 #include "moc_job.cpp"
virtual void doUpdateItemRevision(Akonadi::Item::Id, int oldRevision, int newRevision)
Override this if your job performs operations with conflict detection, and update the item revisions if your items are affected.
The server protocol version is too old or too new.
Base class for all actions in the Akonadi storage.
void updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to this job and its sub-jobs.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Notify following jobs about item revision changes.
The connection to the Akonadi server failed.
static Session * defaultSession()
Returns the default session for this thread.
void start()
Jobs are started automatically once the event loop is entered again; there is no need to call this explicitly.
void writeFinished(Akonadi::Job *job)
This signal is emitted when the job has finished all write operations, i.e. the job guarantees that it will not call writeData() again.
QByteArray sessionId() const
Returns the session identifier.
virtual void aboutToFinish()
This method is called right before result() and finished() signals are emitted.
QByteArray tag() const
Returns the tag used for the request.
virtual bool addSubjob(KJob *job)
Adds the given job as a subjob to this job.
virtual bool doKill()
Kills the execution of the job.
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
A communication session with the Akonadi storage.
QByteArray newTag()
Returns a new unique command tag for communication with the backend.
virtual void doHandleResponse(const QByteArray &tag, const QByteArray &data)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data.
virtual ~Job()
Destroys the job.
void writeData(const QByteArray &data)
Sends raw data to the backend.
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
virtual QString errorString() const
Returns the error string, if there has been an error, an empty string otherwise.
Job(QObject *parent=0)
Creates a new job.