>From 45a36e5c58199cd84144c42f401d51592f044768 Mon Sep 17 00:00:00 2001 From: Assaf Gordon Date: Wed, 6 Mar 2013 18:25:49 -0500 Subject: [PATCH 2/2] shuf: use reservoir-sampling when possible Reservoir Sampling enables selecting K random lines from a very large (or unknown-sized) input: http://en.wikipedia.org/wiki/Reservoir_sampling * src/shuf.c: Use reservoir-sampling when the number of output lines is known (by using '-n X' parameter). read_input_reservoir_sampling() - read lines from input file, and keep only K lines in memory, replacing lines with decreasing probability. write_permuted_output_reservoir() - output permuted reservoir lines. main() - if the number of lines is known, use reservoir-sampling instead of reading entire input file. * tests/misc/shuf-reservoir.sh - a very_expensive test using valgrind to exercise the reservoir-sampling code. * tests/local.mk - add new tests. --- src/shuf.c | 131 ++++++++++++++++++++++++++++++++++++++++-- tests/local.mk | 1 + tests/misc/shuf-reservoir.sh | 75 ++++++++++++++++++++++++ 3 files changed, 202 insertions(+), 5 deletions(-) create mode 100755 tests/misc/shuf-reservoir.sh diff --git a/src/shuf.c b/src/shuf.c index 71ac3e6..91b475b 100644 --- a/src/shuf.c +++ b/src/shuf.c @@ -25,6 +25,7 @@ #include "error.h" #include "fadvise.h" #include "getopt.h" +#include "linebuffer.h" #include "quote.h" #include "quotearg.h" #include "randint.h" @@ -38,6 +39,10 @@ #define AUTHORS proper_name ("Paul Eggert") +/* For reservoir-sampling, allocate the resevoir lines in batches */ +enum { RESERVOIR_LINES_INCREMENT = 1000 }; + + void usage (int status) { @@ -135,6 +140,90 @@ next_line (char *line, char eolbyte, size_t n) return p + 1; } +static size_t +read_input_reservoir_sampling (FILE *in, char eolbyte, size_t k, + struct randint_source *s, + struct linebuffer **out_rsrv) +{ + size_t n_lines=0; + size_t n_alloc_lines = MIN (k,RESERVOIR_LINES_INCREMENT); + struct linebuffer *line = NULL ; + struct linebuffer *rsrv ; + + rsrv = xzalloc (n_alloc_lines * sizeof (struct linebuffer)); + + /* Fill the first K lines, directly into the reservoir */ + while ( n_lines < k + && (line=readlinebuffer_delim (&rsrv[n_lines], in, eolbyte))!=NULL ) + { + ++n_lines; + + /* Enlarge reservoir */ + if (n_lines >= n_alloc_lines) + { + n_alloc_lines += RESERVOIR_LINES_INCREMENT ; + rsrv = xrealloc (rsrv, n_alloc_lines * sizeof (struct linebuffer)); + memset (&rsrv[n_lines], 0, + RESERVOIR_LINES_INCREMENT * sizeof (struct linebuffer)); + } + } + + + /* last line wasn't NULL - so there are more lines to read */ + if (line != NULL) + { + struct linebuffer dummy; + initbuffer (&dummy); /* space for lines that won't be put in reservoir */ + + /* Choose the fate of the next line, with decreasing probability (as + n_lines increases in size). + + If the line will be used, store it directly in the reservoir. + Otherwise, store it in dummy space. + + With 'struct linebuffer', storing into existing buffer will reduce + re-allocations (will only re-allocate if the new line is longer than + the currently allocated space. */ + randint j = randint_choose (s, n_lines+1); + line = ( j < k ) ? (&rsrv[j]) : (&dummy) ; + + while ( readlinebuffer_delim (line, in, eolbyte)!=NULL ) + { + n_lines++; + + /* Choose the fate of the next line, see comment above */ + j = randint_choose (s, n_lines+1); + line = ( j < k ) ? (&rsrv[j]) : (&dummy) ; + } + + freebuffer (&dummy); + } + + /* no more input lines, or an input error */ + if (ferror (in)) + error (EXIT_FAILURE, errno, _("read error")); + + *out_rsrv = rsrv; + return MIN (k, n_lines); +} + +static int +write_permuted_output_reservoir (size_t n_lines, struct linebuffer *lines, + size_t const *permutation) +{ + size_t i; + + for (i = 0; i < n_lines; i++) + { + const struct linebuffer *p = &lines[permutation[i]]; + if (fwrite (p->buffer, sizeof (char), + p->length, stdout) != p->length) + return -1; + } + + return 0; +} + /* Read data from file IN. Input lines are delimited by EOLBYTE; silently append a trailing EOLBYTE if the file ends in some other byte. Store a pointer to the resulting array of lines into *PLINE. @@ -209,14 +298,17 @@ main (int argc, char **argv) char *random_source = NULL; char eolbyte = '\n'; char **input_lines = NULL; + bool use_reservoir_sampling = false; int optc; int n_operands; char **operand; size_t n_lines; - char **line; + char **line = NULL; + struct linebuffer *reservoir = NULL; struct randint_source *randint_source; size_t *permutation; + int i; initialize_main (&argc, &argv); set_program_name (argv[0]); @@ -341,8 +433,16 @@ main (int argc, char **argv) fadvise (stdin, FADVISE_SEQUENTIAL); - n_lines = read_input (stdin, eolbyte, &input_lines); - line = input_lines; + if (head_lines != SIZE_MAX) + { + use_reservoir_sampling = true; + n_lines = SIZE_MAX; /* unknown number of input lines, for now */ + } + else + { + n_lines = read_input (stdin, eolbyte, &input_lines); + line = input_lines; + } } head_lines = MIN (head_lines, n_lines); @@ -352,6 +452,15 @@ main (int argc, char **argv) if (! randint_source) error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source)); + if (use_reservoir_sampling) + { + /* Instead of reading the entire file into 'line', + use reservoir-sampling to store just "head_lines" random lines. */ + head_lines = n_lines = read_input_reservoir_sampling (stdin, eolbyte, + head_lines, randint_source, + &reservoir); + } + /* Close stdin now, rather than earlier, so that randint_all_new doesn't have to worry about opening something other than stdin. */ @@ -363,8 +472,13 @@ main (int argc, char **argv) if (outfile && ! freopen (outfile, "w", stdout)) error (EXIT_FAILURE, errno, "%s", quotearg_colon (outfile)); - if (write_permuted_output (head_lines, line, lo_input, permutation, eolbyte) - != 0) + + if (use_reservoir_sampling) + i = write_permuted_output_reservoir (n_lines, reservoir, permutation); + else + i = write_permuted_output (head_lines, line, lo_input, + permutation, eolbyte); + if (i != 0) error (EXIT_FAILURE, errno, _("write error")); #ifdef lint @@ -375,6 +489,13 @@ main (int argc, char **argv) free (input_lines[0]); free (input_lines); } + if (reservoir) + { + size_t j; + for (j = 0; j < n_lines; ++j) + freebuffer (&reservoir[j]); + free (reservoir); + } #endif return EXIT_SUCCESS; diff --git a/tests/local.mk b/tests/local.mk index d3923f8..b71bd3f 100644 --- a/tests/local.mk +++ b/tests/local.mk @@ -314,6 +314,7 @@ all_tests = \ tests/misc/shred-remove.sh \ tests/misc/shuf.sh \ tests/misc/shuf-randomness.sh \ + tests/misc/shuf-reservoir.sh \ tests/misc/sort.pl \ tests/misc/sort-benchmark-random.sh \ tests/misc/sort-compress.sh \ diff --git a/tests/misc/shuf-reservoir.sh b/tests/misc/shuf-reservoir.sh new file mode 100755 index 0000000..60f5a74 --- /dev/null +++ b/tests/misc/shuf-reservoir.sh @@ -0,0 +1,75 @@ +#!/bin/sh +# Exercise shuf's reservoir-sampling code +# NOTE: +# These tests do not check valid randomness, +# they just check memory allocation related code. + +# Copyright (C) 2013 Free Software Foundation, Inc. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +. "${srcdir=.}/tests/init.sh"; path_prepend_ ./src +print_ver_ shuf +very_expensive_ +require_valgrind_ +getlimits_ + +# Run "shuf" with specific number of input lines and output lines +# The output must match the expected (pre-calculated) output. +run_shuf_n() +{ + INPUT_LINES="$1" + OUTPUT_LINES="$2" + + # Critical memory-related bugs will cause a segfault here + # (with varying numbres of input/output lines) + seq "$INPUT_LINES" | \ + valgrind --leak-check=full --error-exitcode=1 shuf -n "$OUTPUT_LINES" \ + -o "out_${INPUT_LINES}_${OUTPUT_LINES}" || return 1 + + EXPECTED_LINES="$OUTPUT_LINES" + test "$INPUT_LINES" -lt "$OUTPUT_LINES" && EXPECTED_LINES="$INPUT_LINES" + + # There is no sure way to verify shuffled output (as it is random). + # Ensure we got enough lines, they are all numeric, non-empty, + # and not duplicated (all would indicate a bug). + LINES=$(cat "out_${INPUT_LINES}_${OUTPUT_LINES}" | \ + grep "^[0-9][0-9]*$" | \ + LC_ALL=C sort -n | \ + uniq | wc -l) || framework_failure_ + + test "$EXPECTED_LINES" -eq "$LINES" || return 1 + + return 0 +} + +fail=0 + +# Test multiple combinations of input lines and output lines. +# (e.g. small number of input lines and large number of output lines, +# and vice-versa. Also, each reservoir allocation uses a 1000-lines batch, +# so test 999/1000/1001 and related values). +TEST_LINES="0 1 5 999 1000 1001 2999 3000 3001" + +for IN_N in $TEST_LINES +do + for OUT_N in $TEST_LINES + do + run_shuf_n "$IN_N" "$OUT_N" || + { fail=1 ; echo "shuf-reservoir-sampling failed with "\ + "IN_N=$IN_N OUT_N=$OUT_N" 1>&2 ; } + done +done + +Exit $fail -- 1.7.7.4