21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "protocolhelper_p.h"
29 #include "xdgbasedirs_p.h"
32 #include <klocalizedstring.h>
34 #include <QCoreApplication>
35 #include <QtCore/QDir>
36 #include <QtCore/QQueue>
37 #include <QtCore/QThreadStorage>
38 #include <QtCore/QTimer>
39 #include <QtCore/QThread>
42 #include <QtNetwork/QLocalSocket>
43 #include <QtNetwork/QTcpSocket>
44 #include <QtNetwork/QHostAddress>
45 #include <QApplication>
// Upper bound compared against pipeline.count() in canPipelineNext().
// NOTE(review): with the value 0 the `pipeline.count() >= PIPELINE_LENGTH`
// test is always true, which presumably disables pipelining - confirm.
50 #define PIPELINE_LENGTH 0
53 using namespace Akonadi;
57 static const QList<QByteArray> sCapabilities = QList<QByteArray>()
60 <<
"AKAPPENDSTREAMING"
// Schedules doStartNext() on the owning Session (mParent) through a
// zero-timeout single-shot timer rather than calling it directly.
63 void SessionPrivate::startNext()
65 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
70 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
71 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
72 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
77 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
78 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
79 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
85 QString serverAddress;
90 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS" );
91 if ( !serverAddressEnvVar.isEmpty() ) {
92 const int pos = serverAddressEnvVar.indexOf(
':' );
93 const QByteArray protocol = serverAddressEnvVar.left( pos );
94 QMap<QString, QString> options;
95 foreach (
const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(
',') ) ) {
96 const QStringList pair = entry.split( QLatin1Char(
'=') );
97 if ( pair.size() != 2 )
99 options.insert( pair.first(), pair.last() );
101 kDebug() << protocol << options;
103 if ( protocol ==
"tcp" ) {
104 serverAddress = options.value( QLatin1String(
"host" ) );
105 port = options.value( QLatin1String(
"port" ) ).toUInt();
107 }
else if ( protocol ==
"unix" ) {
108 serverAddress = options.value( QLatin1String(
"path" ) );
109 }
else if ( protocol ==
"pipe" ) {
110 serverAddress = options.value( QLatin1String(
"name" ) );
115 if ( serverAddress.isEmpty() ) {
117 const QFileInfo fileInfo( connectionConfigFile );
118 if ( !fileInfo.exists() ) {
119 kDebug() <<
"Akonadi Client Session: connection config file '"
120 "akonadi/akonadiconnectionrc' can not be found in"
121 << XdgBaseDirs::homePath(
"config" ) <<
"nor in any of"
122 << XdgBaseDirs::systemPathList(
"config" );
124 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
126 #ifdef Q_OS_WIN //krazy:exclude=cpp
127 serverAddress = connectionSettings.value( QLatin1String(
"Data/NamedPipe" ), QLatin1String(
"Akonadi" ) ).toString();
129 const QString defaultSocketDir = Internal::xdgSaveDir(
"data" );
130 serverAddress = connectionSettings.value( QLatin1String(
"Data/UnixPath" ), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket" )) ).toString();
138 socket = localSocket =
new QLocalSocket( mParent );
139 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
141 socket = tcpSocket =
new QTcpSocket( mParent );
142 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
144 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
145 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
149 kDebug() <<
"connectToServer" << serverAddress;
151 localSocket->connectToServer( serverAddress );
153 tcpSocket->connectToHost( serverAddress, port );
161 return Internal::xdgSaveDir(
"config" ) + QLatin1String(
"/akonadiconnectionrc");
// Error handler for the QLocalSocket connection: logs the socket's error
// string and then runs the regular disconnect handling.
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
166 Q_ASSERT( mParent->sender() == socket );
// Read the error text through the `socket` member itself. errorString() is
// provided by QIODevice, so no downcast is needed; the previous
// qobject_cast<QLocalSocket*>() result was dereferenced unchecked and would
// crash in release builds (where the Q_ASSERT above is compiled out) if the
// socket were not a QLocalSocket.
167 kWarning() <<
"Socket error occurred:" << socket->errorString();
168 socketDisconnected();
// Error handler for the QTcpSocket connection: logs the socket's error
// string and then runs the regular disconnect handling.
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
173 Q_ASSERT( mParent->sender() == socket );
// Read the error text through the `socket` member itself. errorString() is
// provided by QIODevice, so no downcast is needed; the previous
// qobject_cast<QTcpSocket*>() result was dereferenced unchecked and would
// crash in release builds (where the Q_ASSERT above is compiled out) if the
// socket were not a QTcpSocket.
174 kWarning() <<
"Socket error occurred:" << socket->errorString();
175 socketDisconnected();
// Connected to the socket's disconnected() signal and also called from both
// socketError() overloads. Tells the job currently being processed that the
// connection was lost.
// NOTE(review): the lines around the call below are not visible in this
// listing; confirm that currentJob is checked for null before this call.
178 void SessionPrivate::socketDisconnected()
181 currentJob->d_ptr->lostConnection();
// Connected to the socket's readyRead() signal. Drains the socket, feeds the
// bytes to the IMAP parser and handles each parsed response:
//  - untagged "* OK Akonadi" greeting: extract the protocol version, then
//    send "0 LOGIN <sessionId>";
//  - tag "0" (the LOGIN reply): on OK send "1 CAPABILITY (...)";
//  - tag "1" (the CAPABILITY reply);
//  - anything else is forwarded to the current job.
// NOTE(review): several lines of this function are missing from this
// listing; the comments describe only the statements that are shown.
185 void SessionPrivate::dataReceived()
187 while ( socket->bytesAvailable() > 0 ) {
// While the parser reports a continuation size above 1, read a raw block of
// at most continuationSize() - 1 bytes instead of a line.
188 if ( parser->continuationSize() > 1 ) {
189 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190 parser->parseBlock( data );
191 }
else if ( socket->canReadLine() ) {
192 if ( !parser->parseNextLine( socket->readLine() ) )
// Session log: server responses are written with an "S: " prefix.
196 logFile->write(
"S: " + parser->data() );
// Tag "0" is the reply to the "0 LOGIN" command sent further below.
200 if ( parser->tag() == QByteArray(
"0" ) ) {
201 if ( parser->data().startsWith(
"OK" ) ) {
202 writeData(
"1 CAPABILITY (" + ImapParser::join( sCapabilities,
" " ) +
")");
204 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
// Schedule a reconnect() on mParent after 1000 ms.
206 QTimer::singleShot( 1000, mParent, SLOT(
reconnect()) );
// Tag "1" is the reply to the "1 CAPABILITY" command sent above.
211 if ( parser->tag() == QByteArray(
"1") ) {
212 if ( parser->data().startsWith(
"OK") ) {
216 kDebug() <<
"Unhandled server capability response:" << parser->data();
// Untagged server greeting; it may carry a "[PROTOCOL <n>" marker.
221 if ( parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi" ) ) {
222 const int pos = parser->data().indexOf(
"[PROTOCOL" );
// pos + 9 skips the 9 characters of "[PROTOCOL" so parsing starts at the
// number that follows it.
225 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
226 protocolVersion = tmp;
227 Internal::setServerProtocolVersion( tmp );
229 kDebug() <<
"Server protocol version is:" << protocolVersion;
231 writeData(
"0 LOGIN " + ImapParser::quote( sessionId ) +
'\n' );
// Every other response is handed to the job currently being processed.
236 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
// Decides whether another queued job may be started before the current one
// has finished. The answer is the mWriteFinished flag of the most recently
// started job: the last pipeline entry if the pipeline is non-empty,
// otherwise currentJob.
// NOTE(review): the statements guarded by the first `if`, and the final
// fall-through return, are not visible in this listing.
247 bool SessionPrivate::canPipelineNext()
249 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
251 if ( pipeline.isEmpty() && currentJob )
252 return currentJob->d_ptr->mWriteFinished;
253 if ( !pipeline.isEmpty() )
254 return pipeline.last()->d_ptr->mWriteFinished;
// Invoked from startNext() through a zero-timeout timer. Picks the next job
// to run, taking it from the pipeline if that is non-empty and from the
// queue otherwise, and starts it with startJob().
// NOTE(review): several lines (early return, the pipelining branch body,
// the else keyword between the two dequeue calls) are missing from this
// listing.
258 void SessionPrivate::doStartNext()
260 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
262 if ( canPipelineNext() ) {
264 pipeline.enqueue( nextJob );
270 if ( !pipeline.isEmpty() ) {
271 currentJob = pipeline.dequeue();
273 currentJob = queue.dequeue();
274 startJob( currentJob );
// Starts the given job via its private startQueued() hook. If the server's
// protocol version is below minimumProtocolVersion(), the job receives an
// explanatory error text.
// NOTE(review): the rest of the version-mismatch branch is not visible
// here; presumably the job is failed instead of started - confirm.
278 void SessionPrivate::startJob(
Job *job )
280 if ( protocolVersion < minimumProtocolVersion() ) {
282 job->setErrorText( i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
285 job->d_ptr->startQueued();
// Connected to a job's result(KJob*) signal and also called from
// jobDestroyed(). If the finished job is currentJob, the next pipelined job
// (if any) becomes the current one; otherwise the job is removed from the
// queue and the pipeline.
// NOTE(review): the branch bodies are only partly visible in this listing.
294 void SessionPrivate::jobDone(KJob * job)
298 if ( job == currentJob ) {
299 if ( pipeline.isEmpty() ) {
303 currentJob = pipeline.dequeue();
308 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
310 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
// Called when a job has finished writing its command. The job must be
// either the current job (with nothing pipelined behind it) or the most
// recently pipelined job.
314 void SessionPrivate::jobWriteFinished(
Akonadi::Job* job )
// The sanity check compares with `==`. The previous `job = pipeline.last()`
// was an assignment, so the assertion could never fail for a non-null job
// and overwrote the parameter in debug builds. last() is only evaluated on
// a non-empty pipeline, since calling it on an empty queue is invalid.
316 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (!pipeline.isEmpty() && job == pipeline.last()) );
// Connected to a job's destroyed(QObject*) signal. Forwards to jobDone() so
// the destroyed job gets the same clean-up as a finished one. The pointer
// is only cast and passed on here, not dereferenced.
322 void SessionPrivate::jobDestroyed(QObject * job)
325 jobDone( static_cast<KJob*>( job ) );
331 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
333 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
345 logFile->write(
"C: " + data );
346 if ( !data.endsWith(
'\n' ) ) {
347 logFile->write(
"\n" );
353 socket->write( data );
355 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
366 Q_FOREACH (
Job *job, queue ) {
368 job->kill( KJob::EmitResult );
377 foreach (
Job *job, queue )
378 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
// Stores the owning Session and zero-initialises the socket, current job,
// parser and log file pointers as well as the protocol version. The parser
// and the optional log file are created later in init().
383 SessionPrivate::SessionPrivate(
Session *parent )
384 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 ), logFile( 0 )
// Second-stage initialisation, called from the Session constructors with
// the requested session id. Creates the IMAP parser, settles the session
// id, and opens a per-session log file if the AKONADI_SESSION_LOGFILE
// environment variable is set.
// NOTE(review): several lines of this function are missing from this
// listing; the comments describe only the statements that are shown.
388 void SessionPrivate::init(
const QByteArray &
id )
391 parser =
new ImapParser();
393 if ( !
id.isEmpty() ) {
// Generated id: "<application name>-<qrand() value>".
// NOTE(review): presumably the fallback used when `id` is empty - the
// connecting else line is not visible here.
396 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
397 +
'-' + QByteArray::number( qrand() );
// Optional session log; the file name is "<env value>.<pid>.<sessionId>".
409 const QByteArray sessionLogFile = qgetenv(
"AKONADI_SESSION_LOGFILE" );
410 if ( !sessionLogFile.isEmpty() ) {
411 logFile =
new QFile( QString::fromLatin1(
"%1.%2.%3" ).arg( QString::fromLatin1( sessionLogFile ) )
412 .arg( QString::number( QApplication::applicationPid() ) )
413 .arg( QString::fromLatin1( sessionId ) ),
// The log file is opened write-only and truncated; a failure to open it is
// reported with a warning.
415 if ( !logFile->open( QIODevice::WriteOnly | QIODevice::Truncate ) ) {
416 kWarning() <<
"Failed to open Akonadi Session log file" << logFile->fileName();
430 socket->disconnect( mParent );
434 QMetaObject::invokeMethod( mParent,
"reconnect", Qt::QueuedConnection );
441 d->init( sessionId );
448 d->init( sessionId );
462 static QThreadStorage<Session*> instances;
466 Q_ASSERT_X( !sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
467 "You tried to create a default session with empty session id!" );
468 Q_ASSERT_X( !instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
469 "You tried to create a default session twice!" );
471 instances.setLocalData(
new Session( sessionId ) );
476 instances.setLocalData( session );
481 if ( !instances.hasLocalData() )
482 instances.setLocalData(
new Session() );
483 return instances.localData();
488 foreach (
Job* job, d->queue )
489 job->kill( KJob::EmitResult );
491 foreach (
Job* job, d->pipeline ) {
492 job->d_ptr->mStarted =
false;
493 job->kill( KJob::EmitResult );
496 if ( d->currentJob ) {
497 d->currentJob->d_ptr->mStarted =
false;
498 d->currentJob->kill( KJob::EmitResult );
503 #include "moc_session.cpp"
The server protocol version is too old or too new.
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
int nextTag()
Returns the next IMAP tag.
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Base class for all actions in the Akonadi storage.
The connection to the Akonadi server failed.
static void setDefaultSession(Session *session)
Sets the default session.
static Session * defaultSession()
Returns the default session for this thread.
~Session()
Destroys the session.
QByteArray sessionId() const
Returns the session identifier.
void clear()
Stops all jobs queued for execution.
static State state()
Returns the state of the server.
A communication session with the Akonadi storage.
Server is not running, could be no one started it yet or it failed to start.
static bool start()
Starts the server.
virtual void addJob(Job *job)
Associates the given Job object with this session.
void reconnected()
This signal is emitted whenever the session has been reconnected to the server.
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Server is running and operational.
void writeData(const QByteArray &data)
Sends the given raw data.
static QString connectionFile()
Default location for akonadiconnectionrc.
State
Enum for the various states the server can be in.
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.
Server is not operational and an error has been detected.