change sphinx torrent loading using an existing cursor

src
Kevin Lynx 11 years ago
parent 92826bf848
commit 486c354ba0

@ -46,7 +46,7 @@ init([IP, Port, Count]) ->
config:start_link("sphinx_builder.config", fun() -> config_default() end),
Builder = {sphinx_builder, {sphinx_builder, start_link, [IP, Port, Count]}, permanent, 1000, worker, [sphinx_builder]},
Indexer = {sphinx_xml, {sphinx_xml, start_link, []}, permanent, 1000, worker, [sphinx_xml]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 1]}, permanent, 1000, worker, [vlog]},
Logger = {vlog, {vlog, start_link, ["log/sphinx_build.log", 0]}, permanent, 1000, worker, [vlog]},
Children = [Logger, Builder, Indexer],
{ok, {Spec, Children}}.

@ -13,11 +13,12 @@
terminate/2,
code_change/3]).
-export([start_link/3, get/0, try_times/0]).
-export([do_load_torrents/2]). % disable warning only
-define(DBNAME, torrents).
-define(COLLNAME, hashes).
-define(POOLNAME, db_pool).
-define(WAIT_TIME, 30*1000).
-record(state, {offset = 0, max, try_times = 0, tors = []}).
-record(state, {offset = 0, max, try_times = 0, tors = [], cursor}).
start_link(IP, Port, Offset) ->
gen_server:start_link({local, srv_name()}, ?MODULE, [IP, Port, Offset], []).
@ -38,12 +39,15 @@ init([IP, Port, Offset]) ->
{ok, #state{offset = Offset, max = Max}, 0}.
handle_cast(load, State) ->
#state{offset = Skip, max = Max, tors = Tors} = State,
#state{cursor = Cursor, offset = Skip, max = Max, tors = Tors} = State,
Request = Max * 2 div 3,
?T(?FMT("request next ~p torrents", [Request])),
LoadTors = do_load_torrents(Skip, Request),
LoadTors = load_next_batch(Cursor, Request),
case length(LoadTors) of
0 -> timer:send_after(?WAIT_TIME, try_load);
0 ->
?T(?FMT("no torrents in cursor ~p", [Cursor])),
mongo_cursor:close(Cursor),
timer:send_after(?WAIT_TIME, try_load);
_ -> ok
end,
?T(?FMT("load ~p torrents", [length(LoadTors)])),
@ -56,14 +60,13 @@ handle_cast(stop, State) ->
handle_info(try_load, State) ->
#state{offset = Skip, max = Max, try_times = Try} = State,
?T(?FMT("try load ~p torrents from ~p", [Max, Skip])),
LoadTors = do_load_torrents(Skip, Max),
LoadCnt = length(LoadTors),
NewTry = case LoadCnt > 0 of
true -> 0;
false -> timer:send_after(?WAIT_TIME, try_load), Try + 1
end,
?T(?FMT("load ~p torrents, try times ~p", [LoadCnt, NewTry])),
{noreply, State#state{offset = Skip + LoadCnt, tors = LoadTors, try_times = NewTry}};
case load_cursor_batch(Skip, Max) of
{} ->
timer:send_after(?WAIT_TIME, try_load),
{noreply, State#state{try_times = Try + 1}};
{Cursor, R} ->
{noreply, State#state{try_times = 0, offset = Skip + length(R), tors = R, cursor = Cursor}}
end;
handle_info(timeout, State) ->
self() ! try_load,
@ -97,6 +100,26 @@ try_load_next(Tors, Max) when length(Tors) == Max div 3 ->
try_load_next(_, _) ->
ok.
load_cursor(Skip, Size) ->
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->
mongo:find(?COLLNAME, {}, {}, Skip, Size)
end).
load_cursor_batch(Skip, Size) ->
Cursor = load_cursor(Skip, Size),
case load_next_batch(Cursor, Size) of
[] ->
mongo_cursor:close(Cursor), {};
R ->
{Cursor, R}
end.
% will cause `get_more'
load_next_batch(Cursor, Size) ->
mongo_cursor:take(Cursor, Size).
% will cause lots of queries
do_load_torrents(Skip, Size) ->
Conn = mongo_pool:get(?POOLNAME),
mongo:do(safe, master, Conn, ?DBNAME, fun() ->

Loading…
Cancel
Save