fix some sphinx-related bugs; it can now be used to build the sphinx index (still experimental); add the `giza` library to query sphinx in http_frontend

Kevin Lynx 11 years ago
parent 7ab79b5d2e
commit 7b1a435a43
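
The commit message mentions wiring giza into http_frontend, but no giza call sites appear in this diff yet. For orientation only, here is a minimal sketch of what such a query helper might look like; the module and function names and the "main" index name are placeholders, and the giza calls (giza_query:new/2 taking index name plus query string, the giza_query:host/2 and giza_query:port/2 setters, giza_request:send/1 returning {ok, [{DocId, Props}, ...]}) as well as searchd listening on localhost:9312 are assumptions to verify against giza's README.

%% Hypothetical helper, not part of this commit: query the searchd
%% instance configured in var/etc/csft.conf through giza.
-module(sphinx_search_sketch).
-export([search/1]).

search(Keyword) ->
    %% assumed API: giza_query:new(IndexName, QueryString)
    Q0 = giza_query:new("main", Keyword),
    Q1 = giza_query:host(Q0, "localhost"),
    Q2 = giza_query:port(Q1, 9312),
    case giza_request:send(Q2) of
        {ok, Results} ->
            %% each result is expected to carry the sphinx document id,
            %% which sphinx_builder assigns when it feeds the torrent in
            [DocId || {DocId, _Props} <- Results];
        Error ->
            Error
    end.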

@@ -4,6 +4,7 @@
{ibrowse, ".*", {git, "git@github.com:cmullaparthi/ibrowse.git", "HEAD"}},
{bson, ".*", {git, "git@github.com:mongodb/bson-erlang.git", "HEAD"}},
{mongodb, ".*", {git, "git@github.com:mongodb/mongodb-erlang.git", "HEAD"}},
{kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}}
{kdht, ".*", {git, "git@github.com:kevinlynx/kdht.git", "HEAD"}},
{giza, ".*", {git, "https://github.com/kevinlynx/giza.git", "HEAD"}}
]}.

@@ -32,6 +32,7 @@ start(DBHost, DBPort, Port, PoolSize) ->
vlog:start_link("log/crawler_http.log", ?INFO),
code:add_path("deps/bson/ebin"),
code:add_path("deps/mongodb/ebin"),
code:add_path("deps/giza/ebin"),
Apps = [crypto, public_key, ssl, inets, bson, mongodb],
[application:start(App) || App <- Apps],
gen_server:start({local, srv_name()}, ?MODULE, [DBHost, DBPort, Port, PoolSize], []).

@@ -14,7 +14,7 @@
code_change/3]).
-export([start_link/3]).
-export([worker_run/0]).
-record(state, {processed = 0, worker_cnt, wait_workers = []}).
-record(state, {processed = 0, saved = false, worker_cnt, wait_workers = []}).
-define(WORKER_WAIT, 30*1000).
-define(STATE_FILE, "priv/sphinx_builder.sta").
@@ -32,7 +32,8 @@ init([IP, Port, WorkerCnt]) ->
{ok, #state{processed = Offset, worker_cnt = WorkerCnt}}.
handle_call({get, Pid}, _From, State) ->
#state{processed = Processed, worker_cnt = WorkerCnt, wait_workers = WaitWorkers} = State,
#state{processed = Processed, worker_cnt = WorkerCnt, wait_workers = WaitWorkers,
saved = Saved} = State,
{NewProcessed, Ret} = case sphinx_torrent:get() of
{} ->
{Processed, wait};
@@ -41,8 +42,9 @@ handle_call({get, Pid}, _From, State) ->
{Processed + 1, Tor}
end,
NewWaits = update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers),
check_all_done(NewWaits, WorkerCnt, NewProcessed, length(NewWaits) > length(WaitWorkers)),
{reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits}}.
NewSaved = (check_all_done(NewWaits, WorkerCnt, NewProcessed, Saved)) and (Ret == wait),
{reply, {NewProcessed, Ret}, State#state{processed = NewProcessed, wait_workers = NewWaits,
saved = NewSaved}}.
handle_cast(_, State) ->
{noreply, State}.
@@ -66,19 +68,20 @@ update_wait_workers(Pid, NewProcessed, Processed, WaitWorkers) ->
WaitWorkers
end.
check_all_done(WaitWorkers, WorkerCnt, Processed, true)
check_all_done(WaitWorkers, WorkerCnt, Processed, false)
when length(WaitWorkers) == WorkerCnt ->
Try = sphinx_torrent:try_times(),
case Try > 5 of
true ->
io:format("haven't got any torrents for a while, force save~n", []),
save_result(Processed),
sphinx_xml:force_save();
sphinx_xml:force_save(),
true;
false ->
ok
false
end;
check_all_done(_WaitWorkers, _WaitCnt, _Processed, _) ->
ok.
check_all_done(_WaitWorkers, _WaitCnt, _Processed, Saved) ->
Saved.
worker_run() ->
Ret = gen_server:call(srv_name(), {get, self()}),
@@ -91,9 +94,9 @@ do_process({_, wait}) ->
do_process({ID, Doc}) ->
case db_store_mongo:decode_torrent_item(Doc) of
{single, Hash, {Name, _}, Query, CreatedAt} ->
sphinx_xml:insert({Hash, Name, [], ID, Query, CreatedAt});
sphinx_xml:insert({ID, Hash, Name, [], Query, CreatedAt});
{multi, Hash, {Name, Files}, Query, CreatedAt} ->
sphinx_xml:insert({Hash, Name, Files, ID, Query, CreatedAt})
sphinx_xml:insert({ID, Hash, Name, Files, Query, CreatedAt})
end.
load_result() ->

@@ -6,6 +6,7 @@
-module(sphinx_builder_sup).
-behaviour(supervisor).
-export([start_standalone/1, start_standalone/3, start_link/3]).
-export([init_indexes/0]).
-export([init/1]).
start_dep_apps() ->
@@ -29,6 +30,13 @@ start_standalone(IP, Port, Size) ->
start_link(IP, Port, Count) ->
supervisor:start_link({local, srv_name()}, ?MODULE, [IP, Port, Count]).
init_indexes() ->
config:start_link("sphinx_builder.config", fun() -> config_default() end),
io:format("try init sphinx index files~n", []),
Conf = config:get(sphinx_config_file),
MainFile = config:get(main_source_file),
DeltaFile = config:get(delta_source_file),
sphinx_cmd:build_init_index(MainFile, DeltaFile, Conf).
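init_indexes/0 can be run standalone from the shell, as in the one-line script added at the end of this commit: erl -pa ebin -noshell -run sphinx_builder_sup init_indexes.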
%%
srv_name() ->
?MODULE.
@@ -38,14 +46,14 @@ init([IP, Port, Count]) ->
config:start_link("sphinx_builder.config", fun() -> config_default() end),
Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]},
Children = [Logger, Builder, Indexer],
{ok, {Spec, Children}}.
config_default() ->
[{load_torrent_interval, 500}, % millseconds
{max_doc_per_file, 1000},
[{max_doc_per_file, 1000},
{torrent_batch_count, 100},
{main_source_file, "var/source/main.xml"},
{delta_source_file, "var/source/delta.xml"},
{sphinx_config_file, "var/etc/csft.conf"},
{delta_index_name, "delta"},

@@ -4,10 +4,25 @@
%% 07.28.2013
%%
-module(sphinx_cmd).
-export([build_delta_index/5, merge_index/3]).
-export([build_init_index/3, build_delta_index/5, merge_index/3]).
-compile(export_all).
-include("vlog.hrl").
build_init_index(MainFile, DeltaFile, CfgFile) ->
case filelib:is_file(MainFile) and filelib:is_file(DeltaFile) of
true ->
io:format("main/delta index file exists, ignore~n", []);
false ->
do_build_init_index(MainFile, DeltaFile, CfgFile)
end.
do_build_init_index(MainFile, DeltaFile, CfgFile) ->
sphinx_doc:write_test_xml(MainFile),
sphinx_doc:write_test_xml(DeltaFile),
Cmd = "indexer -c " ++ CfgFile ++ " --all",
Ret = os:cmd(Cmd),
io:format("~p~n", [Ret]).
% Index file, Delta index name
build_delta_index(IndexFile, Delta, CfgFile, MinID, MaxID) ->
Cmd = "indexer -c " ++ CfgFile ++ " --rotate " ++ Delta,

@@ -4,12 +4,16 @@
%%
-module(sphinx_doc).
-include_lib("xmerl/include/xmerl.hrl").
-export([write_xml/2, element/6]).
-export([write_test_xml/1, write_xml/2, element/6]).
-compile(export_all).
-define(PROLOG, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>").
-define(CR, #xmlText{value="\
"}).
write_test_xml(File) ->
Elem = element("33FB6D00DD5E363653235449527EC1DC9959FCAB", "test", [], 1, 1, 1374508800),
write_xml(File, [Elem]).
write_xml(File, Elems) ->
Doc = {'sphinx:docset', [], [schema(), ?CR] ++ Elems},
Content = xmerl:export_simple([Doc], xmerl_xml_cdata, [{prolog, ?PROLOG}]),

@@ -13,13 +13,13 @@
terminate/2,
code_change/3]).
-export([start_link/0, insert/1, force_save/0]).
-record(state, {docs = [], max, startid = -1}).
-record(state, {docs = [], ids = [], max}).
start_link() ->
gen_server:start_link({local, srv_name()}, ?MODULE, [], []).
insert(Doc) ->
gen_server:cast(srv_name(), {insert, Doc}).
gen_server:call(srv_name(), {insert, Doc}, infinity).
force_save() ->
gen_server:cast(srv_name(), save).
@@ -38,42 +38,48 @@ terminate(_, State) ->
code_change(_, _, State) ->
{ok, State}.
handle_cast(save, #state{docs = Docs} = State) when length(Docs) > 0 ->
#state{startid = StartID} = State,
EndID = length(Docs) + StartID - 1,
try_save(Docs, 0, StartID, EndID),
{noreply, State#state{docs = []}};
handle_cast(save, #state{docs = []} = State) ->
{noreply, State};
handle_cast({insert, {Hash, Name, Files, ID, Query, CreatedAt}}, State) ->
#state{docs = Docs, max = Max, startid = StartID} = State,
NewStartID = if length(Docs) == 0 -> ID; true -> StartID end,
Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
NewDocs = try_save([Doc|Docs], Max, NewStartID, ID),
{noreply, State#state{docs = NewDocs, startid = NewStartID}};
handle_cast(save, #state{docs = Docs, ids = IDs} = State) when length(Docs) > 0 ->
try_save(Docs, 0, IDs),
{noreply, State#state{docs = [], ids = []}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call({insert, {ID, Hash, Name, Files, Query, CreatedAt}}, _From, State) ->
#state{docs = Docs, ids = IDs, max = Max} = State,
Doc = sphinx_doc:element(Hash, Name, Files, ID, Query, CreatedAt),
{NewDocs, NewIDs} = try_save([Doc|Docs], Max, [ID|IDs]),
{reply, ok, State#state{docs = NewDocs, ids = NewIDs}};
handle_call(_, _From, State) ->
{noreply, State}.
handle_info(_, State) ->
{noreply, State}.
try_save(Docs, Max, StartID, NowID) when length(Docs) >= Max, length(Docs) > 0 ->
try_save(Docs, Max, IDs) when length(Docs) >= Max, length(Docs) > 0 ->
File = config:get(delta_source_file),
Conf = config:get(sphinx_config_file),
Delta = config:get(delta_index_name),
Main = config:get(main_index_name),
{StartID, EndID} = get_id_range(IDs),
io:format("sync sphinx index ~p documents...", [length(Docs)]),
?I(?FMT("save sphinx xml file ~s", [File])),
sphinx_doc:write_xml(File, Docs),
?I(?FMT("build delta index : ~s", [Delta])),
sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, NowID),
sphinx_cmd:build_delta_index(File, Delta, Conf, StartID, EndID),
?I(?FMT("merge delta to main index ~s -> ~s", [Delta, Main])),
sphinx_cmd:merge_index(Main, Delta, Conf),
?I("index updated done"),
io:format("done~n", []),
[];
try_save(Docs, _, _, _) ->
Docs.
{[], []};
try_save(Docs, _, IDs) ->
{Docs, IDs}.
get_id_range([First|IDs]) ->
lists:foldl(fun(ID, {Min, Max}) ->
{min(ID, Min), max(ID, Max)}
end, {First, First}, IDs).
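For illustration, get_id_range([12, 5, 9]) folds to {5, 12}, so build_delta_index/5 is always called with the smallest and largest document ids accumulated since the last save, regardless of insertion order.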

@@ -1,3 +1,7 @@
## 07.30.2013
* add sphinx (coreseek, which is based on sphinx) to help with searching, still experimental
## 07.21.2013
* rewrite hash_reader, now it will keep a wait_download cache

@@ -3,10 +3,12 @@ mkdir bin\deps\bson\ebin
mkdir bin\deps\mongodb\ebin
mkdir bin\deps\kdht\ebin
mkdir bin\deps\ibrowse\ebin
mkdir bin\deps\giza\ebin
copy deps\bson\ebin\*.* bin\deps\bson\ebin\
copy deps\mongodb\ebin\*.* bin\deps\mongodb\ebin\
copy deps\kdht\ebin\*.* bin\deps\kdht\ebin\
copy deps\ibrowse\ebin\*.* bin\deps\ibrowse\ebin\
copy deps\giza\ebin\*.* bin\deps\giza\ebin\
mkdir bin\www
copy www\*.* bin\www\
copy tools\*.* bin\

@@ -0,0 +1 @@
erl -pa ebin -noshell -run sphinx_builder_sup init_indexes

@@ -0,0 +1 @@
erl -pa ebin -noshell -run sphinx_builder_sup start_standalone localhost 27017 5