NOTE: hash_reader has been rewritten; the config and the dht_hash database schema have changed, so the existing dht_hash database must be removed
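For reference, a minimal sketch of dropping the old database, assuming the same mongodb driver API used elsewhere in this repository and that the database name is the atom dht_hash (running db.dropDatabase() against dht_hash from the mongo shell works just as well):

drop_old_hash_db(Conn) ->
    % Conn comes from mongo_pool:get/1, as in the modules below
    mongo:do(safe, master, Conn, dht_hash, fun() ->
        mongo:command({dropDatabase, 1})
    end).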

Kevin Lynx 11 years ago
parent 75b3d82f4c
commit dcf0181839

@ -15,6 +15,7 @@
terminate/2,
code_change/3]).
-export([start_link/5, stop/0, insert/1]).
-export([do_save/1]). % to avoid unused warning
-record(state, {cache_time, cache_max}).
-define(TBLNAME, hash_table).
-define(DBPOOL, hash_write_db).
@ -57,7 +58,7 @@ handle_call(_, _From, State) ->
handle_info(do_save_cache, #state{cache_time = Time} = State) ->
?T("timeout to save cache hashes"),
do_save(table_size(?TBLNAME)),
do_save_merge(table_size(?TBLNAME)),
schedule_save(Time),
{noreply, State};
@ -83,10 +84,36 @@ try_save(#state{cache_max = Max}) ->
try_save(Size, Max) when Size >= Max ->
?T(?FMT("try save all cache hashes ~p", [Size])),
do_save(Size);
do_save_merge(Size);
try_save(_, _) ->
ok.
%% new method:
%% merge hashes into the database, to reduce the number of hashes hash_reader has to process
do_save_merge(0) ->
ok;
do_save_merge(_) ->
First = ets:first(?TBLNAME),
ReqAt = time_util:now_seconds(),
do_save(First, ReqAt),
ets:delete_all_objects(?TBLNAME).
do_save('$end_of_table', _) ->
0;
do_save(Key, ReqAt) ->
Conn = mongo_pool:get(?DBPOOL),
ReqCnt = get_req_cnt(Key),
BHash = list_to_binary(Key),
Cmd = {findAndModify, ?HASH_COLLNAME, query, {'_id', BHash},
update, {'$inc', {req_cnt, ReqCnt}, '$set', {req_at, ReqAt}},
fields, {'_id', 1}, upsert, true, new, false},
mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
mongo:command(Cmd)
end),
Next = ets:next(?TBLNAME, Key),
ReqCnt + do_save(Next, ReqAt).
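%% Illustrative sketch (not part of this commit): do_save_merge/1 walks the ETS cache
%% and issues one upsert per distinct hash, so N announces of the same info-hash
%% collapse into a single document whose req_cnt grows by N. get_req_cnt/1 is defined
%% elsewhere in this module; assuming the cache stores {Hash, ReqCnt} tuples, a
%% hypothetical per-hash count lookup would look roughly like this:
cached_req_cnt(Key) ->
    case ets:lookup(?TBLNAME, Key) of
        [{Key, Cnt}] -> Cnt;   % merged count for this info-hash
        [] -> 0
    end.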
%% old method
do_save(0) ->
ok;
do_save(_) ->

@ -0,0 +1,159 @@
%%
%% hash_download.erl
%% Kevin Lynx
%% 07.21.2013
%%
-module(hash_download).
-include("vlog.hrl").
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1]).
-record(state, {dbpool, downloader, downloading = 0, max}).
-define(WAIT_TIME, 1*60*1000).
start_link(DBPool) ->
gen_server:start_link(?MODULE, [DBPool], []).
init([DBPool]) ->
{ok, DownPid} = tor_download:start_link(),
tor_download_stats:register(DownPid),
Max = config:get(max_download_per_reader, 50),
{ok, #state{dbpool = DBPool, downloader = DownPid, max = Max}, 0}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast({process_hash, Doc}, State) ->
#state{downloading = DownCnt, downloader = DownPid, max = Max} = State,
{BHash} = bson:lookup('_id', Doc),
Hash = binary_to_list(BHash),
ReqCnt = hash_reader_common:get_req_cnt(Doc),
Conn = db_conn(State),
AddDown = case db_store_mongo:inc_announce(Conn, Hash, ReqCnt) of
true ->
?T(?FMT("hash ~s already exists in db", [Hash])),
hash_reader_common:on_updated(Conn),
0;
false ->
schedule_download(Conn, DownPid, Hash)
end,
case AddDown + DownCnt < Max of
true ->
schedule_next();
false ->
?T(?FMT("reached the max download ~p, wait", [Max])),
wait_downloader_notify
end,
{noreply, State#state{downloading = DownCnt + AddDown}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
handle_info({got_torrent, failed, _Hash}, State) ->
#state{downloading = D} = State,
schedule_next(),
hash_reader_stats:handle_download_failed(),
{noreply, State#state{downloading = D - 1}};
handle_info({got_torrent, ok, Hash, Content}, State) ->
schedule_next(),
Conn = db_conn(State),
true = is_binary(Content),
SaveTor = config:get(save_torrent, true),
if SaveTor -> loc_torrent_cache:save(Conn, Hash, Content); true -> ok end,
NewState = got_torrent_content(State, Hash, Content),
hash_reader_stats:handle_download_ok(),
{noreply, NewState};
handle_info({got_torrent_from_cache, Hash, Content}, State) ->
on_used_cache(),
schedule_next(),
NewState = got_torrent_content(State, Hash, Content),
{noreply, NewState};
handle_info(timeout, State) ->
schedule_next(),
{noreply, State}.
schedule_next() ->
case hash_download_cache:get_one() of
{} ->
timer:send_after(?WAIT_TIME, timeout); % retry later via handle_info(timeout, ...)
Doc ->
gen_server:cast(self(), {process_hash, Doc})
end.
schedule_download(Conn, Pid, Hash) ->
TryFilter = config:get(check_cache, false),
Down = case TryFilter of
true ->
db_hash_index:exist(Conn, Hash);
false ->
true
end,
try_download(Down, Conn, Pid, Hash).
try_download(false, _, _, Hash) ->
?T(?FMT("hash does not exist in index_cache, filter it ~s", [Hash])),
0;
try_download(true, Conn, Pid, Hash) ->
case loc_torrent_cache:load(Conn, Hash) of
not_found ->
tor_download:download(Pid, Hash);
Content ->
?T(?FMT("found torrent in local cache ~s", [Hash])),
self() ! {got_torrent_from_cache, Hash, Content}
end,
1.
db_conn(State) ->
#state{dbpool = DBPool} = State,
mongo_pool:get(DBPool).
got_torrent_content(State, MagHash, Content) ->
#state{downloading = D} = State,
case catch(torrent_file:parse(Content)) of
{'EXIT', _} ->
?W(?FMT("parse a torrent failed ~s", [MagHash])),
skip;
{Type, Info} ->
got_torrent(State, MagHash, Type, Info)
end,
State#state{downloading = D - 1}.
got_torrent(State, Hash, single, {Name, Length}) ->
try_save(State, Hash, Name, Length, []);
got_torrent(State, Hash, multi, {Root, Files}) ->
try_save(State, Hash, Root, 0, Files).
try_save(State, Hash, Name, Length, Files) ->
Conn = db_conn(State),
case catch db_store_mongo:insert(Conn, Hash, Name, Length, Files) of
{'EXIT', Reason} ->
?E(?FMT("save torrent failed ~p", [Reason]));
_ ->
on_saved(Conn)
end.
on_used_cache() ->
hash_reader_stats:handle_used_cache().
on_saved(Conn) ->
% `get_peers' here means we have processed a request
db_system:stats_get_peers(Conn),
% increase the `new' counter
db_system:stats_new_saved(Conn),
hash_reader_stats:handle_insert().

@ -0,0 +1,101 @@
%%
%% hash_download_cache.erl
%% Kevin Lynx
%% caches wait_download hashes; the downloader reads hashes from here to avoid
%% database operations. If the cache grows too big, it is flushed to the database
%% (see the usage sketch after this module).
%% 07.21.2013
%%
-module(hash_download_cache).
-include("vlog.hrl").
-include("db_common.hrl").
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1,
stop/0,
insert/1,
get_one/0]).
-record(state, {cache = [], max, dbpool}).
-define(SAVE_BATCH, 100).
start_link(DBPool) ->
Max = config:get(max_download_cache, 100),
gen_server:start_link({local, srv_name()}, ?MODULE, [DBPool, Max], []).
stop() ->
gen_server:cast(srv_name(), stop).
insert(Doc) ->
gen_server:cast(srv_name(), {insert, Doc}).
get_one() ->
gen_server:call(srv_name(), get_one, infinity).
srv_name() ->
?MODULE.
%
init([DBPool, Max]) ->
{ok, #state{max = Max, dbpool = DBPool}}.
terminate(_, State) ->
#state{dbpool = DBPool, cache = Cache} = State,
check_save(DBPool, Cache, 0),
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast({insert, Doc}, State) ->
#state{dbpool = DBPool, cache = Cache, max = Max} = State,
NewCache = check_save(DBPool, [Doc|Cache], Max),
{noreply, State#state{cache = NewCache}};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(get_one, _From, State) ->
#state{dbpool = DBPool, cache = Cache} = State,
{Doc, NewCache} = try_load(DBPool, Cache),
{reply, Doc, State#state{cache = NewCache}};
handle_call(_, _From, State) ->
{noreply, State}.
handle_info(_, State) ->
{noreply, State}.
check_save(DBPool, Cache, Max) when length(Cache) >= Max ->
SplitCnt = 2 * Max div 3,
{Remain, ToSave} = lists:split(SplitCnt, Cache),
?T(?FMT("download_cache reached the max, save 1/3 ~p", [length(ToSave)])),
do_save(DBPool, ToSave),
Remain;
check_save(_, Cache, _) ->
Cache.
do_save(_DBPool, []) ->
ok;
do_save(DBPool, Docs) ->
Insert = fun(Doc) ->
Conn = mongo_pool:get(DBPool),
{ID} = bson:lookup('_id', Doc),
Sel = {'_id', ID},
mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
mongo:update(?HASH_DOWNLOAD_COLL, Sel, Doc, true)
end)
end,
[Insert(Doc) || Doc <- Docs].
try_load(DBPool, []) ->
?T("download_cache empty, load hash from db"),
Conn = mongo_pool:get(DBPool),
% load_delete_doc/2 returns {} when the backing collection is also empty
case hash_reader_common:load_delete_doc(Conn, ?HASH_DOWNLOAD_COLL) of
{} -> {{}, []};
{Doc} -> {Doc, []}
end;
try_load(_, [First|Rest]) ->
{First, Rest}.
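%% Usage sketch (illustrative, not part of this commit): hash_reader2 is the
%% producer of this cache and hash_download is its consumer.
producer_consumer_example(Doc) ->
    ok = hash_download_cache:insert(Doc),  % reader pushes a wait_download doc
    case hash_download_cache:get_one() of  % downloader later pulls one back
        {} -> empty;                       % cache and backing collection are both empty
        NextDoc -> NextDoc
    end.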

@ -106,7 +106,7 @@ handle_info(M, State) ->
handle_cast({process_hash, Doc, DownloadDoc}, State) ->
Conn = db_conn(State),
{Hash} = bson:lookup(hash, Doc),
{Hash} = bson:lookup('_id', Doc), % lookup(hash, Doc)
ReqCnt = get_req_cnt(Doc),
ListHash = binary_to_list(Hash),
?T(?FMT("process a hash ~s download-doc ~p", [ListHash, DownloadDoc])),
@ -214,7 +214,7 @@ got_torrent(State, Hash, multi, {Root, Files}) ->
% insert the doc to the `wait-download' collection, and when the
% downloader is free, it will download this doc.
insert_to_download_wait(Conn, Doc) ->
{ID} = bson:lookup('_id', Doc),
{ID} = bson:lookup('_id', Doc), % lookup(hash, Doc)
Sel = {'_id', ID},
mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
% may exist already
@ -229,7 +229,7 @@ check_in_index_cache(_, {}) ->
timer:send_after(?WAIT_TIME, timeout),
empty;
check_in_index_cache(Conn, {Doc}) ->
{Hash} = bson:lookup(hash, Doc),
{Hash} = bson:lookup('_id', Doc), % lookup(hash, Doc)
ListHash = binary_to_list(Hash),
Try = should_try_download(config:get(check_cache, false), Conn, ListHash),
case Try of

@ -0,0 +1,71 @@
%%
%% hash_reader2.erl
%% Kevin Lynx
%% 07.21.2013
%%
-module(hash_reader2).
-include("vlog.hrl").
-include("db_common.hrl").
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([start_link/1]).
-define(WAIT_TIME, 1*60*1000).
-record(state, {dbpool}).
start_link(DBPool) ->
gen_server:start_link(?MODULE, [DBPool], []).
init([DBPool]) ->
hash_download:start_link(DBPool),
{ok, #state{dbpool = DBPool}, 0}.
terminate(_, State) ->
{ok, State}.
code_change(_, _, State) ->
{ok, State}.
handle_cast({process_hash, Doc}, State) ->
{BHash} = bson:lookup('_id', Doc),
Hash = binary_to_list(BHash),
ReqCnt = hash_reader_common:get_req_cnt(Doc),
Conn = db_conn(State),
case db_store_mongo:inc_announce(Conn, Hash, ReqCnt) of
true ->
hash_reader_common:on_updated(Conn);
false ->
?T(?FMT("insert doc ~s to download_cache", [Hash])),
hash_download_cache:insert(Doc)
end,
schedule_next(Conn),
{noreply, State};
handle_cast(stop, State) ->
{stop, normal, State}.
handle_call(_, _From, State) ->
{noreply, State}.
handle_info(timeout, State) ->
Conn = db_conn(State),
schedule_next(Conn),
{noreply, State}.
schedule_next(Conn) ->
case hash_reader_common:load_delete_doc(Conn, ?HASH_COLLNAME) of
{} ->
?T("start to wait for new hash"),
timer:send_after(?WAIT_TIME, timeout);
{Doc} ->
gen_server:cast(self(), {process_hash, Doc})
end.
db_conn(State) ->
#state{dbpool = DBPool} = State,
mongo_pool:get(DBPool).

@ -0,0 +1,34 @@
%%
%% hash_reader_common.erl
%% Kevin Lynx
%% 07.21.2013
%%
-module(hash_reader_common).
-include("db_common.hrl").
-export([get_req_cnt/1,
on_updated/1,
load_delete_doc/2]).
get_req_cnt(Doc) ->
case bson:lookup(req_cnt, Doc) of
{} -> 0;
{R} -> R
end.
on_updated(Conn) ->
% `get_peers' here means we have processed a request
db_system:stats_get_peers(Conn),
% also increase the updated counter
db_system:stats_updated(Conn),
hash_reader_stats:handle_update().
load_delete_doc(Conn, Col) ->
% atomically fetch one doc and remove it, so concurrent readers never get the same hash
Cmd = {findAndModify, Col, fields, {}, remove, true},
Ret = mongo:do(safe, master, Conn, ?HASH_DBNAME, fun() ->
mongo:command(Cmd)
end),
case Ret of
{value, undefined, ok, 1.0} -> {};
{value, Obj, lastErrorObject, _, ok, 1.0} -> {Obj}
end.

@ -36,11 +36,12 @@ start_standalone(IP, Port, Size) ->
config:start_link("hash_reader.config", fun() -> config_default() end),
tor_name_seg:init(),
% NOTE:
DownloadCache = {hash_download_cache, {hash_download_cache, start_link, [?DBPOOLNAME]}, permanent, 2000, worker, [hash_download_cache]},
Stats = {hash_reader_stats, {hash_reader_stats, start_link, [Size]}, permanent, 2000, worker, [hash_reader_stats]},
DownloadStats = {tor_download_stats, {tor_download_stats, start_link, []}, permanent, 2000, worker, [tor_download_stats]},
Log = {vlog, {vlog, start_link, ["log/hash_reader.log", 3]}, permanent, 2000, worker, [vlog]},
DBDateRange = {db_daterange, {db_daterange, start_link, [?DBPOOLNAME]}, permanent, 1000, worker, [db_daterange]},
start_link(IP, Port, Size, [Log, DBDateRange, DownloadStats, Stats]).
start_link(IP, Port, Size, [Log, DownloadCache, DBDateRange, DownloadStats, Stats]).
start_link(IP, Port, Size) ->
start_link(IP, Port, Size, []).
@ -62,7 +63,7 @@ init([PoolName, Size, OtherProcess]) ->
{ok, {Spec, Children}}.
create_child(PoolName, Index) ->
{child_id(Index), {hash_reader, start_link, [PoolName]},
{child_id(Index), {hash_reader2, start_link, [PoolName]},
permanent, 1000, worker, dynamic}.
child_id(Index) ->
@ -75,5 +76,6 @@ config_default() ->
{load_from_db, false},
{text_seg, simple},
{check_cache, false},
{max_download_cache, 100},
{max_download_per_reader, 100},
{torrent_path, "torrents/"}].

@ -1,3 +1,16 @@
## 07.21.2013
* rewrite hash_reader; it now keeps a wait_download cache
* change hash_writer (crawler) to insert unique hashes
## 07.19.2013
* add a simple JSON search API to http
## 07.15.2013
* the crawler now keeps a hash cache and merges duplicate hashes, so hash_reader processes fewer hashes
## 07.08.2013
* add a torrent importer which can import local torrents into the torrents database
