• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.13.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • akonadi
session.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "session.h"
21 #include "session_p.h"
22 
23 #include "imapparser_p.h"
24 #include "job.h"
25 #include "job_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "protocolhelper_p.h"
29 #include "xdgbasedirs_p.h"
30 
31 #include <kdebug.h>
32 #include <klocalizedstring.h>
33 
34 #include <QCoreApplication>
35 #include <QtCore/QDir>
36 #include <QtCore/QQueue>
37 #include <QtCore/QThreadStorage>
38 #include <QtCore/QTimer>
39 #include <QtCore/QThread>
40 #include <QSettings>
41 
42 #include <QtNetwork/QLocalSocket>
43 #include <QtNetwork/QTcpSocket>
44 #include <QtNetwork/QHostAddress>
45 #include <QApplication>
46 
47 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
48 // in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
49 // sends responses for the next one to the already finished one
50 #define PIPELINE_LENGTH 0
51 //#define PIPELINE_LENGTH 2
52 
53 using namespace Akonadi;
54 
55 //@cond PRIVATE
56 
57 static const QList<QByteArray> sCapabilities = QList<QByteArray>()
58  << "NOTIFY 3"
59  << "NOPAYLOADPATH"
60  << "AKAPPENDSTREAMING"
61  << "SERVERSEARCH";
62 
63 void SessionPrivate::startNext()
64 {
65  QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
66 }
67 
68 void SessionPrivate::reconnect()
69 {
70  QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
71  if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
72  || localSocket->state() == QLocalSocket::ConnectingState ) ) {
73  // nothing to do, we are still/already connected
74  return;
75  }
76 
77  QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
78  if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
79  || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
80  // same here, but for TCP
81  return;
82  }
83 
84  // try to figure out where to connect to
85  QString serverAddress;
86  quint16 port = 0;
87  bool useTcp = false;
88 
89  // env var has precedence
90  const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
91  if ( !serverAddressEnvVar.isEmpty() ) {
92  const int pos = serverAddressEnvVar.indexOf( ':' );
93  const QByteArray protocol = serverAddressEnvVar.left( pos );
94  QMap<QString, QString> options;
95  foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
96  const QStringList pair = entry.split( QLatin1Char('=') );
97  if ( pair.size() != 2 )
98  continue;
99  options.insert( pair.first(), pair.last() );
100  }
101  kDebug() << protocol << options;
102 
103  if ( protocol == "tcp" ) {
104  serverAddress = options.value( QLatin1String( "host" ) );
105  port = options.value( QLatin1String( "port" ) ).toUInt();
106  useTcp = true;
107  } else if ( protocol == "unix" ) {
108  serverAddress = options.value( QLatin1String( "path" ) );
109  } else if ( protocol == "pipe" ) {
110  serverAddress = options.value( QLatin1String( "name" ) );
111  }
112  }
113 
114  // try config file next, fall back to defaults if that fails as well
115  if ( serverAddress.isEmpty() ) {
116  const QString connectionConfigFile = connectionFile();
117  const QFileInfo fileInfo( connectionConfigFile );
118  if ( !fileInfo.exists() ) {
119  kDebug() << "Akonadi Client Session: connection config file '"
120  "akonadi/akonadiconnectionrc' can not be found in"
121  << XdgBaseDirs::homePath( "config" ) << "nor in any of"
122  << XdgBaseDirs::systemPathList( "config" );
123  }
124  const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
125 
126 #ifdef Q_OS_WIN //krazy:exclude=cpp
127  serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
128 #else
129  const QString defaultSocketDir = Internal::xdgSaveDir( "data" );
130  serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
131 #endif
132  }
133 
134  // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
135  // but that's probably not something we need to support anyway
136  if ( !socket ) {
137  if ( !useTcp ) {
138  socket = localSocket = new QLocalSocket( mParent );
139  mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
140  } else {
141  socket = tcpSocket = new QTcpSocket( mParent );
142  mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
143  }
144  mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
145  mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
146  }
147 
148  // actually do connect
149  kDebug() << "connectToServer" << serverAddress;
150  if ( !useTcp ) {
151  localSocket->connectToServer( serverAddress );
152  } else {
153  tcpSocket->connectToHost( serverAddress, port );
154  }
155 
156  emit mParent->reconnected();
157 }
158 
159 QString SessionPrivate::connectionFile()
160 {
161  return Internal::xdgSaveDir( "config" ) + QLatin1String("/akonadiconnectionrc");
162 }
163 
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
165 {
166  Q_ASSERT( mParent->sender() == socket );
167  kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168  socketDisconnected();
169 }
170 
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
172 {
173  Q_ASSERT( mParent->sender() == socket );
174  kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175  socketDisconnected();
176 }
177 
178 void SessionPrivate::socketDisconnected()
179 {
180  if ( currentJob )
181  currentJob->d_ptr->lostConnection();
182  connected = false;
183 }
184 
185 void SessionPrivate::dataReceived()
186 {
187  while ( socket->bytesAvailable() > 0 ) {
188  if ( parser->continuationSize() > 1 ) {
189  const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190  parser->parseBlock( data );
191  } else if ( socket->canReadLine() ) {
192  if ( !parser->parseNextLine( socket->readLine() ) )
193  continue; // response not yet completed
194 
195  if ( logFile ) {
196  logFile->write( "S: " + parser->data() );
197  }
198 
199  // handle login response
200  if ( parser->tag() == QByteArray( "0" ) ) {
201  if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
202  writeData("1 CAPABILITY (" + ImapParser::join( sCapabilities, " " ) + ")");
203  } else {
204  kWarning() << "Unable to login to Akonadi server:" << parser->data();
205  socket->close();
206  QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
207  }
208  }
209 
210  // handle capability response
211  if ( parser->tag() == QByteArray("1") ) {
212  if ( parser->data().startsWith("OK") ) {
213  connected = true;
214  startNext();
215  } else {
216  kDebug() << "Unhandled server capability response:" << parser->data();
217  }
218  }
219 
220  // send login command
221  if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
222  const int pos = parser->data().indexOf( "[PROTOCOL" );
223  if ( pos > 0 ) {
224  qint64 tmp = 0;
225  ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
226  protocolVersion = tmp;
227  Internal::setServerProtocolVersion( tmp );
228  }
229  kDebug() << "Server protocol version is:" << protocolVersion;
230 
231  writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
232 
233  // work for the current job
234  } else {
235  if ( currentJob )
236  currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
237  }
238 
239  // reset parser stuff
240  parser->reset();
241  } else {
242  break; // nothing we can do for now
243  }
244  }
245 }
246 
247 bool SessionPrivate::canPipelineNext()
248 {
249  if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
250  return false;
251  if ( pipeline.isEmpty() && currentJob )
252  return currentJob->d_ptr->mWriteFinished;
253  if ( !pipeline.isEmpty() )
254  return pipeline.last()->d_ptr->mWriteFinished;
255  return false;
256 }
257 
258 void SessionPrivate::doStartNext()
259 {
260  if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
261  return;
262  if ( canPipelineNext() ) {
263  Akonadi::Job *nextJob = queue.dequeue();
264  pipeline.enqueue( nextJob );
265  startJob( nextJob );
266  }
267  if ( jobRunning )
268  return;
269  jobRunning = true;
270  if ( !pipeline.isEmpty() ) {
271  currentJob = pipeline.dequeue();
272  } else {
273  currentJob = queue.dequeue();
274  startJob( currentJob );
275  }
276 }
277 
278 void SessionPrivate::startJob( Job *job )
279 {
280  if ( protocolVersion < minimumProtocolVersion() ) {
281  job->setError( Job::ProtocolVersionMismatch );
282  job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
283  job->emitResult();
284  } else {
285  job->d_ptr->startQueued();
286  }
287 }
288 
289 void SessionPrivate::endJob( Job *job )
290 {
291  job->emitResult();
292 }
293 
294 void SessionPrivate::jobDone(KJob * job)
295 {
296  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
297  // so don't call any methods on job itself
298  if ( job == currentJob ) {
299  if ( pipeline.isEmpty() ) {
300  jobRunning = false;
301  currentJob = 0;
302  } else {
303  currentJob = pipeline.dequeue();
304  }
305  startNext();
306  } else {
307  // non-current job finished, likely canceled while still in the queue
308  queue.removeAll( static_cast<Akonadi::Job*>( job ) );
309  // ### likely not enough to really cancel already running jobs
310  pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
311  }
312 }
313 
314 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
315 {
316  Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
317  Q_UNUSED( job );
318 
319  startNext();
320 }
321 
322 void SessionPrivate::jobDestroyed(QObject * job)
323 {
324  // careful, accessing non-QObject methods of job will fail here already
325  jobDone( static_cast<KJob*>( job ) );
326 }
327 
328 void SessionPrivate::addJob(Job * job)
329 {
330  queue.append( job );
331  QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
332  QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
333  QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
334  startNext();
335 }
336 
337 int SessionPrivate::nextTag()
338 {
339  return theNextTag++;
340 }
341 
342 void SessionPrivate::writeData(const QByteArray & data)
343 {
344  if ( logFile ) {
345  logFile->write( "C: " + data );
346  if ( !data.endsWith( '\n' ) ) {
347  logFile->write( "\n" );
348  }
349  logFile->flush();
350  }
351 
352  if ( socket ) {
353  socket->write( data );
354  } else {
355  kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
356  }
357 }
358 
359 void SessionPrivate::serverStateChanged( ServerManager::State state )
360 {
361  if ( state == ServerManager::Running && !connected ) {
362  reconnect();
363  } else if ( !connected && state == ServerManager::Broken ) {
364  // If the server is broken, cancel all pending jobs, otherwise they will be
365  // blocked forever and applications waiting for them to finish would be stuck
366  Q_FOREACH ( Job *job, queue ) {
367  job->setError( Job::ConnectionFailed );
368  job->kill( KJob::EmitResult );
369  }
370  }
371 }
372 
373 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
374 {
375  // only deal with the queue, for the guys in the pipeline it's too late already anyway
376  // and they shouldn't have gotten there if they depend on a preceding job anyway.
377  foreach ( Job *job, queue )
378  job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
379 }
380 
381 //@endcond
382 
383 SessionPrivate::SessionPrivate( Session *parent )
384  : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 ), logFile( 0 )
385 {
386 }
387 
388 void SessionPrivate::init( const QByteArray &id )
389 {
390  kDebug() << id;
391  parser = new ImapParser();
392 
393  if ( !id.isEmpty() ) {
394  sessionId = id;
395  } else {
396  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
397  + '-' + QByteArray::number( qrand() );
398  }
399 
400  connected = false;
401  theNextTag = 2;
402  jobRunning = false;
403 
404  if ( ServerManager::state() == ServerManager::NotRunning )
405  ServerManager::start();
406  mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
407  SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
408 
409  const QByteArray sessionLogFile = qgetenv( "AKONADI_SESSION_LOGFILE" );
410  if ( !sessionLogFile.isEmpty() ) {
411  logFile = new QFile( QString::fromLatin1( "%1.%2.%3" ).arg( QString::fromLatin1( sessionLogFile ) )
412  .arg( QString::number( QApplication::applicationPid() ) )
413  .arg( QString::fromLatin1( sessionId ) ),
414  mParent );
415  if ( !logFile->open( QIODevice::WriteOnly | QIODevice::Truncate ) ) {
416  kWarning() << "Failed to open Akonadi Session log file" << logFile->fileName();
417  delete logFile;
418  logFile = 0;
419  }
420  }
421 
422  reconnect();
423 }
424 
425 void SessionPrivate::forceReconnect()
426 {
427  jobRunning = false;
428  connected = false;
429  if ( socket ) {
430  socket->disconnect( mParent ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
431  delete socket;
432  }
433  socket = 0;
434  QMetaObject::invokeMethod( mParent, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
435 }
436 
437 Session::Session(const QByteArray & sessionId, QObject * parent) :
438  QObject( parent ),
439  d( new SessionPrivate( this ) )
440 {
441  d->init( sessionId );
442 }
443 
444 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
445  : QObject( parent ),
446  d( dd )
447 {
448  d->init( sessionId );
449 }
450 
451 Session::~Session()
452 {
453  clear();
454  delete d;
455 }
456 
457 QByteArray Session::sessionId() const
458 {
459  return d->sessionId;
460 }
461 
462 static QThreadStorage<Session*> instances;
463 
464 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
465 {
466  Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
467  "You tried to create a default session with empty session id!" );
468  Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
469  "You tried to create a default session twice!" );
470 
471  instances.setLocalData( new Session( sessionId ) );
472 }
473 
474 void SessionPrivate::setDefaultSession( Session *session )
475 {
476  instances.setLocalData( session );
477 }
478 
479 Session* Session::defaultSession()
480 {
481  if ( !instances.hasLocalData() )
482  instances.setLocalData( new Session() );
483  return instances.localData();
484 }
485 
486 void Session::clear()
487 {
488  foreach ( Job* job, d->queue )
489  job->kill( KJob::EmitResult ); // safe, not started yet
490  d->queue.clear();
491  foreach ( Job* job, d->pipeline ) {
492  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
493  job->kill( KJob::EmitResult );
494  }
495  d->pipeline.clear();
496  if ( d->currentJob ) {
497  d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
498  d->currentJob->kill( KJob::EmitResult );
499  }
500  d->forceReconnect();
501 }
502 
503 #include "moc_session.cpp"
Akonadi::Job::ProtocolVersionMismatch
The server protocol version is too old or too new.
Definition: job.h:107
Akonadi::SessionPrivate::forceReconnect
void forceReconnect()
Disconnects a previously existing connection and tries to reconnect.
Definition: session.cpp:425
Akonadi::SessionPrivate::nextTag
int nextTag()
Returns the next IMAP tag.
Akonadi::ServerManager::self
static ServerManager * self()
Returns the singleton instance of this class, for connecting to its signals.
Definition: servermanager.cpp:161
Akonadi::SessionPrivate::createDefaultSession
static void createDefaultSession(const QByteArray &sessionId)
Creates a new default session for this thread with the given sessionId.
Definition: session.cpp:464
Akonadi::Job
Base class for all actions in the Akonadi storage.
Definition: job.h:86
Akonadi::Job::ConnectionFailed
The connection to the Akonadi server failed.
Definition: job.h:106
Akonadi::SessionPrivate::setDefaultSession
static void setDefaultSession(Session *session)
Sets the default session.
Definition: session.cpp:474
Akonadi::Session::defaultSession
static Session * defaultSession()
Returns the default session for this thread.
Definition: session.cpp:479
Akonadi::Session::~Session
~Session()
Destroys the session.
Definition: session.cpp:451
Akonadi::Session::sessionId
QByteArray sessionId() const
Returns the session identifier.
Definition: session.cpp:457
Akonadi::Session::clear
void clear()
Stops all jobs queued for execution.
Definition: session.cpp:486
Akonadi::ServerManager::state
static State state()
Returns the state of the server.
Definition: servermanager.cpp:225
Akonadi::Session
A communication session with the Akonadi storage.
Definition: session.h:59
Akonadi::SessionPrivate
Definition: session_p.h:42
Akonadi::ServerManager::NotRunning
Server is not running, could be no one started it yet or it failed to start.
Definition: servermanager.h:51
Akonadi::ServerManager::start
static bool start()
Starts the server.
Definition: servermanager.cpp:166
Akonadi::SessionPrivate::endJob
void endJob(Job *job)
Akonadi::SessionPrivate::addJob
virtual void addJob(Job *job)
Associates the given Job object with this session.
Akonadi::Session::reconnected
void reconnected()
This signal is emitted whenever the session has been reconnected to the server (e.g.
Akonadi::SessionPrivate::reconnect
virtual void reconnect()
Attempts to establish a connection to the Akonadi server.
Akonadi::SessionPrivate::itemRevisionChanged
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to following jobs.
Akonadi::ServerManager::Running
Server is running and operational.
Definition: servermanager.h:53
Akonadi::SessionPrivate::writeData
void writeData(const QByteArray &data)
Sends the given raw data.
Akonadi::SessionPrivate::connectionFile
static QString connectionFile()
Default location for akonadiconnectionrc.
Akonadi::ServerManager::State
State
Enum for the various states the server can be in.
Definition: servermanager.h:50
Akonadi::Session::Session
Session(const QByteArray &sessionId=QByteArray(), QObject *parent=0)
Creates a new session.
Definition: session.cpp:437
Akonadi::ServerManager::Broken
Server is not operational and an error has been detected.
Definition: servermanager.h:55
This file is part of the KDE documentation.
Documentation copyright © 1996-2014 The KDE developers.
Generated on Mon Jul 21 2014 08:03:55 by doxygen 1.8.6 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.13.3 API Reference

Skip menu "kdepimlibs-4.13.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal