+ Further development of the multithreaded indexer.

This commit is contained in:
kelson42 2012-03-29 09:42:35 +00:00
parent 20daa92b65
commit e930f65297
2 changed files with 90 additions and 32 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2011 Emmanuel Engelhart <kelson@kiwix.org> * Copyright 2011-2012 Emmanuel Engelhart <kelson@kiwix.org>
* *
* This program is free software; you can redistribute it and/or modify * This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by * it under the terms of the GNU General Public License as published by
@ -41,6 +41,10 @@ namespace kiwix {
stepSize(0), stepSize(0),
keywordsBoostFactor(3) { keywordsBoostFactor(3) {
/* Initialize mutex */
pthread_mutex_init(&articleQueueMutex, NULL);
pthread_mutex_init(&articleExtractorRunningMutex, NULL);
this->setZimFilePath(zimFilePath); this->setZimFilePath(zimFilePath);
/* Read the stopwords file */ /* Read the stopwords file */
@ -59,14 +63,11 @@ namespace kiwix {
/* Compute few things */ /* Compute few things */
this->articleCount = this->zimFileHandler->getNamespaceCount('A'); this->articleCount = this->zimFileHandler->getNamespaceCount('A');
this->stepSize = (float)this->articleCount / (float)100; this->stepSize = (float)this->articleCount / (float)100;
/* Thread mgmt */
this->runningStatus = 0;
} }
void *Indexer::extractArticles(void *ptr) { void *Indexer::extractArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr; kiwix::Indexer *self = (kiwix::Indexer *)ptr;
self->incrementRunningStatus(); self->articleExtractorRunning(true);
unsigned int startOffset = self->zimFileHandler->getNamespaceBeginOffset('A'); unsigned int startOffset = self->zimFileHandler->getNamespaceBeginOffset('A');
unsigned int endOffset = self->zimFileHandler->getNamespaceEndOffset('A'); unsigned int endOffset = self->zimFileHandler->getNamespaceEndOffset('A');
@ -75,24 +76,63 @@ namespace kiwix {
zim::Article currentArticle; zim::Article currentArticle;
while (currentOffset <= endOffset) { while (currentOffset <= endOffset) {
/* Test if the thread should be cancelled */
pthread_testcancel();
/* Redirects are not indexed */ /* Redirects are not indexed */
do { do {
currentArticle = self->zimFileHandler->getArticle(currentOffset++); currentArticle = self->zimFileHandler->getArticle(currentOffset++);
} while (currentArticle.isRedirect() && currentOffset++ != endOffset); } while (currentArticle.isRedirect() && currentOffset++ != endOffset);
cout << currentArticle.getTitle() << endl; /* Add articles to the queue */
indexerArticleToken token;
token.title = currentArticle.getTitle();
self->pushArticleToQueue(token);
/* Test if the thread should be cancelled */
pthread_testcancel();
} }
self->decrementRunningStatus(); self->articleExtractorRunning(false);
pthread_exit(NULL); pthread_exit(NULL);
return NULL; return NULL;
} }
bool Indexer::isArticleQueueEmpty() {
pthread_mutex_lock(&articleQueueMutex);
bool retVal = this->articleQueue.empty();
pthread_mutex_unlock(&articleQueueMutex);
return retVal;
}
/* Append a copy of the given token at the back of the article queue,
 * holding articleQueueMutex for the duration of the insertion. */
void Indexer::pushArticleToQueue(indexerArticleToken &token) {
  pthread_mutex_t *queueLock = &articleQueueMutex;

  pthread_mutex_lock(queueLock);
  articleQueue.push(token);
  pthread_mutex_unlock(queueLock);
}
/* Take the oldest token out of the article queue.
 *
 * Blocks (polling once per second) while the queue is empty and the
 * article extractor is still flagged as running.
 *
 * token: receives a copy of the dequeued token; left untouched when
 *        nothing could be dequeued.
 * Returns true if a token was dequeued, false if the queue is empty and
 * the extractor is no longer flagged as running.
 */
bool Indexer::popArticleFromQueue(indexerArticleToken &token) {
  for (;;) {
    /* Sample the producer state BEFORE looking at the queue: the extractor
       pushes its last token before clearing the flag, so a "not running"
       value read here guarantees that an empty queue below is final. */
    const bool extractorRunning = this->isArticleExtractorRunning();

    /* The emptiness test and the pop must happen under the same lock:
       testing first and locking afterwards lets another consumer drain the
       queue in between, and front()/pop() on an empty queue is undefined. */
    pthread_mutex_lock(&articleQueueMutex);
    if (!this->articleQueue.empty()) {
      token = this->articleQueue.front();
      this->articleQueue.pop();
      pthread_mutex_unlock(&articleQueueMutex);
      return true;
    }
    pthread_mutex_unlock(&articleQueueMutex);

    /* Nothing queued and nobody left to queue anything: give up. */
    if (!extractorRunning) {
      return false;
    }

    sleep(1);
  }
}
void *Indexer::parseArticles(void *ptr) { void *Indexer::parseArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr;
indexerArticleToken token;
while (self->popArticleFromQueue(token)) {
cout << token.title << endl;
}
pthread_exit(NULL); pthread_exit(NULL);
return NULL; return NULL;
} }
@ -103,32 +143,35 @@ namespace kiwix {
} }
bool Indexer::start() { bool Indexer::start() {
pthread_create(&(this->articleExtracter), NULL, Indexer::extractArticles, ( void *)this); pthread_mutex_lock(&threadIdsMutex);
pthread_detach(this->articleExtracter); pthread_create(&(this->articleExtractor), NULL, Indexer::extractArticles, (void*)this);
cout << "end" << endl; pthread_detach(this->articleExtractor);
pthread_create(&(this->articleParser), NULL, Indexer::parseArticles, (void*)this);
pthread_detach(this->articleParser);
pthread_mutex_unlock(&threadIdsMutex);
return true; return true;
} }
bool Indexer::stop() { bool Indexer::stop() {
pthread_cancel(this->articleExtractor);
return true; return true;
} }
void Indexer::incrementRunningStatus() { void Indexer::articleExtractorRunning(bool value) {
this->runningStatus++; pthread_mutex_lock(&articleExtractorRunningMutex);
this->articleExtractorRunningFlag = value;
pthread_mutex_unlock(&articleExtractorRunningMutex);
} }
void Indexer::decrementRunningStatus() { bool Indexer::isArticleExtractorRunning() {
this->runningStatus--; pthread_mutex_lock(&articleExtractorRunningMutex);
} bool retVal = this->articleExtractorRunningFlag;
pthread_mutex_unlock(&articleExtractorRunningMutex);
unsigned int Indexer::getRunningStatus() { return retVal;
return this->runningStatus;
} }
bool Indexer::isRunning() { bool Indexer::isRunning() {
return this->runningStatus > 0; return this->isArticleExtractorRunning();
} }
void Indexer::setCurrentArticleOffset(unsigned int offset) { void Indexer::setCurrentArticleOffset(unsigned int offset) {

View File

@ -22,11 +22,12 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <stack>
#include <queue>
#include <fstream> #include <fstream>
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <xapian.h>
#include <pthread.h> #include <pthread.h>
#include <unaccent.h> #include <unaccent.h>
#include <zim/file.h> #include <zim/file.h>
@ -38,6 +39,11 @@ using namespace std;
namespace kiwix { namespace kiwix {
/* Element type of the Indexer article queue: one token per article handed
 * from the extractor thread to the parser thread. */
struct indexerArticleToken {
/* Article title, filled in by Indexer::extractArticles(). */
string title;
/* Not written anywhere in the visible code -- presumably the article
 * body; TODO confirm against the code that fills it. */
string content;
};
class Indexer { class Indexer {
public: public:
@ -50,15 +56,24 @@ namespace kiwix {
unsigned int getProgression(); unsigned int getProgression();
private: private:
pthread_t articleExtracter, articleParser, indexWriter; pthread_t articleExtractor, articleParser, indexWriter;
pthread_mutex_t articleQueueMutex;
pthread_mutex_t threadIdsMutex;
pthread_mutex_t articleExtractorRunningMutex;
static void *extractArticles(void *ptr); static void *extractArticles(void *ptr);
static void *parseArticles(void *ptr); static void *parseArticles(void *ptr);
static void *writeIndex(void *ptr); static void *writeIndex(void *ptr);
unsigned int runningStatus; void pushArticleToQueue(indexerArticleToken &token);
void incrementRunningStatus(); bool popArticleFromQueue(indexerArticleToken &token);
void decrementRunningStatus(); bool isArticleQueueEmpty();
unsigned int getRunningStatus();
bool articleExtractorRunningFlag;
bool isArticleExtractorRunning();
void articleExtractorRunning(bool value);
std::queue<indexerArticleToken> articleQueue;
protected: protected:
virtual void indexNextPercentPre() = 0; virtual void indexNextPercentPre() = 0;