-module(poisson).
-compile(export_all).
-import(mysql, [start_link/5, start_link/6]).

-define(LOG(Name, Value), io:format("DEBUG: ~s: ~p~n", [Name, Value])).
%% stopword list (currently unused in this module)
-define(LISTA_NO_WORDS, ["and", "billion", "dolari", "euro", "for", "lei", "miliarde", "milioane", "that", "the"]).
%% threshold for the Poisson score (currently unused in this module)
-define(LIMITA_POISSON, 1.0).
-define(MIN_WORD_LENGTH, 3).

start() ->
    %% huge inspiration from here: http://steve.vinoski.net/blog/2007/09/23/tim-bray-and-erlang/
    Now1 = now(),
    Articles = get_article_list(),
    %% spawn one worker per article; each worker sends back its own word-count dict
    Pids = [process_list(self(), Article) || Article <- Articles],
    %% collect one message per spawned worker and merge the partial dicts:
    %% occurrence counts are summed, article-id sets are unioned
    WordCountDict = lists:foldl(
        fun(_, WordCountDictAcc) ->
            receive
                {WordCountDict1, _Pid} ->
                    dict:merge(
                        fun(_Key, Value1, Value2) ->
                            NrTotalAparitii1 = element(2, lists:keyfind(nr_total_aparitii, 1, Value1)),
                            NrTotalAparitii2 = element(2, lists:keyfind(nr_total_aparitii, 1, Value2)),
                            Set1 = element(2, lists:keyfind(lista_indecsi, 1, Value1)),
                            Set2 = element(2, lists:keyfind(lista_indecsi, 1, Value2)),
                            [{nr_total_aparitii, NrTotalAparitii1 + NrTotalAparitii2},
                             {lista_indecsi, gb_sets:union(Set1, Set2)}]
                        end,
                        WordCountDict1, WordCountDictAcc)
            end
        end,
        dict:new(), Pids),
    Now2 = now(),
    Diff = timer:now_diff(Now2, Now1) / 1000000,
    ?LOG("Time", Diff),
    ?LOG("DictSize", dict:size(WordCountDict)),
    %% transform the Dict into a List, so that I can pass it to a python saver module
    save_dict_data(WordCountDict).

save_dict_data(WordCountDict) ->
    Port = open_port({spawn, "python -u save_poisson_dict.py"}, [{packet, 1}, binary, use_stdio]),
    Name = "test3",
    %% the dict is flattened to a list so it can be serialized with term_to_binary/1;
    %% NOTE: the exact tuple shape expected by save_poisson_dict.py is assumed here
    ReqData = term_to_binary({save_poisson_dict, Name, dict:to_list(WordCountDict)}),
    %% send binary data to the save_poisson_dict.py script
    port_command(Port, ReqData),
    %% wait for the reply from the save_poisson_dict.py script
    receive
        {Port, {data, RespData}} ->
            %% convert binary data to term
            {ok, binary_to_term(RespData)}
    after 5000 ->
        {error, timeout}
    end.

process_list(Pid, Article) ->
    %% Romanian suffix -> replacement pairs used by the stemmer
    SuffixesList = [{"ul", ""}, {"le", ""}, {"lor", ""}, {"ului", ""}, {"ea", "e"}],
    spawn(poisson, process_article2, [Pid, Article, SuffixesList]).

process_article2(Pid, Article, SuffixesList) ->
    WordCountDict = process_article(Article, SuffixesList, dict:new()),
    Pid ! {WordCountDict, self()}.

compute_poisson() ->
    %% port of the compute_poisson function from Python (sequential variant)
    Now1 = now(),
    Articles = get_article_list(),
    case is_list(Articles) of
        false ->
            false;
        true ->
            WordCountDict = process_articles(Articles),
            Now2 = now(),
            Diff = timer:now_diff(Now2, Now1) / 1000000,
            ?LOG("Time", Diff),
            ?LOG("DictSize", dict:size(WordCountDict))
    end.

process_articles(Articles) ->
    %% processes every article in the list:
    %% lowercases the text and drops separators (" ", ",", ":" etc.);
    %% returns a dict of the form:
    %%   "iliescu" => [{nr_total_aparitii, NrAparitii},
    %%                 {lista_indecsi, set of the ids of the articles the word appears in}]
    %% the parallel implementation started from here: http://nealabq.com/blog/2009/02/16/tic-tac-toe-in-erlang-parallel-processing/
    SuffixesList = [{"ul", ""}, {"le", ""}, {"lor", ""}, {"ului", ""}, {"ea", "e"}],
    process_articles(Articles, SuffixesList, dict:new()).

process_articles([], _SuffixesList, WordCountDict) ->
    WordCountDict;
process_articles([Article|Tail], SuffixesList, WordCountDict) ->
    process_articles(Tail, SuffixesList, process_article(Article, SuffixesList, WordCountDict)).
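%% --- Illustrative sketch (not part of the original module) ---
%% ?LIMITA_POISSON is defined above but never applied, and compute_poisson/0
%% stops after building the dict. Under the common Poisson model for word
%% informativeness, Lambda = TotalOccurrences / NrArticles, and a word
%% scattered at random would be expected to appear in roughly
%% NrArticles * (1 - exp(-Lambda)) distinct articles. Words that show up in
%% far fewer articles than predicted are "bursty", i.e. likely topical
%% keywords. The function below is a hedged guess at how ?LIMITA_POISSON was
%% meant to be used; the ratio test and the returned triples are assumptions.
poisson_informative_words(WordCountDict, NrArticles) ->
    dict:fold(
        fun(Word, Value, Acc) ->
            {nr_total_aparitii, Total} = lists:keyfind(nr_total_aparitii, 1, Value),
            {lista_indecsi, Set} = lists:keyfind(lista_indecsi, 1, Value),
            Lambda = Total / NrArticles,
            %% expected number of distinct articles containing the word
            Expected = NrArticles * (1 - math:exp(-Lambda)),
            %% every word in the dict carries at least one article id,
            %% so Observed >= 1 and the division below is safe
            Observed = gb_sets:size(Set),
            %% keep the word when it is more concentrated than the model predicts
            case Expected / Observed > ?LIMITA_POISSON of
                true -> [{Word, Total, Observed} | Acc];
                false -> Acc
            end
        end,
        [], WordCountDict).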
process_article(Article, SuffixesList, WordCountDict) ->
    %% column 5 of the result row holds the article body
    Text = string:to_lower(binary_to_list(lists:nth(5, Article))),
    %% get the words from the article, keep only those of length
    %% >= ?MIN_WORD_LENGTH, and apply the stemming function to each
    WordList = lists:map(
        fun(Word) -> utils:stemming(Word, SuffixesList) end,
        lists:filter(
            fun(Word) -> length(Word) >= ?MIN_WORD_LENGTH end,
            string:tokens(Text, " "))),
    ArticleId = lists:nth(1, Article),
    ?LOG("WordCountDict", dict:size(WordCountDict)),
    count_words(WordList, WordCountDict, ArticleId).

count_words([], WordCountDict, _ArticleId) ->
    WordCountDict;
count_words([H|T], WordCountDict, ArticleId) ->
    case dict:find(H, WordCountDict) of
        {ok, Value} ->
            %% word already seen: bump the total and record this article's id
            NewValue =
                case lists:keyfind(nr_total_aparitii, 1, Value) of
                    {nr_total_aparitii, NrTotalAparitii} ->
                        Set = element(2, lists:keyfind(lista_indecsi, 1, Value)),
                        [{nr_total_aparitii, NrTotalAparitii + 1},
                         {lista_indecsi, gb_sets:add_element(ArticleId, Set)}];
                    false ->
                        [{nr_total_aparitii, 1},
                         {lista_indecsi, gb_sets:add_element(ArticleId, gb_sets:new())}]
                end,
            WordCountDict1 = dict:store(H, NewValue, WordCountDict),
            count_words(T, WordCountDict1, ArticleId);
        error ->
            %% first occurrence of the word anywhere
            WordCountDict1 = dict:store(H,
                [{nr_total_aparitii, 1},
                 {lista_indecsi, gb_sets:add_element(ArticleId, gb_sets:new())}],
                WordCountDict),
            count_words(T, WordCountDict1, ArticleId)
    end.

get_article_list() ->
    %% returns the list of articles that will be used
    %% for the Poisson computation
    start_link(p1, config:read_config(db_host), undefined,
               config:read_config(db_user), config:read_config(db_password),
               config:read_config(db_name)),
    Result = mysql:fetch(p1, <<"SELECT * FROM stiri_articol ORDER BY id DESC LIMIT 3">>),
    case Result of
        {data, MysqlRes} ->
            mysql:get_result_rows(MysqlRes);
        {error, MysqlRes} ->
            mysql:get_result_reason(MysqlRes)
    end.
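%% --- Illustrative sketch (not part of the original module) ---
%% ?LISTA_NO_WORDS is defined above but never applied. If it was meant as a
%% stopword filter, a helper like the one below could be chained into
%% process_article/3 right after the length filter; this wiring is an
%% assumption, not something the original code does.
remove_stopwords(WordList) ->
    lists:filter(fun(Word) -> not lists:member(Word, ?LISTA_NO_WORDS) end, WordList).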