+ first working version of the multithreaded indexer

This commit is contained in:
kelson42 2012-04-02 18:25:10 +00:00
parent 6e66fd176d
commit 9e8d6f3c25
2 changed files with 199 additions and 79 deletions

View File

@ -42,8 +42,12 @@ namespace kiwix {
keywordsBoostFactor(3) { keywordsBoostFactor(3) {
/* Initialize mutex */ /* Initialize mutex */
pthread_mutex_init(&articleQueueMutex, NULL); pthread_mutex_init(&threadIdsMutex, NULL);
pthread_mutex_init(&toParseQueueMutex, NULL);
pthread_mutex_init(&toIndexQueueMutex, NULL);
pthread_mutex_init(&articleExtractorRunningMutex, NULL); pthread_mutex_init(&articleExtractorRunningMutex, NULL);
pthread_mutex_init(&articleParserRunningMutex, NULL);
pthread_mutex_init(&articleIndexerRunningMutex, NULL);
this->setZimFilePath(zimFilePath); this->setZimFilePath(zimFilePath);
@ -65,6 +69,7 @@ namespace kiwix {
this->stepSize = (float)this->articleCount / (float)100; this->stepSize = (float)this->articleCount / (float)100;
} }
/* Article extractor methods */
void *Indexer::extractArticles(void *ptr) { void *Indexer::extractArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr; kiwix::Indexer *self = (kiwix::Indexer *)ptr;
self->articleExtractorRunning(true); self->articleExtractorRunning(true);
@ -82,11 +87,11 @@ namespace kiwix {
} while (currentArticle.isRedirect() && currentOffset++ != endOffset); } while (currentArticle.isRedirect() && currentOffset++ != endOffset);
/* Add articles to the queue */ /* Add articles to the queue */
indexerArticleToken token; indexerToken token;
token.title = currentArticle.getTitle(); token.title = currentArticle.getTitle();
token.url = currentArticle.getLongUrl(); token.url = currentArticle.getLongUrl();
token.content = string(currentArticle.getData().data(), currentArticle.getData().size()); token.content = string(currentArticle.getData().data(), currentArticle.getData().size());
self->pushArticleToQueue(token); self->pushToParseQueue(token);
/* Test if the thread should be cancelled */ /* Test if the thread should be cancelled */
pthread_testcancel(); pthread_testcancel();
@ -97,107 +102,6 @@ namespace kiwix {
return NULL; return NULL;
} }
bool Indexer::isArticleQueueEmpty() {
pthread_mutex_lock(&articleQueueMutex);
bool retVal = this->articleQueue.empty();
pthread_mutex_unlock(&articleQueueMutex);
return retVal;
}
/* Append a copy of 'token' to the article queue, then throttle the caller. */
/*                                                                          */
/* token: the article token to enqueue (copied into the queue).            */
/*                                                                          */
/* After the push the caller sleeps for int(size / 200) / 10 whole seconds, */
/* i.e. nothing below 2000 queued tokens, then one second per 2000 tokens.  */
void Indexer::pushArticleToQueue(indexerArticleToken &token) {
  pthread_mutex_lock(&articleQueueMutex);
  this->articleQueue.push(token);
  /* Read the size while the lock is still held: the queue is popped */
  /* concurrently from another thread, so an unlocked size() would   */
  /* be a data race.                                                 */
  const size_t queueSize = this->articleQueue.size();
  pthread_mutex_unlock(&articleQueueMutex);

  /* Sleep outside the critical section so the consumer is never blocked. */
  sleep(int(queueSize / 200) / 10);
}
/* Take the oldest token out of the article queue.                          */
/*                                                                          */
/* token: receives a copy of the dequeued token; left untouched on failure. */
/*                                                                          */
/* Waits while the queue is empty and the article extractor is still        */
/* reported as running. Returns true if a token was dequeued, false if the  */
/* queue is empty and the extractor is no longer running.                   */
bool Indexer::popArticleFromQueue(indexerArticleToken &token) {
  while (this->isArticleQueueEmpty() && this->isArticleExtractorRunning()) {
    /* sleep() takes whole seconds, so sleep(0.5) is sleep(0) and the loop */
    /* spins; usleep() gives the intended half-second pause.               */
    usleep(500000);
  }

  /* Test and pop under one lock acquisition so the emptiness check */
  /* cannot become stale before front()/pop() are called.           */
  pthread_mutex_lock(&articleQueueMutex);
  const bool hasToken = !this->articleQueue.empty();
  if (hasToken) {
    token = this->articleQueue.front();
    this->articleQueue.pop();
  }
  pthread_mutex_unlock(&articleQueueMutex);

  return hasToken;
}
void *Indexer::parseArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr;
size_t found;
indexerArticleToken token;
MyHtmlParser htmlParser;
while (self->popArticleFromQueue(token)) {
cout << token.title << endl;
/* The parser generate a lot of exceptions which should be avoided */
try {
htmlParser.parse_html(token.content, "UTF-8", true);
} catch (...) {
}
/* Get the title */
string accentedTitle = htmlParser.title;
if (accentedTitle.empty()) {
accentedTitle = token.title;
}
/* If content does not have the noindex meta tag */
/* Seems that the parser generates an exception in such case */
found = htmlParser.dump.find("NOINDEX");
if (found == string::npos) {
/* count words */
stringstream countWordStringStream;
countWordStringStream << self->countWords(htmlParser.dump);
const std::string wordCountString = countWordStringStream.str();
/* snippet */
std::string snippet = std::string(htmlParser.dump, 0, 300);
std::string::size_type last = snippet.find_last_of('.');
if (last == snippet.npos)
last = snippet.find_last_of(' ');
if (last != snippet.npos)
snippet = snippet.substr(0, last);
/* size */
stringstream sizeStringStream;
sizeStringStream << token.content.size() / 1024;
const std::string size = sizeStringStream.str();
}
}
pthread_exit(NULL);
return NULL;
}
/* Thread entry point for the index writer. */
/* Does no work: the thread terminates immediately. */
void *Indexer::writeIndex(void *ptr) {
  /* The argument is not used */
  static_cast<void>(ptr);

  pthread_exit(NULL);
  return NULL;
}
/* Launch the article extractor and article parser as detached threads.    */
/*                                                                         */
/* Returns true when both threads were created. Returns false as soon as   */
/* one pthread_create() call fails; a thread created before the failure is */
/* left running.                                                           */
bool Indexer::start() {
  /* Mark the extractor as running before any thread exists: the parser */
  /* stops as soon as it sees an empty queue with the extractor flagged */
  /* as not running, which could otherwise happen before the extractor  */
  /* thread had a chance to raise the flag itself.                      */
  this->articleExtractorRunning(true);

  pthread_mutex_lock(&threadIdsMutex);

  bool started =
    pthread_create(&(this->articleExtractor), NULL, Indexer::extractArticles, (void*)this) == 0;
  if (started) {
    pthread_detach(this->articleExtractor);

    started =
      pthread_create(&(this->articleParser), NULL, Indexer::parseArticles, (void*)this) == 0;
    if (started) {
      pthread_detach(this->articleParser);
    }
  } else {
    /* No extractor thread exists, so nothing else would clear the flag */
    this->articleExtractorRunning(false);
  }

  pthread_mutex_unlock(&threadIdsMutex);

  return started;
}
/* Request cancellation of the article extractor thread. */
/* Always returns true.                                  */
bool Indexer::stop() {
  pthread_cancel(this->articleExtractor);

  /* A cancelled thread stops at a cancellation point and does not run the */
  /* rest of its function, so it cannot lower its own running flag.        */
  /* Clear it here, otherwise the consumer waiting on the queue would wait */
  /* forever.                                                              */
  /* NOTE(review): assumes extractArticles installs no cleanup handler     */
  /* that already does this -- confirm.                                    */
  this->articleExtractorRunning(false);

  return true;
}
void Indexer::articleExtractorRunning(bool value) { void Indexer::articleExtractorRunning(bool value) {
pthread_mutex_lock(&articleExtractorRunningMutex); pthread_mutex_lock(&articleExtractorRunningMutex);
this->articleExtractorRunningFlag = value; this->articleExtractorRunningFlag = value;
@ -211,8 +115,198 @@ namespace kiwix {
return retVal; return retVal;
} }
/* Article parser methods */
void *Indexer::parseArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr;
size_t found;
indexerToken token;
while (self->popFromToParseQueue(token)) {
MyHtmlParser htmlParser;
/* The parser generate a lot of exceptions which should be avoided */
try {
htmlParser.parse_html(token.content, "UTF-8", true);
} catch (...) {
}
/* If content does not have the noindex meta tag */
/* Seems that the parser generates an exception in such case */
found = htmlParser.dump.find("NOINDEX");
if (found == string::npos) {
/* Get the accented title */
token.accentedTitle = (htmlParser.title.empty() ? token.title : htmlParser.title);
/* count words */
stringstream countWordStringStream;
countWordStringStream << self->countWords(htmlParser.dump);
token.wordCount = countWordStringStream.str();
/* snippet */
std::string snippet = std::string(htmlParser.dump, 0, 300);
std::string::size_type last = snippet.find_last_of('.');
if (last == snippet.npos)
last = snippet.find_last_of(' ');
if (last != snippet.npos)
snippet = snippet.substr(0, last);
token.snippet = snippet;
/* size */
stringstream sizeStringStream;
sizeStringStream << token.content.size() / 1024;
token.size = sizeStringStream.str();
/* Remove accent */
token.title = removeAccents(token.accentedTitle);
token.keywords = removeAccents(htmlParser.keywords);
token.content = removeAccents(htmlParser.dump);
self->pushToIndexQueue(token);
/* Test if the thread should be cancelled */
pthread_testcancel(); }
}
self->articleParserRunning(false);
pthread_exit(NULL);
return NULL;
}
void Indexer::articleParserRunning(bool value) {
pthread_mutex_lock(&articleParserRunningMutex);
this->articleParserRunningFlag = value;
pthread_mutex_unlock(&articleParserRunningMutex);
}
bool Indexer::isArticleParserRunning() {
pthread_mutex_lock(&articleParserRunningMutex);
bool retVal = this->articleParserRunningFlag;
pthread_mutex_unlock(&articleParserRunningMutex);
return retVal;
}
/* Article indexer methods */
void *Indexer::indexArticles(void *ptr) {
kiwix::Indexer *self = (kiwix::Indexer *)ptr;
indexerToken token;
while (self->popFromToIndexQueue(token)) {
self->indexNextArticle(token.url,
token.accentedTitle,
token.title,
token.keywords,
token.content,
token.snippet,
token.size,
token.wordCount
);
}
self->indexNextPercentPost();
self->articleIndexerRunning(false);
pthread_exit(NULL);
return NULL;
}
void Indexer::articleIndexerRunning(bool value) {
pthread_mutex_lock(&articleIndexerRunningMutex);
this->articleIndexerRunningFlag = value;
pthread_mutex_unlock(&articleIndexerRunningMutex);
}
bool Indexer::isArticleIndexerRunning() {
pthread_mutex_lock(&articleIndexerRunningMutex);
bool retVal = this->articleIndexerRunningFlag;
pthread_mutex_unlock(&articleIndexerRunningMutex);
return retVal;
}
/* ToParseQueue methods */
bool Indexer::isToParseQueueEmpty() {
pthread_mutex_lock(&toParseQueueMutex);
bool retVal = this->toParseQueue.empty();
pthread_mutex_unlock(&toParseQueueMutex);
return retVal;
}
/* Append a copy of 'token' to the "to parse" queue, then throttle the caller. */
/*                                                                             */
/* token: the token to enqueue (copied into the queue).                       */
/*                                                                             */
/* After the push the caller sleeps for int(size / 200) / 10 whole seconds,   */
/* i.e. nothing below 2000 queued tokens, then one second per 2000 tokens.    */
void Indexer::pushToParseQueue(indexerToken &token) {
  pthread_mutex_lock(&toParseQueueMutex);
  this->toParseQueue.push(token);
  /* Read the size while the lock is still held: the queue is popped */
  /* concurrently from another thread, so an unlocked size() would   */
  /* be a data race.                                                 */
  const size_t queueSize = this->toParseQueue.size();
  pthread_mutex_unlock(&toParseQueueMutex);

  /* Sleep outside the critical section so the consumer is never blocked. */
  sleep(int(queueSize / 200) / 10);
}
/* Take the oldest token out of the "to parse" queue.                       */
/*                                                                          */
/* token: receives a copy of the dequeued token; left untouched on failure. */
/*                                                                          */
/* Waits while the queue is empty and the article extractor is still        */
/* reported as running. Returns true if a token was dequeued, false if the  */
/* queue is empty and the extractor is no longer running.                   */
bool Indexer::popFromToParseQueue(indexerToken &token) {
  while (this->isToParseQueueEmpty() && this->isArticleExtractorRunning()) {
    /* sleep() takes whole seconds, so sleep(0.5) is sleep(0) and the loop */
    /* spins; usleep() gives the intended half-second pause.               */
    usleep(500000);
  }

  /* Test and pop under one lock acquisition so the emptiness check */
  /* cannot become stale before front()/pop() are called.           */
  pthread_mutex_lock(&toParseQueueMutex);
  const bool hasToken = !this->toParseQueue.empty();
  if (hasToken) {
    token = this->toParseQueue.front();
    this->toParseQueue.pop();
  }
  pthread_mutex_unlock(&toParseQueueMutex);

  return hasToken;
}
/* ToIndexQueue methods */
bool Indexer::isToIndexQueueEmpty() {
pthread_mutex_lock(&toIndexQueueMutex);
bool retVal = this->toIndexQueue.empty();
pthread_mutex_unlock(&toIndexQueueMutex);
return retVal;
}
/* Append a copy of 'token' to the "to index" queue, then throttle the caller. */
/*                                                                             */
/* token: the token to enqueue (copied into the queue).                       */
/*                                                                             */
/* After the push the caller sleeps for int(size / 200) / 10 whole seconds,   */
/* i.e. nothing below 2000 queued tokens, then one second per 2000 tokens.    */
void Indexer::pushToIndexQueue(indexerToken &token) {
  pthread_mutex_lock(&toIndexQueueMutex);
  this->toIndexQueue.push(token);
  /* Read the size while the lock is still held: the queue is popped */
  /* concurrently from another thread, so an unlocked size() would   */
  /* be a data race.                                                 */
  const size_t queueSize = this->toIndexQueue.size();
  pthread_mutex_unlock(&toIndexQueueMutex);

  /* Sleep outside the critical section so the consumer is never blocked. */
  sleep(int(queueSize / 200) / 10);
}
/* Take the oldest token out of the "to index" queue.                       */
/*                                                                          */
/* token: receives a copy of the dequeued token; left untouched on failure. */
/*                                                                          */
/* Waits while the queue is empty and the article parser is still reported  */
/* as running. Returns true if a token was dequeued, false if the queue is  */
/* empty and the parser is no longer running.                               */
bool Indexer::popFromToIndexQueue(indexerToken &token) {
  while (this->isToIndexQueueEmpty() && this->isArticleParserRunning()) {
    /* sleep() takes whole seconds, so sleep(0.5) is sleep(0) and the loop */
    /* spins; usleep() gives the intended half-second pause.               */
    usleep(500000);
  }

  /* Test and pop under one lock acquisition so the emptiness check */
  /* cannot become stale before front()/pop() are called.           */
  pthread_mutex_lock(&toIndexQueueMutex);
  const bool hasToken = !this->toIndexQueue.empty();
  if (hasToken) {
    token = this->toIndexQueue.front();
    this->toIndexQueue.pop();
  }
  pthread_mutex_unlock(&toIndexQueueMutex);

  return hasToken;
}
/* Launch the three pipeline stages (article extractor, article parser,   */
/* article indexer) as detached threads, after calling                    */
/* indexNextPercentPre().                                                 */
/*                                                                        */
/* Returns true when all three threads were created. On the first         */
/* pthread_create() failure no further thread is created, the running     */
/* flags of the stages that were not started are cleared, and false is    */
/* returned; stages created before the failure are left running.          */
bool Indexer::start() {
  this->indexNextPercentPre();

  /* Raise every running flag before any thread exists. Each consumer   */
  /* stops as soon as it sees an empty queue with its producer flagged  */
  /* as not running, which could otherwise happen before the producer   */
  /* thread had been scheduled.                                         */
  this->articleExtractorRunning(true);
  this->articleParserRunning(true);
  this->articleIndexerRunning(true);

  pthread_mutex_lock(&threadIdsMutex);

  const bool extractorStarted =
    pthread_create(&(this->articleExtractor), NULL, Indexer::extractArticles, (void*)this) == 0;
  if (extractorStarted) {
    pthread_detach(this->articleExtractor);
  }

  const bool parserStarted = extractorStarted &&
    pthread_create(&(this->articleParser), NULL, Indexer::parseArticles, (void*)this) == 0;
  if (parserStarted) {
    pthread_detach(this->articleParser);
  }

  const bool indexerStarted = parserStarted &&
    pthread_create(&(this->articleIndexer), NULL, Indexer::indexArticles, (void*)this) == 0;
  if (indexerStarted) {
    pthread_detach(this->articleIndexer);
  }

  pthread_mutex_unlock(&threadIdsMutex);

  /* A stage without a thread would never lower its own flag */
  if (!extractorStarted) {
    this->articleExtractorRunning(false);
  }
  if (!parserStarted) {
    this->articleParserRunning(false);
  }
  if (!indexerStarted) {
    this->articleIndexerRunning(false);
  }

  return indexerStarted;
}
/* Request cancellation of the article extractor thread. */
/* Always returns true.                                  */
bool Indexer::stop() {
  pthread_cancel(this->articleExtractor);

  /* A cancelled thread stops at a cancellation point and does not run the */
  /* rest of its function, so it cannot lower its own running flag.        */
  /* Clear it here, otherwise the parser would wait on the "to parse"      */
  /* queue forever and isRunning() would never become false.               */
  /* NOTE(review): assumes extractArticles installs no cleanup handler     */
  /* that already does this -- confirm.                                    */
  this->articleExtractorRunning(false);

  return true;
}
bool Indexer::isRunning() { bool Indexer::isRunning() {
return this->isArticleExtractorRunning(); return this->isArticleExtractorRunning() || this->isArticleIndexerRunning() || this->isArticleParserRunning();
} }
void Indexer::setCurrentArticleOffset(unsigned int offset) { void Indexer::setCurrentArticleOffset(unsigned int offset) {

View File

@ -39,10 +39,15 @@ using namespace std;
namespace kiwix { namespace kiwix {
struct indexerArticleToken { struct indexerToken {
string title;
string url; string url;
string accentedTitle;
string title;
string keywords;
string content; string content;
string snippet;
string size;
string wordCount;
}; };
class Indexer { class Indexer {
@ -57,24 +62,45 @@ namespace kiwix {
unsigned int getProgression(); unsigned int getProgression();
private: private:
pthread_t articleExtractor, articleParser, indexWriter;
pthread_mutex_t articleQueueMutex;
pthread_mutex_t threadIdsMutex; pthread_mutex_t threadIdsMutex;
/* Article extraction */
pthread_t articleExtractor;
pthread_mutex_t articleExtractorRunningMutex; pthread_mutex_t articleExtractorRunningMutex;
static void *extractArticles(void *ptr); static void *extractArticles(void *ptr);
static void *parseArticles(void *ptr);
static void *writeIndex(void *ptr);
void pushArticleToQueue(indexerArticleToken &token);
bool popArticleFromQueue(indexerArticleToken &token);
bool isArticleQueueEmpty();
bool articleExtractorRunningFlag; bool articleExtractorRunningFlag;
bool isArticleExtractorRunning(); bool isArticleExtractorRunning();
void articleExtractorRunning(bool value); void articleExtractorRunning(bool value);
std::queue<indexerArticleToken> articleQueue; /* Article parsing */
pthread_t articleParser;
pthread_mutex_t articleParserRunningMutex;
static void *parseArticles(void *ptr);
bool articleParserRunningFlag;
bool isArticleParserRunning();
void articleParserRunning(bool value);
/* Index writing */
pthread_t articleIndexer;
pthread_mutex_t articleIndexerRunningMutex;
static void *indexArticles(void *ptr);
bool articleIndexerRunningFlag;
bool isArticleIndexerRunning();
void articleIndexerRunning(bool value);
/* To parse queue */
std::queue<indexerToken> toParseQueue;
pthread_mutex_t toParseQueueMutex;
void pushToParseQueue(indexerToken &token);
bool popFromToParseQueue(indexerToken &token);
bool isToParseQueueEmpty();
/* To index queue */
std::queue<indexerToken> toIndexQueue;
pthread_mutex_t toIndexQueueMutex;
void pushToIndexQueue(indexerToken &token);
bool popFromToIndexQueue(indexerToken &token);
bool isToIndexQueueEmpty();
protected: protected:
virtual void indexNextPercentPre() = 0; virtual void indexNextPercentPre() = 0;