[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 1/8] split: accept new output --filter option
From: Jim Meyering
Subject: [PATCH 1/8] split: accept new output --filter option
Date: Sat, 30 Apr 2011 15:31:02 +0200
From: Karl Heuer <address@hidden>
* src/split.c (FIXME): FIXME
---
src/split.c | 211 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------
1 files changed, 188 insertions(+), 23 deletions(-)
diff --git a/src/split.c b/src/split.c
index 65e44dd..7aab357 100644
--- a/src/split.c
+++ b/src/split.c
@@ -25,7 +25,9 @@
#include <assert.h>
#include <stdio.h>
#include <getopt.h>
+#include <signal.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include "system.h"
#include "error.h"
@@ -35,6 +37,7 @@
#include "full-write.h"
#include "quote.h"
#include "safe-read.h"
+#include "sig2str.h"
#include "xfreopen.h"
#include "xstrtol.h"
@@ -45,6 +48,21 @@
proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \
proper_name ("Richard M. Stallman")
+/* Shell command to filter through, instead of creating files. */
+static char const *filter_command;
+
+/* Process ID of the filter. */
+static int filter_pid;
+
+/* Array of open pipes. */
+static int *open_pipes;
+static size_t open_pipes_alloc;
+static size_t n_open_pipes;
+
+/* Blocked signals. */
+static sigset_t oldblocked;
+static sigset_t newblocked;
+
/* Base name of output files. */
static char const *outbase;
@@ -103,6 +121,7 @@ static struct option const longopts[] =
{"unbuffered", no_argument, NULL, 'u'},
{"suffix-length", required_argument, NULL, 'a'},
{"numeric-suffixes", no_argument, NULL, 'd'},
+ {"filter", required_argument, NULL, 'f'},
{"verbose", no_argument, NULL, VERBOSE_OPTION},
{"-io-blksize", required_argument, NULL,
IO_BLKSIZE_OPTION}, /* do not document */
@@ -170,6 +189,7 @@ Mandatory arguments to long options are mandatory for short options too.\n\
-C, --line-bytes=SIZE put at most SIZE bytes of lines per output file\n\
-d, --numeric-suffixes use numeric suffixes instead of alphabetic\n\
-e, --elide-empty-files do not generate empty output files with `-n'\n\
+ -f, --filter=COMMAND write to shell COMMAND; file name is $FILE\n\
-l, --lines=NUMBER put NUMBER lines per output file\n\
-n, --number=CHUNKS generate CHUNKS output files. See below\n\
-u, --unbuffered immediately copy input to output with `-n r/...'\n\
@@ -256,10 +276,116 @@ next_file_name (void)
static int
create (const char *name)
{
- if (verbose)
- fprintf (stdout, _("creating file %s\n"), quote (name));
- return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
- (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
+ if (!filter_command) /* No filter command set: create or truncate NAME as a regular file. */
+ {
+ if (verbose)
+ fprintf (stdout, _("creating file %s\n"), quote (name));
+ return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
+ (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH |
S_IWOTH));
+ }
+ else /* Filter mode: run the command with FILE=NAME and return the write end of a pipe feeding its stdin. */
+ {
+ int fd_pair[2];
+ pid_t child_pid;
+ char const *shell_prog = getenv ("SHELL");
+ if (shell_prog == NULL)
+ shell_prog = "/bin/sh"; /* Default when SHELL is not set. */
+ if (setenv ("FILE", name, 1) != 0) /* Set before fork so the child started below sees it. */
+ error (EXIT_FAILURE, errno, "cannot set environment variable");
+ if (verbose)
+ fprintf (stdout, _("executing with FILE=%s\n"), quote (name));
+ if (pipe (fd_pair) != 0)
+ error (EXIT_FAILURE, errno, _("cannot create pipe"));
+ child_pid = fork ();
+ if (child_pid == 0)
+ {
+ /* This is the child process. If an error occurs here, the
+ parent will eventually learn about it after doing a wait,
+ at which time it will emit its own error message. */
+ int j;
+ /* We have to close any pipes that were opened during an
+ earlier call, otherwise this process will be holding a
+ write-pipe that will prevent the earlier process from
+ reading an EOF on the corresponding read-pipe. */
+ for (j = 0; j < n_open_pipes; ++j)
+ if (close (open_pipes[j]) != 0)
+ error (EXIT_FAILURE, errno, _("closing prior pipe"));
+ if (close (fd_pair[1])) /* The child only reads; drop its copy of the write end. */
+ error (EXIT_FAILURE, errno, _("closing output pipe"));
+ if (fd_pair[0] != STDIN_FILENO) /* Make the read end the child's standard input. */
+ {
+ if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO)
+ error (EXIT_FAILURE, errno, _("moving input pipe"));
+ if (close (fd_pair[0]) != 0)
+ error (EXIT_FAILURE, errno, _("closing input pipe"));
+ }
+ sigprocmask (SIG_SETMASK, &oldblocked, NULL); /* Restore the saved signal mask before exec. */
+ execl (shell_prog, last_component (shell_prog), "-c",
+ filter_command, (char *) NULL);
+ error (EXIT_FAILURE, errno, "%s", shell_prog); /* Reached only if execl failed. */
+ }
+ if (child_pid == -1)
+ error (EXIT_FAILURE, errno, _("cannot spawn new process"));
+ if (close (fd_pair[0]) != 0) /* The parent keeps only the write end. */
+ error (EXIT_FAILURE, errno, _("closing input pipe"));
+ filter_pid = child_pid; /* Recorded in the file-scope variable for the later wait. */
+ if (n_open_pipes == open_pipes_alloc) /* Grow the tracking array when it is full. */
+ open_pipes = x2nrealloc (open_pipes, &open_pipes_alloc,
+ sizeof *open_pipes);
+ open_pipes[n_open_pipes++] = fd_pair[1]; /* Tracked so that later children can close it. */
+ return fd_pair[1];
+ }
+}
+
+/* Close the output file NAME, and do any associated cleanup.
+ If FP and FD are both specified, they refer to the same open file;
+ in this case FP is closed, but FD is still used in cleanup.  If PID is positive, also wait for that child and fail if it exited nonzero or died from a signal other than SIGPIPE. */
+static void
+closeout (FILE *fp, int fd, pid_t pid, char const *name)
+{
+ if (fp != NULL && fclose (fp) != 0 && errno != EPIPE) /* An EPIPE failure while closing is tolerated. */
+ error (EXIT_FAILURE, errno, "%s", name);
+ if (fd >= 0)
+ {
+ if (fp == NULL && close (fd) < 0) /* No stream was given, so close the raw descriptor here. */
+ error (EXIT_FAILURE, errno, "%s", name);
+ int j;
+ for (j = 0; j < n_open_pipes; ++j) /* Remove FD from the open_pipes list, if present. */
+ if (open_pipes[j] == fd) {
+ open_pipes[j] = open_pipes[--n_open_pipes]; /* Fill the slot with the last entry; order is not kept. */
+ break;
+ }
+ }
+ if (pid > 0)
+ {
+ int wstatus = 0;
+ if (waitpid (pid, &wstatus, 0) == -1 && errno != ECHILD) /* ECHILD is tolerated; wstatus then stays 0. */
+ error (EXIT_FAILURE, errno, _("waiting for child process"));
+ if (WIFSIGNALED (wstatus))
+ {
+ int sig = WTERMSIG (wstatus);
+ if (sig != SIGPIPE) /* A child killed by SIGPIPE is not reported. */
+ {
+ char signame[SIG2STR_MAX+1];
+ if (sig2str (sig, signame) != 0)
+ sprintf (signame, "%d", sig); /* Fall back to the signal number when sig2str fails. */
+ error (sig + 128, 0, /* Exit status is 128 plus the signal number. */
+ _("with FILE=%s, signal %s (%s) from command: %s"),
+ name, signame, strsignal (sig), filter_command);
+ }
+ }
+ else if (WIFEXITED (wstatus))
+ {
+ int ex = WEXITSTATUS (wstatus);
+ if (ex != 0)
+ error (ex, 0, _("with FILE=%s, exit %d from command: %s"), /* Exit with the command's own status. */
+ name, ex, filter_command);
+ }
+ else
+ /* shouldn't happen. */
+ error (EXIT_FAILURE, 0,
+ _("unknown status from command (0x%X)"), wstatus);
+ }
+}
/* Write BYTES bytes at BP to an output file.
@@ -273,13 +399,12 @@ cwrite (bool new_file_flag, const char *bp, size_t bytes)
{
if (!bp && bytes == 0 && elide_empty_files)
return;
- if (output_desc >= 0 && close (output_desc) < 0)
- error (EXIT_FAILURE, errno, "%s", outfile);
+ closeout (NULL, output_desc, filter_pid, outfile);
next_file_name ();
if ((output_desc = create (outfile)) < 0)
error (EXIT_FAILURE, errno, "%s", outfile);
}
- if (full_write (output_desc, bp, bytes) != bytes)
+ if (full_write (output_desc, bp, bytes) != bytes && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", outfile);
}
@@ -501,7 +626,8 @@ lines_chunk_split (uintmax_t k, uintmax_t n, char *buf, size_t bufsize,
/* We don't use the stdout buffer here since we're writing
large chunks from an existing file, so it's more efficient
to write out directly. */
- if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+ if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+ && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
else
@@ -564,7 +690,7 @@ bytes_chunk_extract (uintmax_t k, uintmax_t n, char *buf, size_t bufsize,
error (EXIT_FAILURE, errno, "%s", infile);
else if (n_read == 0)
break; /* eof. */
- if (full_write (STDOUT_FILENO, buf, n_read) != n_read)
+ if (full_write (STDOUT_FILENO, buf, n_read) != n_read && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", quote ("-"));
start += n_read;
}
@@ -575,6 +701,7 @@ typedef struct of_info
char *of_name;
int ofd;
FILE *ofile;
+ int opid;
} of_t;
enum
@@ -637,14 +764,17 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
}
- if (fclose (files[i_reopen].ofile) != 0)
+ if (fclose (files[i_reopen].ofile) != 0 && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
+ files[i_reopen].ofile = NULL;
files[i_reopen].ofd = OFD_APPEND;
}
files[i_check].ofd = fd;
if (!(files[i_check].ofile = fdopen (fd, "a")))
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
+ files[i_check].opid = filter_pid;
+ filter_pid = 0;
}
return file_limit;
@@ -658,6 +788,7 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
static void
lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
+ bool wrapped = false;
bool file_limit;
size_t i_file;
of_t *files IF_LINT (= NULL);
@@ -678,6 +809,7 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
files[i_file].of_name = xstrdup (outfile);
files[i_file].ofd = OFD_NEW;
files[i_file].ofile = NULL;
+ files[i_file].opid = 0;
}
i_file = 0;
file_limit = false;
@@ -715,10 +847,12 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
if (line_no == k && unbuffered)
{
- if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+ if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+ && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
- else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1)
+ else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1
+ && errno != EPIPE)
{
clearerr (stdout); /* To silence close_stdout(). */
error (EXIT_FAILURE, errno, "%s", _("write error"));
@@ -734,19 +868,25 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
/* Note writing to fd, rather than flushing the FILE gives
an 8% performance benefit, due to reduced data copying.
*/
- if (full_write (files[i_file].ofd, bp, to_write) != to_write)
+ if (full_write (files[i_file].ofd, bp, to_write) != to_write
+ && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
}
- else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1)
+ else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1
+ && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
if (file_limit)
{
- if (fclose (files[i_file].ofile) != 0)
+ if (fclose (files[i_file].ofile) != 0 && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+ files[i_file].ofile = NULL;
files[i_file].ofd = OFD_APPEND;
}
if (next && ++i_file == n)
- i_file = 0;
+ {
+ wrapped = true;
+ i_file = 0;
+ }
}
bp = bp_out;
@@ -757,11 +897,18 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
and to signal any waiting fifo consumers.
Also, close any open file descriptors.
FIXME: Should we do this before EXIT_FAILURE? */
- for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++)
+ if (!k)
{
- file_limit |= ofile_open (files, i_file, n);
- if (fclose (files[i_file].ofile) != 0)
- error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+ int ceiling = (wrapped ? n : i_file);
+ for (i_file = 0; i_file < n; i_file++)
+ {
+ if (i_file >= ceiling && !elide_empty_files)
+ file_limit |= ofile_open (files, i_file, n);
+ if (files[i_file].ofd >= 0)
+ closeout (files[i_file].ofile, files[i_file].ofd,
+ files[i_file].opid, files[i_file].of_name);
+ files[i_file].ofd = OFD_APPEND;
+ }
}
}
@@ -824,7 +971,8 @@ main (int argc, char **argv)
int this_optind = optind ? optind : 1;
char *slash;
- c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL);
+ c = getopt_long (argc, argv, "0123456789C:a:b:def:l:n:u",
+ longopts, NULL);
if (c == -1)
break;
@@ -955,6 +1103,10 @@ main (int argc, char **argv)
elide_empty_files = true;
break;
+ case 'f':
+ filter_command = optarg;
+ break;
+
case IO_BLKSIZE_OPTION:
{
uintmax_t tmp_blk_size;
@@ -1048,6 +1200,18 @@ main (int argc, char **argv)
buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size);
+ /* When filtering, closure of one pipe must not terminate the process,
+ as there may still be other streams expecting input from us. */
+ sigemptyset (&newblocked);
+ if (filter_command)
+ {
+ struct sigaction act;
+ sigaction (SIGPIPE, NULL, &act);
+ if (act.sa_handler != SIG_IGN)
+ sigaddset (&newblocked, SIGPIPE);
+ }
+ sigprocmask (SIG_BLOCK, &newblocked, &oldblocked);
+
switch (split_type)
{
case type_digits:
@@ -1084,10 +1248,11 @@ main (int argc, char **argv)
abort ();
}
+ sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+
if (close (STDIN_FILENO) != 0)
error (EXIT_FAILURE, errno, "%s", infile);
- if (output_desc >= 0 && close (output_desc) < 0)
- error (EXIT_FAILURE, errno, "%s", outfile);
+ closeout (NULL, output_desc, filter_pid, outfile);
exit (EXIT_SUCCESS);
}
--
1.7.5.134.g1c08b
- [PATCH 3/8] split: add tests, (continued)
- [PATCH 3/8] split: add tests, Jim Meyering, 2011/04/30
- [PATCH 6/8] strip: style: use braces around 1-stmt-but-multi-line blocks, Jim Meyering, 2011/04/30
- [PATCH 2/8] split: remove short-named -f option; improve diagnostics, Jim Meyering, 2011/04/30
- [PATCH 5/8] split: mark a string for translation, Jim Meyering, 2011/04/30
- [PATCH 8/8] doc: document split's new --filter=CMD option, Jim Meyering, 2011/04/30
- [PATCH 1/8] split: accept new output --filter option, Jim Meyering <=