00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include "session.h"
00021 #include "session_p.h"
00022
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "servermanager.h"
00027 #include "servermanager_p.h"
00028 #include "xdgbasedirs_p.h"
00029
00030 #include <kdebug.h>
00031 #include <klocale.h>
00032
00033 #include <QCoreApplication>
00034 #include <QtCore/QDir>
00035 #include <QtCore/QQueue>
00036 #include <QtCore/QThreadStorage>
00037 #include <QtCore/QTimer>
00038 #include <QtCore/QThread>
00039 #include <QSettings>
00040
00041 #include <QtNetwork/QLocalSocket>
00042 #include <QtNetwork/QTcpSocket>
00043 #include <QtNetwork/QHostAddress>
00044
00045
00046
00047
// Upper bound on the number of jobs held in the pipeline queue at once.
// canPipelineNext() refuses to pipeline as soon as pipeline.count() reaches
// this value, so the current value of 0 turns pipelining off entirely.
#define PIPELINE_LENGTH 0
00049
00050
00051 using namespace Akonadi;
00052
00053
00054
00055
00056 void SessionPrivate::startNext()
00057 {
00058 QTimer::singleShot( 0, mParent, SLOT( doStartNext() ) );
00059 }
00060
00061 void SessionPrivate::reconnect()
00062 {
00063 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
00064 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
00065 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
00066
00067 return;
00068 }
00069
00070 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
00071 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
00072 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
00073
00074 return;
00075 }
00076
00077
00078 QString serverAddress;
00079 quint16 port = 0;
00080 bool useTcp = false;
00081
00082
00083 const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
00084 if ( !serverAddressEnvVar.isEmpty() ) {
00085 const int pos = serverAddressEnvVar.indexOf( ':' );
00086 const QByteArray protocol = serverAddressEnvVar.left( pos );
00087 QMap<QString, QString> options;
00088 foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
00089 const QStringList pair = entry.split( QLatin1Char('=') );
00090 if ( pair.size() != 2 )
00091 continue;
00092 options.insert( pair.first(), pair.last() );
00093 }
00094 kDebug() << protocol << options;
00095
00096 if ( protocol == "tcp" ) {
00097 serverAddress = options.value( QLatin1String( "host" ) );
00098 port = options.value( QLatin1String( "port" ) ).toUInt();
00099 useTcp = true;
00100 } else if ( protocol == "unix" ) {
00101 serverAddress = options.value( QLatin1String( "path" ) );
00102 } else if ( protocol == "pipe" ) {
00103 serverAddress = options.value( QLatin1String( "name" ) );
00104 }
00105 }
00106
00107
00108 if ( serverAddress.isEmpty() ) {
00109 const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00110 const QFileInfo fileInfo( connectionConfigFile );
00111 if ( !fileInfo.exists() ) {
00112 kDebug() << "Akonadi Client Session: connection config file '"
00113 "akonadi/akonadiconnectionrc' can not be found in"
00114 << XdgBaseDirs::homePath( "config" ) << "nor in any of"
00115 << XdgBaseDirs::systemPathList( "config" );
00116 }
00117 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
00118
00119 #ifdef Q_OS_WIN //krazy:exclude=cpp
00120 serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00121 #else
00122 const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00123 serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), defaultSocketDir + QLatin1String( "/akonadiserver.socket" ) ).toString();
00124 #endif
00125 }
00126 #ifdef Q_OS_WINCE
00127 useTcp = true;
00128 #endif
00129
00130
00131
00132 if ( !socket ) {
00133 if ( !useTcp ) {
00134 socket = localSocket = new QLocalSocket( mParent );
00135 mParent->connect( localSocket, SIGNAL( error( QLocalSocket::LocalSocketError ) ), SLOT( socketError( QLocalSocket::LocalSocketError ) ) );
00136 } else {
00137 socket = tcpSocket = new QTcpSocket( mParent );
00138 mParent->connect( tcpSocket, SIGNAL( error( QAbstractSocket::SocketError ) ), SLOT( socketError( QAbstractSocket::SocketError ) ) );
00139 }
00140 mParent->connect( socket, SIGNAL( disconnected() ), SLOT( socketDisconnected() ) );
00141 mParent->connect( socket, SIGNAL( readyRead() ), SLOT( dataReceived() ) );
00142 }
00143
00144
00145 kDebug() << "connectToServer" << serverAddress;
00146 #ifdef Q_OS_WINCE
00147 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
00148 #else
00149 if ( !useTcp ) {
00150 localSocket->connectToServer( serverAddress );
00151 } else {
00152 tcpSocket->connectToHost( serverAddress, port );
00153 }
00154 #endif
00155
00156 emit mParent->reconnected();
00157 }
00158
00159 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
00160 {
00161 Q_ASSERT( mParent->sender() == socket );
00162 kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
00163 socketDisconnected();
00164 }
00165
00166 void SessionPrivate::socketError( QAbstractSocket::SocketError )
00167 {
00168 Q_ASSERT( mParent->sender() == socket );
00169 kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
00170 socketDisconnected();
00171 }
00172
00173 void SessionPrivate::socketDisconnected()
00174 {
00175 if ( currentJob )
00176 currentJob->d_ptr->lostConnection();
00177 connected = false;
00178 }
00179
00180 void SessionPrivate::dataReceived()
00181 {
00182 while ( socket->bytesAvailable() > 0 ) {
00183 if ( parser->continuationSize() > 1 ) {
00184 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00185 parser->parseBlock( data );
00186 } else if ( socket->canReadLine() ) {
00187 if ( !parser->parseNextLine( socket->readLine() ) )
00188 continue;
00189
00190
00191 if ( parser->tag() == QByteArray( "0" ) ) {
00192 if ( parser->data().startsWith( "OK" ) ) {
00193 connected = true;
00194 startNext();
00195 } else {
00196 kWarning() << "Unable to login to Akonadi server:" << parser->data();
00197 socket->close();
00198 QTimer::singleShot( 1000, mParent, SLOT( reconnect() ) );
00199 }
00200 }
00201
00202
00203 if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00204 const int pos = parser->data().indexOf( "[PROTOCOL" );
00205 if ( pos > 0 ) {
00206 qint64 tmp = 0;
00207 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00208 protocolVersion = tmp;
00209 Internal::setServerProtocolVersion( tmp );
00210 }
00211 kDebug() << "Server protocol version is:" << protocolVersion;
00212
00213 writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
00214
00215
00216 } else {
00217 if ( currentJob )
00218 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00219 }
00220
00221
00222 parser->reset();
00223 } else {
00224 break;
00225 }
00226 }
00227 }
00228
00229 bool SessionPrivate::canPipelineNext()
00230 {
00231 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00232 return false;
00233 if ( pipeline.isEmpty() && currentJob )
00234 return currentJob->d_ptr->mWriteFinished;
00235 if ( !pipeline.isEmpty() )
00236 return pipeline.last()->d_ptr->mWriteFinished;
00237 return false;
00238 }
00239
00240 void SessionPrivate::doStartNext()
00241 {
00242 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00243 return;
00244 if ( canPipelineNext() ) {
00245 Akonadi::Job *nextJob = queue.dequeue();
00246 pipeline.enqueue( nextJob );
00247 startJob( nextJob );
00248 }
00249 if ( jobRunning )
00250 return;
00251 jobRunning = true;
00252 if ( !pipeline.isEmpty() ) {
00253 currentJob = pipeline.dequeue();
00254 } else {
00255 currentJob = queue.dequeue();
00256 startJob( currentJob );
00257 }
00258 }
00259
00260 void SessionPrivate::startJob( Job *job )
00261 {
00262 if ( protocolVersion < minimumProtocolVersion() ) {
00263 job->setError( Job::ProtocolVersionMismatch );
00264 job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
00265 job->emitResult();
00266 } else {
00267 job->d_ptr->startQueued();
00268 }
00269 }
00270
00271 void SessionPrivate::endJob( Job *job )
00272 {
00273 job->emitResult();
00274 }
00275
00276 void SessionPrivate::jobDone(KJob * job)
00277 {
00278
00279
00280 if ( job == currentJob ) {
00281 if ( pipeline.isEmpty() ) {
00282 jobRunning = false;
00283 currentJob = 0;
00284 } else {
00285 currentJob = pipeline.dequeue();
00286 }
00287 startNext();
00288 } else {
00289
00290 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
00291
00292 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
00293 }
00294 }
00295
00296 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00297 {
00298 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00299 Q_UNUSED( job );
00300
00301 startNext();
00302 }
00303
00304 void SessionPrivate::jobDestroyed(QObject * job)
00305 {
00306
00307 jobDone( static_cast<KJob*>( job ) );
00308 }
00309
00310 void SessionPrivate::addJob(Job * job)
00311 {
00312 queue.append( job );
00313 QObject::connect( job, SIGNAL( result( KJob* ) ), mParent, SLOT( jobDone( KJob* ) ) );
00314 QObject::connect( job, SIGNAL( writeFinished( Akonadi::Job* ) ), mParent, SLOT( jobWriteFinished( Akonadi::Job* ) ) );
00315 QObject::connect( job, SIGNAL( destroyed( QObject* ) ), mParent, SLOT( jobDestroyed( QObject* ) ) );
00316 startNext();
00317 }
00318
00319 int SessionPrivate::nextTag()
00320 {
00321 return theNextTag++;
00322 }
00323
00324 void SessionPrivate::writeData(const QByteArray & data)
00325 {
00326 if ( socket )
00327 socket->write( data );
00328 else
00329 kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
00330 }
00331
00332 void SessionPrivate::serverStateChanged( ServerManager::State state )
00333 {
00334 if ( state == ServerManager::Running && !connected )
00335 reconnect();
00336 }
00337
00338 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
00339 {
00340
00341
00342 foreach ( Job *job, queue )
00343 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
00344 }
00345
00346
00347
00348
00349 SessionPrivate::SessionPrivate( Session *parent )
00350 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
00351 {
00352 }
00353
00354 void SessionPrivate::init( const QByteArray &id )
00355 {
00356 kDebug() << id;
00357 parser = new ImapParser();
00358
00359 if ( !id.isEmpty() ) {
00360 sessionId = id;
00361 } else {
00362 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
00363 + '-' + QByteArray::number( qrand() );
00364 }
00365
00366 connected = false;
00367 theNextTag = 1;
00368 jobRunning = false;
00369
00370 if ( ServerManager::state() == ServerManager::NotRunning )
00371 ServerManager::start();
00372 mParent->connect( ServerManager::self(), SIGNAL( stateChanged( Akonadi::ServerManager::State ) ),
00373 SLOT( serverStateChanged( Akonadi::ServerManager::State ) ) );
00374
00375 reconnect();
00376 }
00377
00378 Session::Session(const QByteArray & sessionId, QObject * parent) :
00379 QObject( parent ),
00380 d( new SessionPrivate( this ) )
00381 {
00382 d->init( sessionId );
00383 }
00384
00385 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
00386 : QObject( parent ),
00387 d( dd )
00388 {
00389 d->init( sessionId );
00390 }
00391
00392 Session::~Session()
00393 {
00394 clear();
00395 delete d;
00396 }
00397
00398 QByteArray Session::sessionId() const
00399 {
00400 return d->sessionId;
00401 }
00402
00403 static QThreadStorage<Session*> instances;
00404
00405 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00406 {
00407 Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00408 "You tried to create a default session with empty session id!" );
00409 Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00410 "You tried to create a default session twice!" );
00411
00412 instances.setLocalData( new Session( sessionId ) );
00413 }
00414
00415 Session* Session::defaultSession()
00416 {
00417 if ( !instances.hasLocalData() )
00418 instances.setLocalData( new Session() );
00419 return instances.localData();
00420 }
00421
00422 void Session::clear()
00423 {
00424 foreach ( Job* job, d->queue )
00425 job->kill( KJob::EmitResult );
00426 d->queue.clear();
00427 foreach ( Job* job, d->pipeline )
00428 job->kill( KJob::EmitResult );
00429 d->pipeline.clear();
00430 if ( d->currentJob )
00431 d->currentJob->kill( KJob::EmitResult );
00432 d->jobRunning = false;
00433 d->connected = false;
00434 if ( d->socket )
00435 d->socket->disconnect( this );
00436 delete d->socket;
00437 d->socket = 0;
00438 QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection );
00439 }
00440
00441 #include "session.moc"