21 #include "session_p.h"
23 #include "imapparser_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
48 #define PIPELINE_LENGTH 0
51 using namespace Akonadi;
56 void SessionPrivate::startNext()
58 QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
63 QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64 if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65 || localSocket->state() == QLocalSocket::ConnectingState ) ) {
70 QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71 if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72 || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
78 QString serverAddress;
83 const QByteArray serverAddressEnvVar = qgetenv(
"AKONADI_SERVER_ADDRESS" );
84 if ( !serverAddressEnvVar.isEmpty() ) {
85 const int pos = serverAddressEnvVar.indexOf(
':' );
86 const QByteArray protocol = serverAddressEnvVar.left( pos );
87 QMap<QString, QString> options;
88 foreach (
const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(
',') ) ) {
89 const QStringList pair = entry.split( QLatin1Char(
'=') );
90 if ( pair.size() != 2 )
92 options.insert( pair.first(), pair.last() );
94 kDebug() << protocol << options;
96 if ( protocol ==
"tcp" ) {
97 serverAddress = options.value( QLatin1String(
"host" ) );
98 port = options.value( QLatin1String(
"port" ) ).toUInt();
100 }
else if ( protocol ==
"unix" ) {
101 serverAddress = options.value( QLatin1String(
"path" ) );
102 }
else if ( protocol ==
"pipe" ) {
103 serverAddress = options.value( QLatin1String(
"name" ) );
108 if ( serverAddress.isEmpty() ) {
110 const QFileInfo fileInfo( connectionConfigFile );
111 if ( !fileInfo.exists() ) {
112 kDebug() <<
"Akonadi Client Session: connection config file '"
113 "akonadi/akonadiconnectionrc' can not be found in"
114 << XdgBaseDirs::homePath(
"config" ) <<
"nor in any of"
115 << XdgBaseDirs::systemPathList(
"config" );
117 const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120 serverAddress = connectionSettings.value( QLatin1String(
"Data/NamedPipe" ), QLatin1String(
"Akonadi" ) ).toString();
122 const QString defaultSocketDir = Internal::xdgSaveDir(
"data" );
123 serverAddress = connectionSettings.value( QLatin1String(
"Data/UnixPath" ), QString(defaultSocketDir + QLatin1String(
"/akonadiserver.socket" )) ).toString();
134 socket = localSocket =
new QLocalSocket( mParent );
135 mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
137 socket = tcpSocket =
new QTcpSocket( mParent );
138 mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
140 mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141 mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
145 kDebug() <<
"connectToServer" << serverAddress;
147 tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
150 localSocket->connectToServer( serverAddress );
152 tcpSocket->connectToHost( serverAddress, port );
161 return Internal::xdgSaveDir(
"config" ) + QLatin1String(
"/akonadiconnectionrc");
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
166 Q_ASSERT( mParent->sender() == socket );
167 kWarning() <<
"Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168 socketDisconnected();
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
173 Q_ASSERT( mParent->sender() == socket );
174 kWarning() <<
"Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175 socketDisconnected();
178 void SessionPrivate::socketDisconnected()
181 currentJob->d_ptr->lostConnection();
185 void SessionPrivate::dataReceived()
187 while ( socket->bytesAvailable() > 0 ) {
188 if ( parser->continuationSize() > 1 ) {
189 const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190 parser->parseBlock( data );
191 }
else if ( socket->canReadLine() ) {
192 if ( !parser->parseNextLine( socket->readLine() ) )
196 if ( parser->tag() == QByteArray(
"0" ) ) {
197 if ( parser->data().startsWith(
"OK" ) ) {
201 kWarning() <<
"Unable to login to Akonadi server:" << parser->data();
203 QTimer::singleShot( 1000, mParent, SLOT(
reconnect()) );
208 if ( parser->tag() ==
"*" && parser->data().startsWith(
"OK Akonadi" ) ) {
209 const int pos = parser->data().indexOf(
"[PROTOCOL" );
212 ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
213 protocolVersion = tmp;
214 Internal::setServerProtocolVersion( tmp );
216 kDebug() <<
"Server protocol version is:" << protocolVersion;
218 writeData(
"0 LOGIN " + ImapParser::quote( sessionId ) +
'\n' );
223 currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
234 bool SessionPrivate::canPipelineNext()
236 if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
238 if ( pipeline.isEmpty() && currentJob )
239 return currentJob->d_ptr->mWriteFinished;
240 if ( !pipeline.isEmpty() )
241 return pipeline.last()->d_ptr->mWriteFinished;
245 void SessionPrivate::doStartNext()
247 if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
249 if ( canPipelineNext() ) {
251 pipeline.enqueue( nextJob );
257 if ( !pipeline.isEmpty() ) {
258 currentJob = pipeline.dequeue();
260 currentJob = queue.dequeue();
261 startJob( currentJob );
265 void SessionPrivate::startJob(
Job *job )
267 if ( protocolVersion < minimumProtocolVersion() ) {
269 job->setErrorText( i18n(
"Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
272 job->d_ptr->startQueued();
281 void SessionPrivate::jobDone(KJob * job)
285 if ( job == currentJob ) {
286 if ( pipeline.isEmpty() ) {
290 currentJob = pipeline.dequeue();
295 queue.removeAll( static_cast<Akonadi::Job*>( job ) );
297 pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
301 void SessionPrivate::jobWriteFinished(
Akonadi::Job* job )
303 Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
309 void SessionPrivate::jobDestroyed(QObject * job)
312 jobDone( static_cast<KJob*>( job ) );
318 QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
320 QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
332 socket->write( data );
334 kWarning() <<
"Trying to write while session is disconnected!" << kBacktrace();
347 foreach (
Job *job, queue )
348 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
354 SessionPrivate::SessionPrivate(
Session *parent )
355 : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
359 void SessionPrivate::init(
const QByteArray &
id )
362 parser =
new ImapParser();
364 if ( !
id.isEmpty() ) {
367 sessionId = QCoreApplication::instance()->applicationName().toUtf8()
368 +
'-' + QByteArray::number( qrand() );
388 socket->disconnect( mParent );
392 QMetaObject::invokeMethod( mParent,
"reconnect", Qt::QueuedConnection );
400 d->init( sessionId );
407 d->init( sessionId );
421 static QThreadStorage<Session*> instances;
425 Q_ASSERT_X( !sessionId.isEmpty(),
"SessionPrivate::createDefaultSession",
426 "You tried to create a default session with empty session id!" );
427 Q_ASSERT_X( !instances.hasLocalData(),
"SessionPrivate::createDefaultSession",
428 "You tried to create a default session twice!" );
430 instances.setLocalData(
new Session( sessionId ) );
435 instances.setLocalData( session );
440 if ( !instances.hasLocalData() )
441 instances.setLocalData(
new Session() );
442 return instances.localData();
447 foreach (
Job* job, d->queue )
448 job->kill( KJob::EmitResult );
450 foreach (
Job* job, d->pipeline ) {
451 job->d_ptr->mStarted =
false;
452 job->kill( KJob::EmitResult );
455 if ( d->currentJob ) {
456 d->currentJob->d_ptr->mStarted =
false;
457 d->currentJob->kill( KJob::EmitResult );
462 #include "moc_session.cpp"