• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.10.5 API Reference
  • KDE Home
  • Contact Us
 

akonadi

  • akonadi
session.cpp
1 /*
2  Copyright (c) 2007 Volker Krause <vkrause@kde.org>
3 
4  This library is free software; you can redistribute it and/or modify it
5  under the terms of the GNU Library General Public License as published by
6  the Free Software Foundation; either version 2 of the License, or (at your
7  option) any later version.
8 
9  This library is distributed in the hope that it will be useful, but WITHOUT
10  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
12  License for more details.
13 
14  You should have received a copy of the GNU Library General Public License
15  along with this library; see the file COPYING.LIB. If not, write to the
16  Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17  02110-1301, USA.
18 */
19 
20 #include "session.h"
21 #include "session_p.h"
22 
23 #include "imapparser_p.h"
24 #include "job.h"
25 #include "job_p.h"
26 #include "servermanager.h"
27 #include "servermanager_p.h"
28 #include "xdgbasedirs_p.h"
29 
30 #include <kdebug.h>
31 #include <klocale.h>
32 
33 #include <QCoreApplication>
34 #include <QtCore/QDir>
35 #include <QtCore/QQueue>
36 #include <QtCore/QThreadStorage>
37 #include <QtCore/QTimer>
38 #include <QtCore/QThread>
39 #include <QSettings>
40 
41 #include <QtNetwork/QLocalSocket>
42 #include <QtNetwork/QTcpSocket>
43 #include <QtNetwork/QHostAddress>
44 
45 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
46 // in order to work around exec() deadlocks. As a result of that Session knows too late about a finished job and still
47 // sends responses for the next one to the already finished one
48 #define PIPELINE_LENGTH 0
49 //#define PIPELINE_LENGTH 2
50 
51 using namespace Akonadi;
52 
53 
54 //@cond PRIVATE
55 
56 void SessionPrivate::startNext()
57 {
58  QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
59 }
60 
61 void SessionPrivate::reconnect()
62 {
63  QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
64  if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
65  || localSocket->state() == QLocalSocket::ConnectingState ) ) {
66  // nothing to do, we are still/already connected
67  return;
68  }
69 
70  QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
71  if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
72  || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
73  // same here, but for TCP
74  return;
75  }
76 
77  // try to figure out where to connect to
78  QString serverAddress;
79  quint16 port = 0;
80  bool useTcp = false;
81 
82  // env var has precedence
83  const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
84  if ( !serverAddressEnvVar.isEmpty() ) {
85  const int pos = serverAddressEnvVar.indexOf( ':' );
86  const QByteArray protocol = serverAddressEnvVar.left( pos );
87  QMap<QString, QString> options;
88  foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
89  const QStringList pair = entry.split( QLatin1Char('=') );
90  if ( pair.size() != 2 )
91  continue;
92  options.insert( pair.first(), pair.last() );
93  }
94  kDebug() << protocol << options;
95 
96  if ( protocol == "tcp" ) {
97  serverAddress = options.value( QLatin1String( "host" ) );
98  port = options.value( QLatin1String( "port" ) ).toUInt();
99  useTcp = true;
100  } else if ( protocol == "unix" ) {
101  serverAddress = options.value( QLatin1String( "path" ) );
102  } else if ( protocol == "pipe" ) {
103  serverAddress = options.value( QLatin1String( "name" ) );
104  }
105  }
106 
107  // try config file next, fall back to defaults if that fails as well
108  if ( serverAddress.isEmpty() ) {
109  const QString connectionConfigFile = connectionFile();
110  const QFileInfo fileInfo( connectionConfigFile );
111  if ( !fileInfo.exists() ) {
112  kDebug() << "Akonadi Client Session: connection config file '"
113  "akonadi/akonadiconnectionrc' can not be found in"
114  << XdgBaseDirs::homePath( "config" ) << "nor in any of"
115  << XdgBaseDirs::systemPathList( "config" );
116  }
117  const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
118 
119 #ifdef Q_OS_WIN //krazy:exclude=cpp
120  serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
121 #else
122  const QString defaultSocketDir = Internal::xdgSaveDir( "data" );
123  serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
124 #endif
125  }
126 #ifdef Q_OS_WINCE
127  useTcp = true;
128 #endif
129 
130  // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
131  // but that's probably not something we need to support anyway
132  if ( !socket ) {
133  if ( !useTcp ) {
134  socket = localSocket = new QLocalSocket( mParent );
135  mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
136  } else {
137  socket = tcpSocket = new QTcpSocket( mParent );
138  mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
139  }
140  mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
141  mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
142  }
143 
144  // actually do connect
145  kDebug() << "connectToServer" << serverAddress;
146 #ifdef Q_OS_WINCE
147  tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
148 #else
149  if ( !useTcp ) {
150  localSocket->connectToServer( serverAddress );
151  } else {
152  tcpSocket->connectToHost( serverAddress, port );
153  }
154 #endif
155 
156  emit mParent->reconnected();
157 }
158 
159 QString SessionPrivate::connectionFile()
160 {
161  return Internal::xdgSaveDir( "config" ) + QLatin1String("/akonadiconnectionrc");
162 }
163 
164 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
165 {
166  Q_ASSERT( mParent->sender() == socket );
167  kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
168  socketDisconnected();
169 }
170 
171 void SessionPrivate::socketError( QAbstractSocket::SocketError )
172 {
173  Q_ASSERT( mParent->sender() == socket );
174  kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
175  socketDisconnected();
176 }
177 
178 void SessionPrivate::socketDisconnected()
179 {
180  if ( currentJob )
181  currentJob->d_ptr->lostConnection();
182  connected = false;
183 }
184 
185 void SessionPrivate::dataReceived()
186 {
187  while ( socket->bytesAvailable() > 0 ) {
188  if ( parser->continuationSize() > 1 ) {
189  const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
190  parser->parseBlock( data );
191  } else if ( socket->canReadLine() ) {
192  if ( !parser->parseNextLine( socket->readLine() ) )
193  continue; // response not yet completed
194 
195  // handle login response
196  if ( parser->tag() == QByteArray( "0" ) ) {
197  if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
198  connected = true;
199  startNext();
200  } else {
201  kWarning() << "Unable to login to Akonadi server:" << parser->data();
202  socket->close();
203  QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
204  }
205  }
206 
207  // send login command
208  if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
209  const int pos = parser->data().indexOf( "[PROTOCOL" );
210  if ( pos > 0 ) {
211  qint64 tmp = 0;
212  ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
213  protocolVersion = tmp;
214  Internal::setServerProtocolVersion( tmp );
215  }
216  kDebug() << "Server protocol version is:" << protocolVersion;
217 
218  writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
219 
220  // work for the current job
221  } else {
222  if ( currentJob )
223  currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
224  }
225 
226  // reset parser stuff
227  parser->reset();
228  } else {
229  break; // nothing we can do for now
230  }
231  }
232 }
233 
234 bool SessionPrivate::canPipelineNext()
235 {
236  if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
237  return false;
238  if ( pipeline.isEmpty() && currentJob )
239  return currentJob->d_ptr->mWriteFinished;
240  if ( !pipeline.isEmpty() )
241  return pipeline.last()->d_ptr->mWriteFinished;
242  return false;
243 }
244 
245 void SessionPrivate::doStartNext()
246 {
247  if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
248  return;
249  if ( canPipelineNext() ) {
250  Akonadi::Job *nextJob = queue.dequeue();
251  pipeline.enqueue( nextJob );
252  startJob( nextJob );
253  }
254  if ( jobRunning )
255  return;
256  jobRunning = true;
257  if ( !pipeline.isEmpty() ) {
258  currentJob = pipeline.dequeue();
259  } else {
260  currentJob = queue.dequeue();
261  startJob( currentJob );
262  }
263 }
264 
265 void SessionPrivate::startJob( Job *job )
266 {
267  if ( protocolVersion < minimumProtocolVersion() ) {
268  job->setError( Job::ProtocolVersionMismatch );
269  job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
270  job->emitResult();
271  } else {
272  job->d_ptr->startQueued();
273  }
274 }
275 
276 void SessionPrivate::endJob( Job *job )
277 {
278  job->emitResult();
279 }
280 
281 void SessionPrivate::jobDone(KJob * job)
282 {
283  // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
284  // so don't call any methods on job itself
285  if ( job == currentJob ) {
286  if ( pipeline.isEmpty() ) {
287  jobRunning = false;
288  currentJob = 0;
289  } else {
290  currentJob = pipeline.dequeue();
291  }
292  startNext();
293  } else {
294  // non-current job finished, likely canceled while still in the queue
295  queue.removeAll( static_cast<Akonadi::Job*>( job ) );
296  // ### likely not enough to really cancel already running jobs
297  pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
298  }
299 }
300 
301 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
302 {
303  Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
304  Q_UNUSED( job );
305 
306  startNext();
307 }
308 
309 void SessionPrivate::jobDestroyed(QObject * job)
310 {
311  // careful, accessing non-QObject methods of job will fail here already
312  jobDone( static_cast<KJob*>( job ) );
313 }
314 
315 void SessionPrivate::addJob(Job * job)
316 {
317  queue.append( job );
318  QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
319  QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
320  QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
321  startNext();
322 }
323 
324 int SessionPrivate::nextTag()
325 {
326  return theNextTag++;
327 }
328 
329 void SessionPrivate::writeData(const QByteArray & data)
330 {
331  if ( socket )
332  socket->write( data );
333  else
334  kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
335 }
336 
337 void SessionPrivate::serverStateChanged( ServerManager::State state )
338 {
339  if ( state == ServerManager::Running && !connected )
340  reconnect();
341 }
342 
343 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
344 {
345  // only deal with the queue, for the guys in the pipeline it's too late already anyway
346  // and they shouldn't have gotten there if they depend on a preceding job anyway.
347  foreach ( Job *job, queue )
348  job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
349 }
350 
351 //@endcond
352 
353 
354 SessionPrivate::SessionPrivate( Session *parent )
355  : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
356 {
357 }
358 
359 void SessionPrivate::init( const QByteArray &id )
360 {
361  kDebug() << id;
362  parser = new ImapParser();
363 
364  if ( !id.isEmpty() ) {
365  sessionId = id;
366  } else {
367  sessionId = QCoreApplication::instance()->applicationName().toUtf8()
368  + '-' + QByteArray::number( qrand() );
369  }
370 
371  connected = false;
372  theNextTag = 1;
373  jobRunning = false;
374 
375  if ( ServerManager::state() == ServerManager::NotRunning )
376  ServerManager::start();
377  mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
378  SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
379 
380  reconnect();
381 }
382 
383 void SessionPrivate::forceReconnect()
384 {
385  jobRunning = false;
386  connected = false;
387  if ( socket ) {
388  socket->disconnect( mParent ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
389  delete socket;
390  }
391  socket = 0;
392  QMetaObject::invokeMethod( mParent, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
393 }
394 
395 
396 Session::Session(const QByteArray & sessionId, QObject * parent) :
397  QObject( parent ),
398  d( new SessionPrivate( this ) )
399 {
400  d->init( sessionId );
401 }
402 
403 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
404  : QObject( parent ),
405  d( dd )
406 {
407  d->init( sessionId );
408 }
409 
410 Session::~Session()
411 {
412  clear();
413  delete d;
414 }
415 
416 QByteArray Session::sessionId() const
417 {
418  return d->sessionId;
419 }
420 
421 static QThreadStorage<Session*> instances;
422 
423 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
424 {
425  Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
426  "You tried to create a default session with empty session id!" );
427  Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
428  "You tried to create a default session twice!" );
429 
430  instances.setLocalData( new Session( sessionId ) );
431 }
432 
433 void SessionPrivate::setDefaultSession( Session *session )
434 {
435  instances.setLocalData( session );
436 }
437 
438 Session* Session::defaultSession()
439 {
440  if ( !instances.hasLocalData() )
441  instances.setLocalData( new Session() );
442  return instances.localData();
443 }
444 
445 void Session::clear()
446 {
447  foreach ( Job* job, d->queue )
448  job->kill( KJob::EmitResult ); // safe, not started yet
449  d->queue.clear();
450  foreach ( Job* job, d->pipeline ) {
451  job->d_ptr->mStarted = false; // avoid killing/reconnect loops
452  job->kill( KJob::EmitResult );
453  }
454  d->pipeline.clear();
455  if ( d->currentJob ) {
456  d->currentJob->d_ptr->mStarted = false; // avoid killing/reconnect loops
457  d->currentJob->kill( KJob::EmitResult );
458  }
459  d->forceReconnect();
460 }
461 
462 #include "moc_session.cpp"
This file is part of the KDE documentation.
Documentation copyright © 1996-2013 The KDE developers.
Generated on Sat Jul 13 2013 01:27:41 by doxygen 1.8.3.1 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.10.5 API Reference

Skip menu "kdepimlibs-4.10.5 API Reference"
  • akonadi
  •   contact
  •   kmime
  •   socialutils
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal