36 #include <QtCore/QHash>
37 #include <QtGui/QWidget>
38 #include <QtDBus/QtDBus>
46 #ifndef KDE_USE_FINAL // already defined in job.cpp
// Recomputes a serial for a new priority. newPriority is asserted to lie in
// [-10, 10] and is additionally clamped to that range with qBound. The old
// serial is reduced modulo m_jobsPerPriority (named unbiasedSerial) and then
// shifted by newPriority * m_jobsPerPriority.
74 Q_ASSERT(newPriority >= -10 && newPriority <= 10);
75 newPriority = qBound(-10, newPriority, 10);
76 int unbiasedSerial = oldSerial % m_jobsPerPriority;
77 return unbiasedSerial + newPriority * m_jobsPerPriority;
// Configure m_grimTimer as a single-shot timer and route its timeout() signal
// to the grimReaper() slot of this object.
83 m_grimTimer.setSingleShot(
true);
84 connect (&m_grimTimer, SIGNAL(
timeout()), SLOT(grimReaper()));
91 m_idleSlaves.insert(slave->
host(), slave);
// Look for an idle slave stored under the host of the URL.
104 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.find(url.host());
105 if (it == m_idleSlaves.end()) {
// No entry for that host: fall back to the first idle slave of any host.
106 it = m_idleSlaves.begin();
// Still at end() after the fallback means the idle pool is empty.
108 if (it == m_idleSlaves.end()) {
// Remove the selected entry from the idle pool.
112 m_idleSlaves.erase(it);
// Linear scan over all idle entries, comparing the stored value against the
// given slave pointer; the matching entry is erased.
119 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin();
120 for (; it != m_idleSlaves.end(); ++it) {
121 if (it.value() == slave) {
122 m_idleSlaves.erase(it);
131 return m_idleSlaves.values();
// The guarded part only runs while m_grimTimer is not already active.
// NOTE(review): presumably the guard starts the timer - confirm against the
// full body.
134 void SlaveKeeper::scheduleGrimReaper()
136 if (!m_grimTimer.isActive()) {
// Slot connected to the timeout() signal of m_grimTimer. Walks m_idleSlaves
// with an explicit iterator because entries are erased during the walk; the
// iterator returned by erase() continues the loop.
142 void SlaveKeeper::grimReaper()
144 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin();
145 while (it != m_idleSlaves.end()) {
146 Slave *slave = it.value();
148 it = m_idleSlaves.erase(it);
// Debug trace for a slave in the idle pool that still reports a job.
150 kDebug (7006) <<
"Idle slave" << slave <<
"still has job" << slave->
job();
// Idle slaves remain after this pass: request another reaper run.
159 if (!m_idleSlaves.isEmpty()) {
160 scheduleGrimReaper();
168 if (first != m_queuedJobs.constEnd()) {
// Queueing preconditions: the serial is non-zero and not yet used as a key in
// m_queuedJobs, and the job is not in m_runningJobs. The job is then stored
// under its serial.
177 Q_ASSERT(serial != 0);
178 Q_ASSERT(!m_queuedJobs.contains(serial));
179 Q_ASSERT(!m_runningJobs.contains(job));
180 m_queuedJobs.insert(serial, job);
// Starting a job requires a non-empty queue; the entry at iterator first is
// moved from m_queuedJobs into m_runningJobs.
185 Q_ASSERT(!m_queuedJobs.isEmpty());
188 m_queuedJobs.erase(first);
189 m_runningJobs.insert(job);
// Removal tries the running set first; a job found there must not also be
// queued under its serial. Removal from the queued map is attempted by serial.
196 if (m_runningJobs.remove(job)) {
197 Q_ASSERT(!m_queuedJobs.contains(serial));
200 if (m_queuedJobs.remove(serial)) {
209 Q_FOREACH (
SimpleJob *job, m_runningJobs) {
// Configure m_startJobsTimer as a single-shot timer whose timeout() signal
// invokes the startRunnableJobs() slot.
221 m_startJobsTimer.setSingleShot(
true);
222 connect (&m_startJobsTimer, SIGNAL(
timeout()), SLOT(startRunnableJobs()));
227 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave);
228 if (it == m_connectedSlaves.end()) {
237 m_runnableSlaves.insert(slave);
238 m_startJobsTimer.start();
// Locate the per-slave queue for this slave; end() means the slave is not a
// connected slave known to this object.
247 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave);
248 if (it == m_connectedSlaves.end()) {
255 Q_ASSERT(!m_runnableSlaves.contains(slave));
// The job counts as removed if it was the running job of the slave or if it
// was present in the waiting list (removeAll reports how many were removed).
258 const bool removedRunning = jobs.
runningJob == job;
259 const bool removedWaiting = jobs.
waitingList.removeAll(job) != 0;
260 if (removedRunning) {
// A job must not be running and waiting at the same time.
262 Q_ASSERT(!removedWaiting);
264 const bool removedTheJob = removedRunning || removedWaiting;
268 return removedTheJob;
// Mark the slave runnable and arm the timer that starts runnable jobs.
272 m_runnableSlaves.insert(slave);
273 m_startJobsTimer.start();
// The last waiting job was removed, so the slave has nothing left to run.
275 if (removedWaiting && jobs.
waitingList.isEmpty()) {
276 m_runnableSlaves.remove(slave);
278 return removedTheJob;
284 if (!m_connectedSlaves.contains(slave)) {
291 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave);
292 if (it == m_connectedSlaves.end()) {
303 m_connectedSlaves.erase(it);
304 m_runnableSlaves.remove(slave);
313 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave);
314 if (it == m_connectedSlaves.end()) {
317 return it.value().runningJob == 0;
// Slot connected to the timeout() signal of m_startJobsTimer. Iterates over
// m_runnableSlaves with an explicit iterator because entries are erased while
// walking; the iterator returned by erase() continues the loop.
// NOTE(review): confirm under which condition the timer is re-armed inside
// the loop.
322 void ConnectedSlaveQueue::startRunnableJobs()
324 QSet<Slave *>::Iterator it = m_runnableSlaves.begin();
325 while (it != m_runnableSlaves.end()) {
329 m_startJobsTimer.start();
333 it = m_runnableSlaves.erase(it);
// A URL port of -1 is normalised to 0 before comparing with the slave.
341 const int port = url.port() == -1 ? 0 : url.port();
// Slaves whose host is the reset marker string get separate treatment.
343 if (slave->
host() ==
"<reset>") {
// Otherwise the slave is expected to already target the host and the
// normalised port of the URL.
351 Q_ASSERT(slave->
host() == url.host());
352 Q_ASSERT(slave->
port() == port);
360 Q_UNUSED(queuesBySerial);
361 #ifdef SCHEDULER_DEBUG
363 QSet<HostQueue *> seen;
364 Q_FOREACH (
HostQueue *hq, *queuesBySerial) {
365 Q_ASSERT(!seen.contains(hq));
// Consistency check that is only compiled with SCHEDULER_DEBUG; Q_UNUSED
// marks the parameter as used when the check is compiled out.
374 Q_UNUSED(runningJobsCount);
375 #ifdef SCHEDULER_DEBUG
// The running-job total recomputed from the host queues must equal the
// counter passed in.
376 int realRunningJobsCount = 0;
377 Q_FOREACH (
const HostQueue &hq, *queues) {
380 Q_ASSERT(realRunningJobsCount == runningJobsCount);
// No job may appear as running in more than one place: each job seen is
// asserted to be new before it is recorded.
383 QSet<SimpleJob *> seenJobs;
384 Q_FOREACH (
const HostQueue &hq, *queues) {
385 Q_FOREACH (
SimpleJob *job, hq.runningJobs()) {
386 Q_ASSERT(!seenJobs.contains(job));
387 seenJobs.insert(job);
// Member initialisation: a per-host limit of 0 falls back to maxSlaves, the
// total limit is the larger of the two arguments, and the running-job counter
// starts at 0.
395 : m_schedPrivate(sp),
396 m_maxConnectionsPerHost(maxSlavesPerHost ? maxSlavesPerHost : maxSlaves),
397 m_maxConnectionsTotal(qMax(maxSlaves, maxSlavesPerHost)),
398 m_runningJobsCount(0)
401 kDebug(7006) <<
"m_maxConnectionsTotal:" << m_maxConnectionsTotal
402 <<
"m_maxConnectionsPerHost:" << m_maxConnectionsPerHost;
// Sanity checks: at least one connection per host, and the per-host limit
// must not exceed the total limit.
403 Q_ASSERT(m_maxConnectionsPerHost >= 1);
404 Q_ASSERT(maxSlaves >= maxSlavesPerHost);
// m_startJobTimer is single-shot and invokes startAJob() on timeout.
405 m_startJobTimer.setSingleShot(
true);
406 connect (&m_startJobTimer, SIGNAL(
timeout()), SLOT(startAJob()));
421 HostQueue &hq = m_queuesByHostname[hostname];
437 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
438 Q_UNUSED(wasQueueEmpty);
439 Q_ASSERT(wasQueueEmpty);
443 #ifdef SCHEDULER_DEBUG
447 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial));
452 m_startJobTimer.start();
460 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->
m_url.host());
461 if (it == m_queuesByHostname.end()) {
471 const bool needReinsert = hq.
lowestSerial() != prevLowestSerial;
474 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) {
494 if (m_queuesBySerial.remove(prevLowestSerial) == 0) {
502 m_runningJobsCount--;
503 Q_ASSERT(m_runningJobsCount >= 0);
513 m_queuesByHostname.remove(jobPriv->
m_url.host());
520 m_startJobTimer.start();
540 if (job && job->
ui()) {
546 SLOT(slotSlaveStatus(pid_t,QByteArray,
QString,
bool)));
548 kError() <<
"couldn't create slave:" << errortext;
559 const bool removedUnconnected = m_slaveKeeper.
removeSlave(slave);
560 Q_ASSERT(!(removedConnected && removedUnconnected));
561 return removedConnected || removedUnconnected;
566 QList<Slave *> ret(m_slaveKeeper.
allSlaves());
567 Q_FOREACH (
const HostQueue &hq, m_queuesByHostname) {
// Slot connected to the timeout() signal of m_startJobTimer.
575 void ProtoQueue::startAJob()
// Extra tracing of the running jobs, compiled only with SCHEDULER_DEBUG.
580 #ifdef SCHEDULER_DEBUG
581 kDebug(7006) <<
"m_runningJobsCount:" << m_runningJobsCount;
582 Q_FOREACH (
const HostQueue &hq, m_queuesByHostname) {
583 Q_FOREACH (
SimpleJob *job, hq.runningJobs()) {
// Global connection limit reached: no job is started in this case.
588 if (m_runningJobsCount >= m_maxConnectionsTotal) {
589 #ifdef SCHEDULER_DEBUG
590 kDebug(7006) <<
"not starting any jobs because maxConnectionsTotal has been reached.";
// A queue entry is available in m_queuesBySerial; its key is the serial
// under which that host queue is currently filed.
596 if (first != m_queuesBySerial.end()) {
599 const int prevLowestSerial = first.key();
600 Q_UNUSED(prevLowestSerial);
// The entry is taken out of the serial index and the running counter grows.
609 m_queuesBySerial.erase(first);
621 m_runningJobsCount++;
623 bool isNewSlave =
false;
646 #ifdef SCHEDULER_DEBUG
647 kDebug(7006) <<
"not starting any jobs because there is no queued job.";
// More host queues are waiting: re-arm the timer for another invocation.
651 if (!m_queuesBySerial.isEmpty()) {
652 m_startJobTimer.start();
658 class KIO::SchedulerPrivate
665 m_ignoreConfigReparse(false)
682 Slave *m_slaveOnHold;
685 bool m_ignoreConfigReparse;
691 #ifndef KDE_NO_DEPRECATED
694 void setJobPriority(
SimpleJob *job,
int priority);
698 void removeSlaveOnHold();
702 void checkSlaveOnHold(
bool b);
703 void publishSlaveOnHold();
705 bool isSlaveOnHoldFor(
const KUrl& url);
706 void registerWindow(
QWidget *wid);
707 void updateInternalMetaData(
SimpleJob* job);
714 void slotSlaveStatus(pid_t pid,
const QByteArray &protocol,
715 const QString &host,
bool connected);
717 void slotReparseSlaveConfiguration(
const QString &,
const QDBusMessage&);
718 void slotSlaveOnHoldListChanged();
720 void slotSlaveConnected();
721 void slotSlaveError(
int error,
const QString &errorMsg);
722 void slotUnregisterWindow(
QObject *);
// Look up the queue for this protocol; value() yields 0 when none exists.
726 ProtoQueue *pq = m_protocols.value(protocol, 0);
728 kDebug(7006) <<
"creating ProtoQueue instance for" << protocol;
// -1 marks the per-host limit as not yet determined. A non-empty host gets
// the first chance to supply it.
731 int maxSlavesPerHost = -1;
732 if (!host.isEmpty()) {
736 maxSlavesPerHost = value;
738 if (maxSlavesPerHost == -1) {
// The per-host limit handed to the new queue is capped at maxSlaves; the new
// queue is stored in m_protocols under the protocol name.
742 pq =
new ProtoQueue(
this, maxSlaves, qMin(maxSlaves, maxSlavesPerHost));
743 m_protocols.insert(protocol, pq);
748 QHash<QString, ProtoQueue *> m_protocols;
756 return schedulerPrivate->q;
759 SchedulerPrivate *Scheduler::d_func()
761 return schedulerPrivate;
767 return schedulerPrivate->q;
773 return schedulerPrivate->heldSlaveForJob(job);
// Names the object, exports its scriptable slots and signals on the session
// bus, and subscribes to two D-Bus signals.
777 Scheduler::Scheduler()
780 setObjectName(
"scheduler" );
782 const QString dbusPath =
"/KIO/Scheduler";
783 const QString dbusInterface =
"org.kde.KIO.Scheduler";
784 QDBusConnection dbus = QDBusConnection::sessionBus();
// NOTE(review): the object path literal is repeated here instead of reusing
// dbusPath; both spell the same path.
785 dbus.registerObject(
"/KIO/Scheduler",
this, QDBusConnection::ExportScriptableSlots |
786 QDBusConnection::ExportScriptableSignals );
// An empty service name is passed, so the sender service is not restricted
// for these two signal subscriptions.
787 dbus.connect(
QString(), dbusPath, dbusInterface,
"reparseSlaveConfiguration",
788 this, SLOT(slotReparseSlaveConfiguration(
QString,QDBusMessage)));
789 dbus.connect(
QString(), dbusPath, dbusInterface,
"slaveOnHoldListChanged",
790 this, SLOT(slotSlaveOnHoldListChanged()));
793 Scheduler::~Scheduler()
799 schedulerPrivate->doJob(job);
802 #ifndef KDE_NO_DEPRECATED
805 schedulerPrivate->scheduleJob(job);
811 schedulerPrivate->setJobPriority(job, priority);
816 schedulerPrivate->cancelJob(job);
821 schedulerPrivate->jobFinished(job, slave);
826 schedulerPrivate->putSlaveOnHold(job, url);
831 schedulerPrivate->removeSlaveOnHold();
836 schedulerPrivate->publishSlaveOnHold();
841 return schedulerPrivate->isSlaveOnHoldFor(url);
846 schedulerPrivate->updateInternalMetaData(job);
852 return schedulerPrivate->getConnectedSlave(url, config);
857 return schedulerPrivate->assignJobToSlave(slave, job);
862 return schedulerPrivate->disconnectSlave(slave);
867 schedulerPrivate->registerWindow(wid);
872 schedulerPrivate->slotUnregisterWindow(wid);
882 const QObject* receiver,
const char* member )
888 const QObject* receiver,
const char* member )
901 schedulerPrivate->checkSlaveOnHold(b);
908 schedulerPrivate->slotReparseSlaveConfiguration(
QString(), QDBusMessage());
910 schedulerPrivate->m_ignoreConfigReparse =
true;
// Handles a request to reparse slave configuration for one protocol, or for
// all protocols when proto is empty. The QDBusMessage argument is unnamed and
// therefore unused.
915 void SchedulerPrivate::slotReparseSlaveConfiguration(
const QString &proto,
const QDBusMessage&)
// m_ignoreConfigReparse marks a signal that this process emitted itself; the
// flag is cleared once such a signal has been seen.
917 if (m_ignoreConfigReparse) {
918 kDebug(7006) <<
"Ignoring signal sent by myself";
919 m_ignoreConfigReparse =
false;
923 kDebug(7006) <<
"proto=" << proto;
// Start at the first queue for an empty proto, otherwise at the queue found
// for proto; constEnd() means there is no such queue.
929 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() :
930 m_protocols.constFind(proto);
932 if (it == m_protocols.constEnd()) {
935 QHash<QString, ProtoQueue *>::ConstIterator endIt = proto.isEmpty() ? m_protocols.constEnd() :
// Visit every slave of each selected protocol queue.
937 for (; it != endIt; ++it) {
938 Q_FOREACH(
Slave *slave, (*it)->allSlaves()) {
945 void SchedulerPrivate::slotSlaveOnHoldListChanged()
947 m_checkOnHold =
true;
958 if (cmd ==
CMD_SPECIAL && protocol.startsWith(QLatin1String(
"http"), Qt::CaseInsensitive))
// Entry point for running a job. A warning is logged when the call is made
// from a thread other than the one the application object lives in.
964 void SchedulerPrivate::doJob(
SimpleJob *job)
967 if (QThread::currentThread() != QCoreApplication::instance()->thread()) {
968 kWarning(7006) <<
"KIO is not thread-safe.";
// Clears the flag that slotSlaveOnHoldListChanged() sets.
977 m_checkOnHold =
false;
984 #ifndef KDE_NO_DEPRECATED
985 void SchedulerPrivate::scheduleJob(
SimpleJob *job)
988 setJobPriority(job, 1);
992 void SchedulerPrivate::setJobPriority(
SimpleJob *job,
int priority)
994 kDebug(7006) << job << priority;
999 void SchedulerPrivate::cancelJob(
SimpleJob *job)
1008 kDebug(7006) << job << slave;
1010 kDebug(7006) <<
"Scheduler: killing slave " << slave->slave_pid();
1013 jobFinished(job, slave);
1018 kDebug(7006) << job << slave;
1019 if (QThread::currentThread() != QCoreApplication::instance()->thread()) {
1020 kWarning(7006) <<
"KIO is not thread-safe.";
1037 kDebug(7006) <<
"Updating ioslaves with new internal metadata information";
1038 ProtoQueue * queue = m_protocols.value(slave->protocol());
1040 QListIterator<Slave*> it (queue->
allSlaves());
1041 while (it.hasNext()) {
1042 Slave* runningSlave = it.next();
1043 if (slave->host() == runningSlave->
host()) {
1045 kDebug(7006) <<
"Updated configuration of" << slave->protocol()
1046 <<
"ioslave, pid=" << slave->slave_pid();
1052 slave->disconnect(job);
1065 schedulerPrivate->setupSlave(slave, url, protocol, proxyList, newSlave, config);
// Fetch the configuration stored for this protocol and host.
1070 const QString host = url.host();
1072 sessionData.configDataFor( configData, protocol, host );
// Without proxies both proxy keys are removed; otherwise UseProxy holds the
// first proxy and ProxyUrls holds the whole list joined with commas.
1073 if (proxyList.isEmpty()) {
1074 configData.remove(QLatin1String(
"UseProxy"));
1075 configData.remove(QLatin1String(
"ProxyUrls"));
1077 configData[QLatin1String(
"UseProxy")] = proxyList.first();
1078 configData[QLatin1String(
"ProxyUrls")] = proxyList.join(QLatin1String(
","));
// Auto-login data is only added when EnableAutoLogin is present and equals
// true, compared case-insensitively.
1081 if ( configData.contains(
"EnableAutoLogin") &&
1082 configData.value(
"EnableAutoLogin").compare(
"true", Qt::CaseInsensitive) == 0 )
// usern is true only for the ftp protocol.
1086 bool usern = (protocol ==
"ftp");
1089 configData[
"autoLoginUser"] = l.
login;
1090 configData[
"autoLoginPass"] = l.
password;
// Each macro definition becomes one line: its key, a backslash, and its
// values joined by backslashes, terminated by a newline.
1095 for ( ; it != l.
macdef.constEnd(); ++it )
1096 macdef += it.key() +
'\\' + it.value().join(
"\\" ) +
'\n';
1097 configData[
"autoLoginMacro"] = macdef;
1108 int port = url.port();
1111 const QString host = url.host();
// Reconfiguration is needed for a new slave, or when any of host, port, user
// or password differs from what the slave currently has.
1115 if (newSlave || slave->
host() != host || slave->
port() != port ||
1116 slave->
user() != user || slave->
passwd() != passwd) {
// Start from the protocol and URL specific metadata, then merge in the
// caller supplied config.
1118 MetaData configData = metaDataFor(protocol, proxyList, url);
1120 configData += *config;
// Point the slave at the new target and credentials.
1124 slave->
setHost(host, port, user, passwd);
1129 void SchedulerPrivate::slotSlaveStatus(pid_t,
const QByteArray&,
const QString &,
bool)
1134 void SchedulerPrivate::slotSlaveDied(
KIO::Slave *slave)
1145 pq->removeSlave(slave);
1147 if (slave == m_slaveOnHold) {
1149 m_urlOnHold.clear();
1157 kDebug(7006) << job << url << slave;
1158 slave->disconnect(job);
1164 if (m_slaveOnHold) {
1165 m_slaveOnHold->
kill();
1167 m_slaveOnHold = slave;
1169 m_slaveOnHold->suspend();
1172 void SchedulerPrivate::publishSlaveOnHold()
1174 kDebug(7006) << m_slaveOnHold;
1178 m_slaveOnHold->hold(m_urlOnHold);
1179 emit q->slaveOnHoldListChanged();
1182 bool SchedulerPrivate::isSlaveOnHoldFor(
const KUrl& url)
1184 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold)
// The locally held slave is only considered when no slave was chosen so far.
1199 if (!slave && m_slaveOnHold) {
// Jobs with command CMD_SPECIAL are also allowed to reuse the held slave.
1205 canJobReuse = ( canJobReuse || cmd ==
CMD_SPECIAL );
// Here reuse is decided by the resume metadata: empty or 0 allows it.
1208 const QString resume = outgoing.value(
"resume");
1209 kDebug(7006) <<
"Resume metadata is" << resume;
1210 canJobReuse = (resume.isEmpty() || resume ==
"0");
// The held slave is only a candidate when the job URL equals the held URL.
1214 if (job->
url() == m_urlOnHold) {
1216 kDebug(7006) <<
"HOLD: Reusing held slave (" << m_slaveOnHold <<
")";
1217 slave = m_slaveOnHold;
// A held slave that is not reused is killed.
1219 kDebug(7006) <<
"HOLD: Discarding held slave (" << m_slaveOnHold <<
")";
1220 m_slaveOnHold->kill();
1223 m_urlOnHold.clear();
1226 kDebug(7006) <<
"HOLD: Reusing klauncher held slave (" << slave <<
")";
// Further handling applies when a slave was obtained and the job has a UI
// delegate.
1231 if (slave && job->
ui()) {
// Drops the held slave: an existing one is killed, and the remembered
// on-hold URL is cleared.
1238 void SchedulerPrivate::removeSlaveOnHold()
1240 kDebug(7006) << m_slaveOnHold;
1241 if (m_slaveOnHold) {
1242 m_slaveOnHold->kill();
1245 m_urlOnHold.clear();
1252 ProtoQueue *pq = protoQ(protocol, url.host());
1256 setupSlave(slave, url, protocol, proxyList,
true, &config);
1260 q->connect(slave, SIGNAL(connected()),
1261 SLOT(slotSlaveConnected()));
1262 q->connect(slave, SIGNAL(error(
int,
QString)),
1263 SLOT(slotSlaveError(
int,
QString)));
1265 kDebug(7006) << url << slave;
// Invoked through the connected() signal of a slave.
1270 void SchedulerPrivate::slotSlaveConnected()
// NOTE(review): the sender is converted with an unchecked static_cast; this
// relies on only Slave objects being connected to this slot.
1273 Slave *slave =
static_cast<Slave *
>(q->sender());
// Stop listening for further connected() signals from this slave, then
// forward the event as the public slaveConnected signal.
1275 q->disconnect(slave, SIGNAL(connected()), q, SLOT(slotSlaveConnected()));
1276 emit q->slaveConnected(slave);
// Invoked through the error(int,QString) signal of a slave; the error number
// and message are forwarded through the public slaveError signal.
1279 void SchedulerPrivate::slotSlaveError(
int errorNr,
const QString &errorMsg)
// NOTE(review): unchecked static_cast of the sender, as in the connected slot.
1281 Slave *slave =
static_cast<Slave *
>(q->sender());
1282 kDebug(7006) << slave << errorNr << errorMsg;
1287 emit q->slaveError(slave, errorNr, errorMsg);
1293 kDebug(7006) << slave << job;
1303 bool SchedulerPrivate::disconnectSlave(
KIO::Slave *slave)
1310 void SchedulerPrivate::checkSlaveOnHold(
bool b)
1332 while (w && w->parentWidget()) {
1333 w = w->parentWidget();
1335 return (w ? w->window() : 0);
// Registers a window once: it is recorded with its window id, watched for
// destruction, and announced to kded.
1338 void SchedulerPrivate::registerWindow(
QWidget *wid)
// Only objects not yet present in m_windowList are processed.
1346 if (!m_windowList.contains(obj))
1351 WId windowId = window->winId();
1352 m_windowList.insert(obj, windowId);
// The destroyed signal of the window triggers slotUnregisterWindow.
1353 q->connect(window, SIGNAL(destroyed(
QObject*)),
1354 SLOT(slotUnregisterWindow(
QObject*)));
// Non-blocking D-Bus call: no reply is awaited.
1355 QDBusInterface(
"org.kde.kded",
"/kded",
"org.kde.kded").
1356 call(QDBus::NoBlock,
"registerWindowId", qlonglong(windowId));
// Counterpart of registerWindow: forgets a window and tells kded about it.
1360 void SchedulerPrivate::slotUnregisterWindow(
QObject *obj)
// end() means the object is not in m_windowList.
1366 if (it == m_windowList.end())
// Read the window id before the entry is erased; it is needed for the
// D-Bus call below.
1368 WId windowId = it.value();
1369 q->disconnect(it.key(), SIGNAL(destroyed(
QObject*)),
1370 q, SLOT(slotUnregisterWindow(
QObject*)));
1371 m_windowList.erase( it );
// Non-blocking D-Bus call: no reply is awaited.
1372 QDBusInterface(
"org.kde.kded",
"/kded",
"org.kde.kded").
1373 call(QDBus::NoBlock,
"unregisterWindowId", qlonglong(windowId));
// Iterates over metadata entries and distinguishes two key prefixes, both
// matched case-insensitively: {internal~currenthost} and {internal~allhosts}.
1376 void SchedulerPrivate::updateInternalMetaData(
SimpleJob* job)
1381 const KUrl jobUrl = job->
url();
1384 while (it.hasNext()) {
1386 if (it.key().startsWith(QLatin1String(
"{internal~currenthost}"), Qt::CaseInsensitive)) {
1388 }
else if (it.key().startsWith(QLatin1String(
"{internal~allhosts}"), Qt::CaseInsensitive)) {
1395 #include "scheduler.moc"
1396 #include "scheduler_p.moc"