|
|
|
@ -1,23 +1,23 @@
|
|
|
|
|
/// `fingertips` creates an inverted index for a set of text files.
|
|
|
|
|
///
|
|
|
|
|
/// Most of the actual work is done by the modules `index`, `read`, `write`,
|
|
|
|
|
/// and `merge`. In this file, `main.rs`, we put the pieces together in two
|
|
|
|
|
/// different ways.
|
|
|
|
|
///
|
|
|
|
|
/// * `run_single_threaded` simply does everything in one thread, in
|
|
|
|
|
/// the most straightforward possible way.
|
|
|
|
|
///
|
|
|
|
|
/// * Then, we break the work into a five-stage pipeline so that we can run
|
|
|
|
|
/// it on multiple CPUs. `run_pipeline` puts the five stages together.
|
|
|
|
|
///
|
|
|
|
|
/// The `main` function at the end handles command-line arguments. It calls one
|
|
|
|
|
/// of the two functions above to do the work.
|
|
|
|
|
//! `fingertips` creates an inverted index for a set of text files.
|
|
|
|
|
//!
|
|
|
|
|
//! Most of the actual work is done by the modules `index`, `read`, `write`,
|
|
|
|
|
//! and `merge`. In this file, `main.rs`, we put the pieces together in two
|
|
|
|
|
//! different ways.
|
|
|
|
|
//!
|
|
|
|
|
//! * `run_single_threaded` simply does everything in one thread, in
|
|
|
|
|
//! the most straightforward possible way.
|
|
|
|
|
//!
|
|
|
|
|
//! * Then, we break the work into a five-stage pipeline so that we can run
|
|
|
|
|
//! it on multiple CPUs. `run_pipeline` puts the five stages together.
|
|
|
|
|
//!
|
|
|
|
|
//! The `main` function at the end handles command-line arguments. It calls one
|
|
|
|
|
//! of the two functions above to do the work.
|
|
|
|
|
|
|
|
|
|
mod index;
|
|
|
|
|
mod read;
|
|
|
|
|
mod write;
|
|
|
|
|
mod merge;
|
|
|
|
|
mod read;
|
|
|
|
|
mod tmp;
|
|
|
|
|
mod write;
|
|
|
|
|
|
|
|
|
|
use std::fs::File;
|
|
|
|
|
use std::io;
|
|
|
|
@ -25,18 +25,15 @@ use std::io::prelude::*;
|
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
|
use std::sync::mpsc::{channel, Receiver};
|
|
|
|
|
use std::thread::{spawn, JoinHandle};
|
|
|
|
|
use argparse::{ArgumentParser, StoreTrue, Collect};
|
|
|
|
|
|
|
|
|
|
use crate::index::InMemoryIndex;
|
|
|
|
|
use crate::write::write_index_to_tmp_file;
|
|
|
|
|
use crate::merge::FileMerge;
|
|
|
|
|
use crate::tmp::TmpDir;
|
|
|
|
|
use crate::write::write_index_to_tmp_file;
|
|
|
|
|
|
|
|
|
|
/// Create an inverted index for the given list of `documents`,
|
|
|
|
|
/// storing it in the specified `output_dir`.
|
|
|
|
|
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
|
|
|
|
|
-> io::Result<()>
|
|
|
|
|
{
|
|
|
|
|
fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf) -> io::Result<()> {
|
|
|
|
|
// If all the documents fit comfortably in memory, we'll create the whole
|
|
|
|
|
// index in memory.
|
|
|
|
|
let mut accumulated_index = InMemoryIndex::new();
|
|
|
|
@ -54,7 +51,7 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
|
|
|
|
|
// ...load it into memory...
|
|
|
|
|
let mut f = File::open(filename)?;
|
|
|
|
|
let mut text = String::new();
|
|
|
|
|
f.read_to_string(&mut text)?;
|
|
|
|
|
_ = f.read_to_string(&mut text)?;
|
|
|
|
|
|
|
|
|
|
// ...and add its contents to the in-memory `accumulated_index`.
|
|
|
|
|
let index = InMemoryIndex::from_single_document(doc_id, text);
|
|
|
|
@ -83,16 +80,16 @@ fn run_single_threaded(documents: Vec<PathBuf>, output_dir: PathBuf)
|
|
|
|
|
/// This returns a pair of values: a receiver that receives the documents, as
|
|
|
|
|
/// Strings; and a `JoinHandle` that can be used to wait for this thread to
|
|
|
|
|
/// exit and to get the `io::Error` value if anything goes wrong.
|
|
|
|
|
fn start_file_reader_thread(documents: Vec<PathBuf>)
|
|
|
|
|
-> (Receiver<String>, JoinHandle<io::Result<()>>)
|
|
|
|
|
{
|
|
|
|
|
fn start_file_reader_thread(
|
|
|
|
|
documents: Vec<PathBuf>,
|
|
|
|
|
) -> (Receiver<String>, JoinHandle<io::Result<()>>) {
|
|
|
|
|
let (sender, receiver) = channel();
|
|
|
|
|
|
|
|
|
|
let handle = spawn(move || {
|
|
|
|
|
for filename in documents {
|
|
|
|
|
let mut f = File::open(filename)?;
|
|
|
|
|
let mut text = String::new();
|
|
|
|
|
f.read_to_string(&mut text)?;
|
|
|
|
|
_ = f.read_to_string(&mut text)?;
|
|
|
|
|
|
|
|
|
|
if sender.send(text).is_err() {
|
|
|
|
|
break;
|
|
|
|
@ -113,9 +110,9 @@ fn start_file_reader_thread(documents: Vec<PathBuf>)
|
|
|
|
|
/// receiver, the sequence of in-memory indexes; and a `JoinHandle` that can be
|
|
|
|
|
/// used to wait for this thread to exit. This stage of the pipeline is
|
|
|
|
|
/// infallible (it performs no I/O, so there are no possible errors).
|
|
|
|
|
fn start_file_indexing_thread(texts: Receiver<String>)
|
|
|
|
|
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
|
|
|
|
|
{
|
|
|
|
|
fn start_file_indexing_thread(
|
|
|
|
|
texts: Receiver<String>,
|
|
|
|
|
) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
|
|
|
|
|
let (sender, receiver) = channel();
|
|
|
|
|
|
|
|
|
|
let handle = spawn(move || {
|
|
|
|
@ -143,9 +140,9 @@ fn start_file_indexing_thread(texts: Receiver<String>)
|
|
|
|
|
/// merging the input indexes; and a `JoinHandle` that can be used to wait for
|
|
|
|
|
/// this thread to exit. This stage of the pipeline is infallible (it performs
|
|
|
|
|
/// no I/O).
|
|
|
|
|
fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
|
|
|
|
|
-> (Receiver<InMemoryIndex>, JoinHandle<()>)
|
|
|
|
|
{
|
|
|
|
|
fn start_in_memory_merge_thread(
|
|
|
|
|
file_indexes: Receiver<InMemoryIndex>,
|
|
|
|
|
) -> (Receiver<InMemoryIndex>, JoinHandle<()>) {
|
|
|
|
|
let (sender, receiver) = channel();
|
|
|
|
|
|
|
|
|
|
let handle = spawn(move || {
|
|
|
|
@ -175,10 +172,10 @@ fn start_in_memory_merge_thread(file_indexes: Receiver<InMemoryIndex>)
|
|
|
|
|
/// This returns a pair: a receiver that receives the filenames; and a
|
|
|
|
|
/// `JoinHandle` that can be used to wait for this thread to exit and receive
|
|
|
|
|
/// any I/O errors it encountered.
|
|
|
|
|
fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
|
|
|
|
|
output_dir: &Path)
|
|
|
|
|
-> (Receiver<PathBuf>, JoinHandle<io::Result<()>>)
|
|
|
|
|
{
|
|
|
|
|
fn start_index_writer_thread(
|
|
|
|
|
big_indexes: Receiver<InMemoryIndex>,
|
|
|
|
|
output_dir: &Path,
|
|
|
|
|
) -> (Receiver<PathBuf>, JoinHandle<io::Result<()>>) {
|
|
|
|
|
let (sender, receiver) = channel();
|
|
|
|
|
|
|
|
|
|
let mut tmp_dir = TmpDir::new(output_dir);
|
|
|
|
@ -197,9 +194,7 @@ fn start_index_writer_thread(big_indexes: Receiver<InMemoryIndex>,
|
|
|
|
|
|
|
|
|
|
/// Given a sequence of filenames of index data files, merge all the files
|
|
|
|
|
/// into a single index data file.
|
|
|
|
|
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
|
|
|
|
|
-> io::Result<()>
|
|
|
|
|
{
|
|
|
|
|
fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path) -> io::Result<()> {
|
|
|
|
|
let mut merge = FileMerge::new(output_dir);
|
|
|
|
|
for file in files {
|
|
|
|
|
merge.add_file(file)?;
|
|
|
|
@ -213,14 +208,12 @@ fn merge_index_files(files: Receiver<PathBuf>, output_dir: &Path)
|
|
|
|
|
/// On success this does exactly the same thing as `run_single_threaded`, but
|
|
|
|
|
/// faster since it uses multiple CPUs and keeps them busy while I/O is
|
|
|
|
|
/// happening.
|
|
|
|
|
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf)
|
|
|
|
|
-> io::Result<()>
|
|
|
|
|
{
|
|
|
|
|
fn run_pipeline(documents: Vec<PathBuf>, output_dir: PathBuf) -> io::Result<()> {
|
|
|
|
|
// Launch all five stages of the pipeline.
|
|
|
|
|
let (texts, h1) = start_file_reader_thread(documents);
|
|
|
|
|
let (pints, h2) = start_file_indexing_thread(texts);
|
|
|
|
|
let (texts, h1) = start_file_reader_thread(documents);
|
|
|
|
|
let (pints, h2) = start_file_indexing_thread(texts);
|
|
|
|
|
let (gallons, h3) = start_in_memory_merge_thread(pints);
|
|
|
|
|
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
|
|
|
|
|
let (files, h4) = start_index_writer_thread(gallons, &output_dir);
|
|
|
|
|
let result = merge_index_files(files, &output_dir);
|
|
|
|
|
|
|
|
|
|
// Wait for threads to finish, holding on to any errors that they encounter.
|
|
|
|
@ -263,7 +256,7 @@ fn expand_filename_arguments(args: Vec<String>) -> io::Result<Vec<PathBuf>> {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Generate an index for a bunch of text files.
|
|
|
|
|
fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
|
|
|
|
|
pub fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
|
|
|
|
|
let output_dir = PathBuf::from(".");
|
|
|
|
|
let documents = expand_filename_arguments(filenames)?;
|
|
|
|
|
|
|
|
|
@ -273,27 +266,3 @@ fn run(filenames: Vec<String>, single_threaded: bool) -> io::Result<()> {
|
|
|
|
|
run_pipeline(documents, output_dir)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn main() {
|
|
|
|
|
let mut single_threaded = false;
|
|
|
|
|
let mut filenames = vec![];
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
let mut ap = ArgumentParser::new();
|
|
|
|
|
ap.set_description("Make an inverted index for searching documents.");
|
|
|
|
|
ap.refer(&mut single_threaded)
|
|
|
|
|
.add_option(&["-1", "--single-threaded"], StoreTrue,
|
|
|
|
|
"Do all the work on a single thread.");
|
|
|
|
|
ap.refer(&mut filenames)
|
|
|
|
|
.add_argument("filenames", Collect,
|
|
|
|
|
"Names of files/directories to index. \
|
|
|
|
|
For directories, all .txt files immediately \
|
|
|
|
|
under the directory are indexed.");
|
|
|
|
|
ap.parse_args_or_exit();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match run(filenames, single_threaded) {
|
|
|
|
|
Ok(()) => {}
|
|
|
|
|
Err(err) => println!("error: {}", err)
|
|
|
|
|
}
|
|
|
|
|
}
|