pspp-cvs
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Pspp-cvs] pspp/src data/buffered-reader.c data/buffered-r... [simpler-p


From: Ben Pfaff
Subject: [Pspp-cvs] pspp/src data/buffered-reader.c data/buffered-r... [simpler-proc]
Date: Fri, 04 May 2007 03:48:48 +0000

CVSROOT:        /cvsroot/pspp
Module name:    pspp
Branch:         simpler-proc
Changes by:     Ben Pfaff <blp> 07/05/04 03:48:48

Modified files:
        src/data       : buffered-reader.c buffered-reader.h case.h 
                         casegrouper.c casereader-private.h casereader.c 
                         casereader.h casewindow.c casewindow.h 
                         casewriter-private.h casewriter.c casewriter.h 
                         datasheet.c datasheet.h por-file-reader.c 
                         por-file-writer.c procedure.c procedure.h 
                         scratch-writer.c sys-file-reader.c 
                         sys-file-writer.c 
        src/language   : command.c 
        src/language/data-io: data-list.c data-reader.c get.c inpt-pgm.c 
                              list.q 
        src/language/dictionary: apply-dictionary.c delete-variables.c 
        src/language/stats: aggregate.c autorecode.c binomial.c 
                            chisquare.c chisquare.h crosstabs.q 
                            descriptives.c examine.q flip.c 
                            frequencies.q npar.h npar.q oneway.q rank.q 
                            regression.q sort-cases.c t-test.q 
        src/libpspp    : automake.mk 
        src/math       : levene.c merge.c merge.h sort.c 
Added files:
        src/libpspp    : taint.c taint.h 

Log message:
        Add "taint" objects to simplify error propagation.

CVSWeb URLs:
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/buffered-reader.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/buffered-reader.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/case.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.9.2.2&r2=1.9.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casegrouper.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casereader-private.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casereader.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.4&r2=1.1.2.5
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casereader.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.2&r2=1.1.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casewindow.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.3&r2=1.1.2.4
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casewindow.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casewriter-private.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casewriter.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.2&r2=1.1.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/casewriter.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.2&r2=1.1.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/datasheet.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.11&r2=1.1.2.12
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/datasheet.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.4&r2=1.1.2.5
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/por-file-reader.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.17.2.1&r2=1.17.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/por-file-writer.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.9.2.2&r2=1.9.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/procedure.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.25.2.2&r2=1.25.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/procedure.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.12.2.1&r2=1.12.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/scratch-writer.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.4.2.1&r2=1.4.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/sys-file-reader.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.32.2.1&r2=1.32.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/data/sys-file-writer.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.22.2.1&r2=1.22.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/command.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.23.2.1&r2=1.23.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/data-io/data-list.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.30.2.1&r2=1.30.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/data-io/data-reader.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.20.2.1&r2=1.20.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/data-io/get.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.29.2.2&r2=1.29.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/data-io/inpt-pgm.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.23.2.1&r2=1.23.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/data-io/list.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.24.2.1&r2=1.24.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/dictionary/apply-dictionary.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.13.2.1&r2=1.13.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/dictionary/delete-variables.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/aggregate.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.30.2.1&r2=1.30.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/autorecode.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.18.2.1&r2=1.18.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/binomial.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.2.2.1&r2=1.2.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/chisquare.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.3.2.1&r2=1.3.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/chisquare.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.2.2.1&r2=1.2.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/crosstabs.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.27.2.2&r2=1.27.2.3
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/descriptives.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.22.2.1&r2=1.22.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/examine.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.22.2.1&r2=1.22.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/flip.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.22.2.1&r2=1.22.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/frequencies.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.29.2.1&r2=1.29.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/npar.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.2.2.1&r2=1.2.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/npar.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.3.2.1&r2=1.3.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/oneway.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.21.2.1&r2=1.21.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/rank.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.27.2.1&r2=1.27.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/regression.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.45.2.1&r2=1.45.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/sort-cases.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.8.2.1&r2=1.8.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/language/stats/t-test.q?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.20.2.1&r2=1.20.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/libpspp/automake.mk?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.22.2.8&r2=1.22.2.9
http://cvs.savannah.gnu.org/viewcvs/pspp/src/libpspp/taint.c?cvsroot=pspp&only_with_tag=simpler-proc&rev=1.1.2.1
http://cvs.savannah.gnu.org/viewcvs/pspp/src/libpspp/taint.h?cvsroot=pspp&only_with_tag=simpler-proc&rev=1.1.2.1
http://cvs.savannah.gnu.org/viewcvs/pspp/src/math/levene.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.9.2.1&r2=1.9.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/math/merge.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/math/merge.h?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.1.2.1&r2=1.1.2.2
http://cvs.savannah.gnu.org/viewcvs/pspp/src/math/sort.c?cvsroot=pspp&only_with_tag=simpler-proc&r1=1.23.2.1&r2=1.23.2.2

Patches:
Index: data/buffered-reader.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/buffered-reader.c,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/buffered-reader.c      19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/buffered-reader.c      4 May 2007 03:48:47 -0000       1.1.2.2
@@ -72,7 +72,8 @@
 }
 
 struct casereader *
-buffered_reader_create (size_t value_cnt, casenumber case_cnt,
+buffered_reader_create (const struct taint *taint, 
+                        size_t value_cnt, casenumber case_cnt,
                         const struct buffered_reader_class *class, void *aux) 
 {
   struct buffered_reader_shared *shared = xmalloc (sizeof *shared);
@@ -80,7 +81,7 @@
   shared->class = class;
   shared->aux = aux;
   shared->min_offset = 0;
-  return casereader_create (value_cnt, case_cnt,
+  return casereader_create (taint, value_cnt, case_cnt,
                             &buffered_reader_casereader_class,
                             make_buffered_reader (shared, 0));
 }
@@ -116,34 +117,23 @@
     return false; 
 }
 
-static bool
+static void
 buffered_reader_destroy (struct casereader *reader UNUSED, void *br_) 
 {
   struct buffered_reader *br = br_;
   struct buffered_reader_shared *shared = br->shared;
-  bool ok = !shared->class->error (shared->aux);
       
   heap_delete (shared->readers, &br->heap_node);
   if (heap_is_empty (shared->readers)) 
     {
       heap_destroy (shared->readers);
-      if (!shared->class->destroy (shared->aux))
-        ok = false;
+      shared->class->destroy (shared->aux);
       free (shared);
     }
   else
     advance_buffered_reader (shared);
 
   free (br);
-  return ok;
-}
-
-static bool
-buffered_reader_error (const struct casereader *reader UNUSED, void *br_) 
-{
-  struct buffered_reader *br = br_;
-  struct buffered_reader_shared *shared = br->shared;
-  return shared->class->error (shared->aux);
 }
 
 static struct casereader *
@@ -151,7 +141,8 @@
 {
   struct buffered_reader *br = br_;
   struct buffered_reader_shared *shared = br->shared;
-  return casereader_create (casereader_get_value_cnt (reader),
+  return casereader_create (casereader_get_taint (reader),
+                            casereader_get_value_cnt (reader),
                             casereader_get_case_cnt (reader),
                             &buffered_reader_casereader_class,
                             make_buffered_reader (shared, br->offset));
@@ -172,7 +163,6 @@
   {
     buffered_reader_read,
     buffered_reader_destroy,
-    buffered_reader_error,
     buffered_reader_clone,
     buffered_reader_peek,
   };

Index: data/buffered-reader.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/buffered-reader.h,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/buffered-reader.h      19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/buffered-reader.h      4 May 2007 03:48:47 -0000       1.1.2.2
@@ -24,13 +24,13 @@
 struct buffered_reader_class
   {
     bool (*read) (void *aux, casenumber, struct ccase *);
-    bool (*error) (void *aux);
-    bool (*destroy) (void *aux);
+    void (*destroy) (void *aux);
     void (*advance) (void *aux, casenumber);
   };
 
 struct casereader *
-buffered_reader_create (size_t value_cnt, casenumber case_cnt,
+buffered_reader_create (const struct taint *,
+                        size_t value_cnt, casenumber case_cnt,
                         const struct buffered_reader_class *, void *aux);
 
 #endif /* data/buffered-reader.h */

Index: data/case.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/case.h,v
retrieving revision 1.9.2.2
retrieving revision 1.9.2.3
diff -u -b -r1.9.2.2 -r1.9.2.3
--- data/case.h 14 Apr 2007 05:04:23 -0000      1.9.2.2
+++ data/case.h 4 May 2007 03:48:47 -0000       1.9.2.3
@@ -28,7 +28,6 @@
 
 /* A count of cases or the index of a case within a collection of
    them. */
-#define CASENUMBER_INVALID -1L
 #define CASENUMBER_MAX LONG_MAX
 typedef long int casenumber;
 

Index: data/casegrouper.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casegrouper.c,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/casegrouper.c  19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/casegrouper.c  4 May 2007 03:48:47 -0000       1.1.2.2
@@ -25,6 +25,7 @@
 #include <data/casereader.h>
 #include <data/casewriter.h>
 #include <data/dictionary.h>
+#include <libpspp/taint.h>
 #include <math/ordering.h>
 
 #include "xalloc.h"
@@ -32,6 +33,7 @@
 struct casegrouper 
   {
     struct casereader *reader;
+    struct taint *taint;
     
     bool (*same_group) (const struct ccase *, const struct ccase *, void *aux);
     void (*destroy) (void *aux);
@@ -48,6 +50,7 @@
 {
   struct casegrouper *grouper = xmalloc (sizeof *grouper);
   grouper->reader = casereader_rename (reader);
+  grouper->taint = taint_clone (casereader_get_taint (grouper->reader));
   grouper->same_group = same_group;
   grouper->destroy = destroy;
   grouper->aux = aux;
@@ -106,15 +109,22 @@
 bool
 casegrouper_destroy (struct casegrouper *grouper) 
 {
-  bool ok = true;
   if (grouper != NULL) 
     {
-      ok = casereader_destroy (grouper->reader);
+      struct taint *taint = grouper->taint;
+      bool ok;
+
+      casereader_destroy (grouper->reader);
       if (grouper->destroy != NULL)
         grouper->destroy (grouper->aux);
       free (grouper);
-    }
+
+      ok = !taint_has_tainted_successor (taint);
+      taint_destroy (taint);
   return ok;
+    }
+  else
+    return true;
 }
 
 struct casegrouper_vars 

Index: data/casereader-private.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casereader-private.h,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/casereader-private.h   19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/casereader-private.h   4 May 2007 03:48:47 -0000       1.1.2.2
@@ -28,10 +28,7 @@
   {
     /* Mandatory. */
     bool (*read) (struct casereader *, void *aux, struct ccase *);
-    bool (*destroy) (struct casereader *, void *aux);
-
-    /* Need only be supplied if reads can fail. */
-    bool (*error) (const struct casereader *, void *aux);
+    void (*destroy) (struct casereader *, void *aux);
 
     /* Optional.  If convenient and efficiently implementable,
        supply as an optimization for use by casereader_clone. */
@@ -43,7 +40,8 @@
                   struct ccase *);
   };
 
-struct casereader *casereader_create (size_t value_cnt, casenumber case_cnt,
+struct casereader *casereader_create (const struct taint *,
+                                      size_t value_cnt, casenumber case_cnt,
                                       const struct casereader_class *, void *);
 
 #endif /* data/casereader-private.h */

Index: data/casereader.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casereader.c,v
retrieving revision 1.1.2.4
retrieving revision 1.1.2.5
diff -u -b -r1.1.2.4 -r1.1.2.5
--- data/casereader.c   23 Apr 2007 01:52:29 -0000      1.1.2.4
+++ data/casereader.c   4 May 2007 03:48:47 -0000       1.1.2.5
@@ -31,6 +31,7 @@
 #include <data/settings.h>
 #include <data/variable.h>
 #include <libpspp/message.h>
+#include <libpspp/taint.h>
 
 #include "xalloc.h"
 
@@ -39,6 +40,7 @@
 
 struct casereader 
   {
+    struct taint *taint;
     size_t value_cnt;
     casenumber case_cnt;
     const struct casereader_class *class;
@@ -57,11 +59,13 @@
 }
 
 struct casereader *
-casereader_create (size_t value_cnt, casenumber case_cnt,
+casereader_create (const struct taint *taint,
+                   size_t value_cnt, casenumber case_cnt,
                    const struct casereader_class *class, void *aux) 
 {
   struct casereader *reader = xmalloc (sizeof *reader);
   assert (class->destroy != NULL);
+  reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
   reader->value_cnt = value_cnt;
   reader->case_cnt = case_cnt;
   reader->class = class;
@@ -117,7 +121,8 @@
   bool ok = true;
   if (reader != NULL) 
     {
-      ok = reader->class->destroy (reader, reader->aux);
+      reader->class->destroy (reader, reader->aux);
+      ok = taint_destroy (reader->taint);
       free (reader);
     }
   return ok;
@@ -129,7 +134,7 @@
   if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c)) 
     {
       //assert (case_get_value_cnt (c) == reader->value_cnt);
-      if (reader->case_cnt != CASENUMBER_INVALID)
+      if (reader->case_cnt != CASENUMBER_MAX)
         reader->case_cnt--;
       return true; 
     }
@@ -144,6 +149,8 @@
 bool
 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
 {
+  if (idx >= reader->case_cnt)
+    return NULL;
   if (reader->class->peek == NULL)
     insert_casereader_buffer (reader);
   return reader->class->peek (reader, reader->aux, idx, c);
@@ -152,10 +159,19 @@
 bool
 casereader_error (const struct casereader *reader) 
 {
-  if (reader->class->error != NULL)
-    return reader->class->error (reader, reader->aux);
-  else
-    return false;
+  return taint_is_tainted (reader->taint);
+}
+
+void
+casereader_force_error (struct casereader *reader) 
+{
+  taint_set_taint (reader->taint);
+}
+
+const struct taint *
+casereader_get_taint (const struct casereader *reader) 
+{
+  return reader->taint;
 }
 
 casenumber
@@ -167,19 +183,19 @@
 casenumber
 casereader_count_cases (struct casereader *reader)
 {
-  casenumber case_cnt = casereader_get_case_cnt (reader);
-  if (case_cnt == CASENUMBER_INVALID) 
+  if (reader->case_cnt == CASENUMBER_MAX) 
     {
       struct casereader *clone;
       struct ccase c;
 
-      case_cnt = 0;
+      reader->case_cnt = 0;
+
       clone = casereader_clone (reader);
       for (; casereader_read (clone, &c); case_destroy (&c)) 
-        case_cnt++;
+        reader->case_cnt++;
       casereader_destroy (clone);
     }
-  return case_cnt;
+  return reader->case_cnt;
 }
 
 size_t
@@ -188,6 +204,18 @@
   return reader->value_cnt;
 }
 
+void
+casereader_transfer (struct casereader *reader, struct casewriter *writer)
+{
+  struct ccase c;
+
+  taint_propagate (casereader_get_taint (reader),
+                   casewriter_get_taint (writer));
+  while (casereader_read (reader, &c))
+    casewriter_write (writer, &c);
+  casereader_destroy (reader);
+}
+
 struct casereader_filter 
   {
     struct casereader *subreader;
@@ -208,14 +236,19 @@
                                struct casewriter *exclude) 
 {
   struct casereader_filter *filter = xmalloc (sizeof *filter);
+  struct casereader *reader;
   filter->subreader = casereader_rename (subreader);
   filter->include = include;
   filter->destroy = destroy;
   filter->aux = aux;
   filter->exclude = exclude;
-  return casereader_create (casereader_get_value_cnt (filter->subreader),
-                            CASENUMBER_INVALID,
+  reader = casereader_create (NULL,
+                              casereader_get_value_cnt (filter->subreader),
+                              CASENUMBER_MAX,
                             &casereader_filter_class, filter);
+  taint_propagate (casereader_get_taint (filter->subreader),
+                   casereader_get_taint (reader));
+  return reader;
 }
 
 static bool
@@ -237,30 +270,20 @@
     }
 }
 
-static bool
-casereader_filter_destroy (struct casereader *reader UNUSED, void *filter_) 
+static void
+casereader_filter_destroy (struct casereader *reader, void *filter_) 
 {
   struct casereader_filter *filter = filter_;
-  bool ok = casereader_destroy (filter->subreader);
-  if (filter->destroy != NULL)
-    ok = filter->destroy (filter->aux) && ok;
+  casereader_destroy (filter->subreader);
+  if (filter->destroy != NULL && !filter->destroy (filter->aux))
+    casereader_force_error (reader);
   free (filter);
-  return ok;
-}
-
-static bool
-casereader_filter_error (const struct casereader *reader UNUSED,
-                         void *filter_) 
-{
-  struct casereader_filter *filter = filter_;
-  return casereader_error (filter->subreader);
 }
 
 static struct casereader_class casereader_filter_class = 
   {
     casereader_filter_read,
     casereader_filter_destroy,
-    casereader_filter_error,
 
     /* We could in fact delegate clone to the subreader, if the
        filter function is required to have no memory and if we
@@ -399,9 +422,15 @@
   casenumber case_cnt = casereader_get_case_cnt (reader);
   struct casereader_buffer *b = xmalloc (sizeof *b);
   b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
-  b->subreader = buffered_reader_create (value_cnt, case_cnt,
+  b->subreader = buffered_reader_create (NULL, value_cnt, case_cnt,
                                          &casereader_buffer_class, b);
  casereader_swap (reader, b->subreader);
+  /* Should we do these propagations or should we clone one of
+     them and pass it to buffered_reader_create? */
+  taint_propagate (casewindow_get_taint (b->window),
+                   casereader_get_taint (reader));
+  taint_propagate (casereader_get_taint (b->subreader),
+                   casereader_get_taint (reader));
 }
 
 static bool
@@ -425,22 +454,13 @@
           && casewindow_get_case (b->window, offset, c));
 }
 
-static bool
-casereader_buffer_error (void *b_)
-{
-  struct casereader_buffer *b = b_;
-  return casewindow_error (b->window) || casereader_error (b->subreader);
-}
-
-static bool
+static void
 casereader_buffer_destroy (void *b_) 
 {
   struct casereader_buffer *b = b_;
-  bool ok = !casereader_error (b->subreader);
-  ok = casewindow_destroy (b->window) && ok;
-  ok = casereader_destroy (b->subreader) && ok;
+  casewindow_destroy (b->window);
+  casereader_destroy (b->subreader);
   free (b);
-  return ok;
 }
 
 static void
@@ -453,7 +473,6 @@
 static struct buffered_reader_class casereader_buffer_class = 
   {
     casereader_buffer_read,
-    casereader_buffer_error,
     casereader_buffer_destroy,
     casereader_buffer_advance,
   };
@@ -488,7 +507,7 @@
 static struct casereader_class casereader_translator_class;
 
 struct casereader *
-casereader_create_translator (struct casereader *reader,
+casereader_create_translator (struct casereader *subreader,
                               size_t output_value_cnt,
                               void (*translate) (const struct ccase *input,
                                                  struct ccase *output,
@@ -497,13 +516,17 @@
                               void *aux) 
 {
   struct casereader_translator *ct = xmalloc (sizeof *ct);
-  ct->subreader = casereader_rename (reader);
+  struct casereader *reader;
+  ct->subreader = casereader_rename (subreader);
   ct->translate = translate;
   ct->destroy = destroy;
   ct->aux = aux;
-  return casereader_create (output_value_cnt,
+  reader = casereader_create (NULL, output_value_cnt,
                             casereader_get_case_cnt (ct->subreader),
                             &casereader_translator_class, ct);
+  taint_propagate (casereader_get_taint (ct->subreader),
+                   casereader_get_taint (reader));
+  return reader;
 }
 
 static bool
@@ -522,27 +545,17 @@
     return false;
 }
 
-static bool
+static void
 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_) 
 {
   struct casereader_translator *ct = ct_;
-  bool ok = casereader_destroy (ct->subreader);
+  casereader_destroy (ct->subreader);
   ct->destroy (ct->aux);
   free (ct);
-  return ok;
-}
-
-static bool
-casereader_translator_error (const struct casereader *reader UNUSED, void *ct_)
-{
-  struct casereader_translator *ct = ct_;
-
-  return casereader_error (ct->subreader);
 }
 
 static struct casereader_class casereader_translator_class = 
   {
     casereader_translator_read,
     casereader_translator_destroy,
-    casereader_translator_error,
   };

Index: data/casereader.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casereader.h,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -b -r1.1.2.2 -r1.1.2.3
--- data/casereader.h   14 Apr 2007 05:04:23 -0000      1.1.2.2
+++ data/casereader.h   4 May 2007 03:48:47 -0000       1.1.2.3
@@ -19,6 +19,7 @@
 #ifndef DATA_CASEREADER_H
 #define DATA_CASEREADER_H 1
 
+#include <libpspp/compiler.h>
 #include <data/case.h>
 #include <data/missing-values.h>
 
@@ -34,13 +35,19 @@
                        struct casereader **, struct casereader **);
 struct casereader *casereader_rename (struct casereader *);
 
-bool casereader_peek (struct casereader *, casenumber, struct ccase *);
+bool casereader_peek (struct casereader *, casenumber, struct ccase *)
+     WARN_UNUSED_RESULT;
+
 bool casereader_error (const struct casereader *);
+void casereader_force_error (struct casereader *);
+const struct taint *casereader_get_taint (const struct casereader *);
 
 casenumber casereader_get_case_cnt (struct casereader *);
 casenumber casereader_count_cases (struct casereader *);
 size_t casereader_get_value_cnt (struct casereader *);
 
+void casereader_transfer (struct casereader *, struct casewriter *);
+
 struct casereader *
 casereader_create_filter_func (struct casereader *,
                                bool (*include) (const struct ccase *,

Index: data/casewindow.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casewindow.c,v
retrieving revision 1.1.2.3
retrieving revision 1.1.2.4
diff -u -b -r1.1.2.3 -r1.1.2.4
--- data/casewindow.c   14 Apr 2007 05:04:23 -0000      1.1.2.3
+++ data/casewindow.c   4 May 2007 03:48:47 -0000       1.1.2.4
@@ -26,6 +26,7 @@
 #include <libpspp/assertion.h>
 #include <libpspp/compiler.h>
 #include <libpspp/deque.h>
+#include <libpspp/taint.h>
 
 #include "xalloc.h"
 
@@ -36,7 +37,7 @@
 
     size_t value_cnt;
     casenumber max_in_core_cases;
-    bool ok;
+    struct taint *taint;
   };
 
 struct casewindow_class 
@@ -52,8 +53,9 @@
 static const struct casewindow_class casewindow_memory_class;
 static const struct casewindow_class casewindow_file_class;
 
-struct casewindow *
-casewindow_create (size_t value_cnt, casenumber max_in_core_cases) 
+static struct casewindow *
+do_casewindow_create (struct taint *taint,
+                   size_t value_cnt, casenumber max_in_core_cases) 
 {
   struct casewindow *cw = xmalloc (sizeof *cw);
   cw->class = (max_in_core_cases
@@ -62,10 +64,16 @@
   cw->aux = cw->class->create (value_cnt);
   cw->value_cnt = value_cnt;
   cw->max_in_core_cases = max_in_core_cases;
-  cw->ok = true;
+  cw->taint = taint;
   return cw;
 }
 
+struct casewindow *
+casewindow_create (size_t value_cnt, casenumber max_in_core_cases) 
+{
+  return do_casewindow_create (taint_create (), value_cnt, max_in_core_cases);
+}
+
 bool
 casewindow_destroy (struct casewindow *cw) 
 {
@@ -73,6 +81,7 @@
   if (cw != NULL) 
     {
       ok = cw->class->destroy (cw->aux);
+      ok = taint_destroy (cw->taint) && ok;
       free (cw);
     }
   return ok;
@@ -89,8 +98,9 @@
 static void
 casewindow_to_disk (struct casewindow *old) 
 {
-  struct casewindow *new = casewindow_create (old->value_cnt, 0);
-  while (casewindow_get_case_cnt (old) > 0 && !casewindow_error (old))
+  struct casewindow *new;
+  new = do_casewindow_create (taint_clone (old->taint), old->value_cnt, 0);
+  while (casewindow_get_case_cnt (old) > 0 && !casewindow_error (new))
     {
       struct ccase c;
       if (!casewindow_get_case (old, 0, &c))
@@ -98,8 +108,6 @@
       casewindow_pop_tail (old, 1);
       casewindow_push_head (new, &c);
     }
-  if (casewindow_error (old))
-    new->ok = false;
   casewindow_swap (old, new);
   casewindow_destroy (new);
 }
@@ -107,7 +115,7 @@
 void
 casewindow_push_head (struct casewindow *cw, struct ccase *c) 
 {
-  if (cw->ok)
+  if (!casewindow_error (cw))
     {
       if (cw->class->push_head (cw->aux, c)) 
         {
@@ -117,7 +125,7 @@
             casewindow_to_disk (cw); 
         }
       else
-        cw->ok = false; 
+        casewindow_force_error (cw);
     }
   else
     case_destroy (c);
@@ -126,8 +134,9 @@
 void
 casewindow_pop_tail (struct casewindow *cw, casenumber case_cnt) 
 {
-  if (cw->ok)
-    cw->ok = cw->class->pop_tail (cw->aux, case_cnt);
+  if (!casewindow_error (cw))
+    if (!cw->class->pop_tail (cw->aux, case_cnt))
+      casewindow_force_error (cw);
 }
 
 bool
@@ -137,11 +146,11 @@
   struct casewindow *cw = (struct casewindow *) cw_;
 
   assert (case_idx >= 0 && case_idx < casewindow_get_case_cnt (cw));
-  if (cw->ok && cw->class->get_case (cw->aux, case_idx, c))
+  if (!casewindow_error (cw) && cw->class->get_case (cw->aux, case_idx, c))
     return true;
   else 
     {
-      cw->ok = false;
+      casewindow_force_error (cw);
       case_nullify (c);
       return false;
     }
@@ -162,7 +171,19 @@
 bool
 casewindow_error (const struct casewindow *cw) 
 {
-  return !cw->ok;
+  return taint_is_tainted (cw->taint);
+}
+
+void
+casewindow_force_error (struct casewindow *cw) 
+{
+  taint_set_taint (cw->taint);
+}
+
+const struct taint *
+casewindow_get_taint (const struct casewindow *cw) 
+{
+  return cw->taint;
 }
 
 struct casewindow_memory 

Index: data/casewindow.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casewindow.h,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/casewindow.h   19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/casewindow.h   4 May 2007 03:48:47 -0000       1.1.2.2
@@ -32,6 +32,9 @@
                           struct ccase *);
 size_t casewindow_get_value_cnt (const struct casewindow *);
 casenumber casewindow_get_case_cnt (const struct casewindow *);
+
 bool casewindow_error (const struct casewindow *);
+void casewindow_force_error (struct casewindow *);
+const struct taint *casewindow_get_taint (const struct casewindow *);
 
 #endif /* data/casewindow.h */

Index: data/casewriter-private.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casewriter-private.h,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- data/casewriter-private.h   19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ data/casewriter-private.h   4 May 2007 03:48:47 -0000       1.1.2.2
@@ -26,10 +26,9 @@
     /* Mandatory. */
     /* Ownership of the case is transferred to the callee. */
     void (*write) (struct casewriter *, void *aux, struct ccase *);
-    bool (*destroy) (struct casewriter *, void *aux);
+    void (*destroy) (struct casewriter *, void *aux);
 
     /* Optional. */
-    bool (*error) (const struct casewriter *, void *aux);
     struct casereader *(*convert_to_reader) (struct casewriter *, void *aux);
   };
 

Index: data/casewriter.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casewriter.c,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -b -r1.1.2.2 -r1.1.2.3
--- data/casewriter.c   14 Apr 2007 05:04:23 -0000      1.1.2.2
+++ data/casewriter.c   4 May 2007 03:48:47 -0000       1.1.2.3
@@ -30,11 +30,13 @@
 #include <data/casewindow.h>
 #include <data/settings.h>
 #include <libpspp/compiler.h>
+#include <libpspp/taint.h>
 
 #include "xalloc.h"
 
 struct casewriter
   {
+    struct taint *taint;
     casenumber case_cnt;
     const struct casewriter_class *class;
     void *private;
@@ -44,6 +46,7 @@
 casewriter_create (const struct casewriter_class *class, void *private) 
 {
   struct casewriter *writer = xmalloc (sizeof *writer);
+  writer->taint = taint_create ();
   writer->case_cnt = 0;
   writer->class = class;
   writer->private = private;
@@ -68,10 +71,19 @@
 bool
 casewriter_error (const struct casewriter *writer) 
 {
-  if (writer->class->error != NULL)
-    return writer->class->error (writer, writer->private);
-  else
-    return false;
+  return taint_is_tainted (writer->taint);
+}
+
+void
+casewriter_force_error (struct casewriter *writer) 
+{
+  taint_set_taint (writer->taint);
+}
+
+const struct taint *
+casewriter_get_taint (const struct casewriter *writer) 
+{
+  return writer->taint;
 }
 
 bool
@@ -80,7 +92,8 @@
   bool ok = true;
   if (writer != NULL)
     {
-      ok = writer->class->destroy (writer, writer->private);
+      writer->class->destroy (writer, writer->private);
+      ok = taint_destroy (writer->taint);
       free (writer); 
     }
   return ok;
@@ -91,6 +104,8 @@
 {
   struct casereader *reader;
   reader = writer->class->convert_to_reader (writer, writer->private);
+  taint_propagate (writer->taint, casereader_get_taint (reader));
+  taint_destroy (writer->taint);
   free (writer);
   return reader;
 }
@@ -101,8 +116,12 @@
 static struct casewriter *
 create_casewriter_window (size_t value_cnt, casenumber max_in_core_cases) 
 {
-  return casewriter_create (&casewriter_window_class,
-                            casewindow_create (value_cnt, max_in_core_cases));
+  struct casewindow *window = casewindow_create (value_cnt, max_in_core_cases);
+  struct casewriter *writer = casewriter_create (&casewriter_window_class,
+                                                 window);
+  taint_propagate (casewindow_get_taint (window),
+                   casewriter_get_taint (writer));
+  return writer;
 }
 
 struct casewriter *
@@ -131,19 +150,11 @@
   casewindow_push_head (window, c);
 }
 
-static bool
+static void
 casewriter_window_destroy (struct casewriter *writer UNUSED, void *window_)
 {
   struct casewindow *window = window_;
-  return casewindow_destroy (window);
-}
-
-static bool
-casewriter_window_error (const struct casewriter *writer UNUSED,
-                         void *window_)
-{
-  struct casewindow *window = window_;
-  return casewindow_error (window);
+  casewindow_destroy (window);
 }
 
 static struct casereader *
@@ -151,7 +162,8 @@
                                      void *window_) 
 {
   struct casewindow *window = window_;
-  return buffered_reader_create (casewindow_get_value_cnt (window),
+  return buffered_reader_create (casewindow_get_taint (window),
+                                 casewindow_get_value_cnt (window),
                                  casewindow_get_case_cnt (window),
                                  &casereader_window_class, window);
 }
@@ -166,18 +178,11 @@
     return casewindow_get_case (window, case_idx, c);
 }
 
-static bool
-casereader_window_error (void *window_)
-{
-  struct casewindow *window = window_;
-  return casewindow_error (window);
-}
-
-static bool
+static void
 casereader_window_destroy (void *window_)
 {
   struct casewindow *window = window_;
-  return casewindow_destroy (window);
+  casewindow_destroy (window);
 }
 
 static void
@@ -191,14 +196,12 @@
   {
     casewriter_window_write,
     casewriter_window_destroy,
-    casewriter_window_error,
     casewriter_window_convert_to_reader,
   };
 
 static const struct buffered_reader_class casereader_window_class = 
   {
     casereader_window_read,
-    casereader_window_error,
     casereader_window_destroy,
     casereader_window_advance,
   };
@@ -216,7 +219,7 @@
 static struct casewriter_class casewriter_translator_class;
 
 struct casewriter *
-casewriter_create_translator (struct casewriter *writer,
+casewriter_create_translator (struct casewriter *subwriter,
                               void (*translate) (const struct ccase *input,
                                                  struct ccase *output,
                                                  void *aux),
@@ -224,11 +227,15 @@
                               void *aux) 
 {
   struct casewriter_translator *ct = xmalloc (sizeof *ct);
-  ct->subwriter = casewriter_rename (writer);
+  struct casewriter *writer;
+  ct->subwriter = casewriter_rename (subwriter);
   ct->translate = translate;
   ct->destroy = destroy;
   ct->aux = aux;
-  return casewriter_create (&casewriter_translator_class, ct);
+  writer = casewriter_create (&casewriter_translator_class, ct);
+  taint_propagate (casewriter_get_taint (ct->subwriter),
+                   casewriter_get_taint (writer));
+  return writer;
 }
 
 static void
@@ -242,27 +249,17 @@
   casewriter_write (ct->subwriter, &tmp);
 }
 
-static bool
+static void
 casewriter_translator_destroy (struct casewriter *writer UNUSED, void *ct_) 
 {
   struct casewriter_translator *ct = ct_;
-  bool ok = casewriter_destroy (ct->subwriter);
+  casewriter_destroy (ct->subwriter);
   ct->destroy (ct->aux);
   free (ct);
-  return ok;
-}
-
-static bool
-casewriter_translator_error (const struct casewriter *writer UNUSED, void *ct_)
-{
-  struct casewriter_translator *ct = ct_;
-
-  return casewriter_error (ct->subwriter);
 }
 
 static struct casewriter_class casewriter_translator_class = 
   {
     casewriter_translator_write,
     casewriter_translator_destroy,
-    casewriter_translator_error,
   };

Index: data/casewriter.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/casewriter.h,v
retrieving revision 1.1.2.2
retrieving revision 1.1.2.3
diff -u -b -r1.1.2.2 -r1.1.2.3
--- data/casewriter.h   14 Apr 2007 05:04:23 -0000      1.1.2.2
+++ data/casewriter.h   4 May 2007 03:48:47 -0000       1.1.2.3
@@ -22,6 +22,7 @@
 #include <stdbool.h>
 #include <stddef.h>
 #include <data/transformations.h>
+#include <libpspp/compiler.h>
 
 struct casewriter;
 
@@ -35,6 +36,9 @@
 bool casewriter_destroy (struct casewriter *);
 
 bool casewriter_error (const struct casewriter *);
+void casewriter_force_error (struct casewriter *);
+const struct taint *casewriter_get_taint (const struct casewriter *);
+
 struct casereader *casewriter_make_reader (struct casewriter *);
 
 struct casewriter *

Index: data/datasheet.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/datasheet.c,v
retrieving revision 1.1.2.11
retrieving revision 1.1.2.12
diff -u -b -r1.1.2.11 -r1.1.2.12
--- data/datasheet.c    22 Apr 2007 20:46:48 -0000      1.1.2.11
+++ data/datasheet.c    4 May 2007 03:48:47 -0000       1.1.2.12
@@ -32,6 +32,7 @@
 #include <libpspp/model-checker.h>
 #include <libpspp/range-map.h>
 #include <libpspp/range-set.h>
+#include <libpspp/taint.h>
 #include <libpspp/tower.h>
 
 #include "minmax.h"
@@ -121,8 +122,8 @@
        likelihood of running out. */
     unsigned column_min_alloc;
 
-    /* True if an I/O error has occurred. */
-    bool error;
+    /* Indicates corrupted data in the datasheet. */
+    struct taint *taint;
   };
 
 /* Maps from a range of physical columns to a source. */
@@ -159,7 +160,7 @@
   ds->rows = axis_create ();
   range_map_init (&ds->sources);
   ds->column_min_alloc = 1;
-  ds->error = false;
+  ds->taint = taint_create ();
 
   /* Add backing. */
   if (reader != NULL) 
@@ -205,6 +206,7 @@
       struct source_info *si = source_info_from_range_map (r);
       free_source_info (ds, si);
     }
+  taint_destroy (ds->taint);
   free (ds);
 }
 
@@ -228,7 +230,19 @@
 bool
 datasheet_error (const struct datasheet *ds) 
 {
-  return ds->error;
+  return taint_is_tainted (ds->taint);
+}
+
+void
+datasheet_force_error (struct datasheet *ds) 
+{
+  taint_set_taint (ds->taint);
+}
+
+const struct taint *
+datasheet_get_taint (const struct datasheet *ds) 
+{
+  return ds->taint;
 }
 
 /* Returns the number of rows in DS. */
@@ -312,7 +326,7 @@
             {
               datasheet_delete_columns (ds, before - added,
                                         source_cnt + added);
-              ds->error = true;
+              taint_set_taint (ds->taint);
               return false;
             }
           source_increase_use (s->source, source_cnt);
@@ -509,7 +523,8 @@
 datasheet_make_reader (struct datasheet *ds) 
 {
   ds = datasheet_rename (ds);
-  return buffered_reader_create (datasheet_get_column_cnt (ds),
+  return buffered_reader_create (datasheet_get_taint (ds),
+                                 datasheet_get_column_cnt (ds),
                                  datasheet_get_row_cnt (ds),
                                  &datasheet_reader_class, ds);
 }
@@ -520,23 +535,20 @@
   struct datasheet *ds = ds_;
   if (case_idx >= datasheet_get_row_cnt (ds))
     return false;
+  else if (datasheet_get_row (ds, case_idx, c))
+    return true;
   else
-    return datasheet_get_row (ds, case_idx, c);
-}
-
-static bool
-datasheet_reader_error (void *ds_)
-{
-  const struct datasheet *ds = ds_;
-  return datasheet_error (ds);
+    {
+      taint_set_taint (ds->taint);
+      return false;
+    }
 }
 
-static bool
+static void
 datasheet_reader_destroy (void *ds_)
 {
   struct datasheet *ds = ds_;
   datasheet_destroy (ds);
-  return true;
 }
 
 static void
@@ -549,7 +561,6 @@
 static const struct buffered_reader_class datasheet_reader_class = 
   {
     datasheet_reader_read,
-    datasheet_reader_error,
     datasheet_reader_destroy,
     datasheet_reader_advance,
   };
@@ -596,7 +607,7 @@
             ? source_read (s->source, prow, pcol_ofs, data, 1)
             : source_write (s->source, prow, pcol_ofs, data, 1))) 
         {
-          ds->error = true;
+          taint_set_taint (ds->taint);
           return false; 
         }
     }
@@ -1319,6 +1330,7 @@
                         range_map_node_get_width (r), &si->column_range);
     }
   ds->column_min_alloc = ods->column_min_alloc;
+  ds->taint = taint_create ();
 
   /* Clone ODATA into DATA. */
   memcpy (data, odata, MAX_ROWS * MAX_COLS * sizeof **data);

Index: data/datasheet.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/Attic/datasheet.h,v
retrieving revision 1.1.2.4
retrieving revision 1.1.2.5
diff -u -b -r1.1.2.4 -r1.1.2.5
--- data/datasheet.h    18 Apr 2007 23:16:18 -0000      1.1.2.4
+++ data/datasheet.h    4 May 2007 03:48:47 -0000       1.1.2.5
@@ -34,6 +34,8 @@
 struct datasheet *datasheet_rename (struct datasheet *);
 
 bool datasheet_error (const struct datasheet *);
+void datasheet_force_error (struct datasheet *);
+const struct taint *datasheet_get_taint (const struct datasheet *);
 
 struct casereader *datasheet_make_reader (struct datasheet *);
 

Index: data/por-file-reader.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/por-file-reader.c,v
retrieving revision 1.17.2.1
retrieving revision 1.17.2.2
diff -u -b -r1.17.2.1 -r1.17.2.2
--- data/por-file-reader.c      19 Mar 2007 21:36:24 -0000      1.17.2.1
+++ data/por-file-reader.c      4 May 2007 03:48:47 -0000       1.17.2.2
@@ -115,13 +115,11 @@
 }
 
 /* Closes portable file reader R, after we're done with it. */
-static bool
+static void
 por_file_casereader_destroy (struct casereader *reader UNUSED, void *r_)
 {
   struct pfm_reader *r = r_;
-  bool ok = r->ok;
   pool_destroy (r->pool);
-  return ok;
 }
 
 /* Read a single character into cur_char.  */
@@ -212,7 +210,7 @@
     error (r, _("Data record expected."));
 
   r->value_cnt = dict_get_next_value_idx (*dict);
-  return casereader_create (r->value_cnt, CASENUMBER_INVALID,
+  return casereader_create (NULL, r->value_cnt, CASENUMBER_MAX,
                             &por_file_casereader_class, r);
 
  error:
@@ -697,6 +695,7 @@
   setjmp (r->bail_out);
   if (!r->ok) 
     {
+      casereader_force_error (reader);
       case_destroy (c);
       return false; 
     }
@@ -730,15 +729,6 @@
   return true;
 }
 
-/* Returns true if an I/O error has occurred on READER, false
-   otherwise. */
-static bool
-por_file_casereader_error (const struct casereader *reader UNUSED, void *r_)
-{
-  const struct pfm_reader *r = r_;
-  return !r->ok;
-}
-
 /* Returns true if FILE is an SPSS portable file,
    false otherwise. */
 bool
@@ -778,5 +768,4 @@
   {
     por_file_casereader_read,
     por_file_casereader_destroy,
-    por_file_casereader_error,
   };

Index: data/por-file-writer.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/por-file-writer.c,v
retrieving revision 1.9.2.2
retrieving revision 1.9.2.3
diff -u -b -r1.9.2.2 -r1.9.2.3
--- data/por-file-writer.c      25 Apr 2007 03:49:41 -0000      1.9.2.2
+++ data/por-file-writer.c      4 May 2007 03:48:47 -0000       1.9.2.3
@@ -403,7 +403,7 @@
 
 /* Writes case C to the portable file represented by H. */
 static void 
-por_file_casewriter_write (struct casewriter *writer UNUSED, void *w_,
+por_file_casewriter_write (struct casewriter *writer, void *w_,
                            struct ccase *c)
 {
   struct pfm_writer *w = w_;
@@ -424,22 +424,18 @@
             }
         } 
     }
+  else
+    casewriter_force_error (writer);
   
   case_destroy (c);
 }
 
-static bool
-por_file_casewriter_destroy (struct casewriter *writer UNUSED, void *w_) 
+static void
+por_file_casewriter_destroy (struct casewriter *writer, void *w_) 
 {
   struct pfm_writer *w = w_;
-  return close_writer (w);
-}
-
-static bool
-por_file_casewriter_error (const struct casewriter *writer UNUSED, void *w_)
-{
-  const struct pfm_writer *w = w_;
-  return ferror (w->file);
+  if (!close_writer (w))
+    casewriter_force_error (writer);
 }
 
 /* Closes a portable file after we're done with it.
@@ -866,5 +862,4 @@
   {
     por_file_casewriter_write,
     por_file_casewriter_destroy,
-    por_file_casewriter_error,
   };

Index: data/procedure.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/procedure.c,v
retrieving revision 1.25.2.2
retrieving revision 1.25.2.3
diff -u -b -r1.25.2.2 -r1.25.2.3
--- data/procedure.c    28 Mar 2007 17:26:48 -0000      1.25.2.2
+++ data/procedure.c    4 May 2007 03:48:47 -0000       1.25.2.3
@@ -99,8 +99,6 @@
 static void add_filter_trns (struct dataset *ds);
 
 static void update_last_proc_invocation (struct dataset *ds);
-static void open_active_file (struct dataset *ds);
-static bool close_active_file (struct dataset *ds);
 
 /* Public functions. */
 
@@ -115,7 +113,10 @@
 
 /* Regular procedure. */
 
-/* Executes any pending transformations. */
+/* Executes any pending transformations, if necessary.
+   This is not identical to the EXECUTE command in that it won't
+   always read the source data.  This can be important when the
+   source data is given inline within BEGIN DATA...END FILE. */
 bool
 proc_execute (struct dataset *ds)
 {
@@ -139,7 +140,7 @@
 static struct casereader_class proc_casereader_class;
 
 /* Opens dataset DS for reading cases with proc_read.
-   proc_close must be called when done. */
+   proc_commit must be called when done. */
 struct casereader *
 proc_open (struct dataset *ds)
 {
@@ -148,14 +149,45 @@
 
   update_last_proc_invocation (ds);
 
-  open_active_file (ds);
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
+
+  /* Finish up the collection of transformations. */
+  add_case_limit_trns (ds);
+  add_filter_trns (ds);
+  trns_chain_finalize (ds->cur_trns_chain);
+
+  /* Make permanent_dict refer to the dictionary right before
+     data reaches the sink. */
+  if (ds->permanent_dict == NULL)
+    ds->permanent_dict = ds->dict;
+
+  /* Prepare sink. */
+  if (!ds->discard_output) 
+    {
+      ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
+                       ? dict_make_compactor (ds->permanent_dict)
+                       : NULL);
+      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
+                                             ds->permanent_dict)); 
+    }
+  else 
+    {
+      ds->compactor = NULL;
+      ds->sink = NULL;
+    }
+
+  /* Allocate memory for lagged cases. */
+  ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
 
   ds->proc_state = PROC_OPEN;
   ds->cases_written = 0;
   ds->ok = true;
-  return casereader_create (dict_get_next_value_idx (ds->dict),
-                            CASENUMBER_INVALID,
-                            &proc_casereader_class, ds);
+
+  /* FIXME: use taint in dataset in place of `ok'? */
+  /* FIXME: for trivial cases we can just return a clone of
+     ds->source? */
+  return casereader_create (NULL, dict_get_next_value_idx (ds->dict),
+                            CASENUMBER_MAX, &proc_casereader_class, ds);
 }
 
 bool
@@ -250,7 +282,7 @@
    while reading or closing the data set.
    If DS has not been opened, returns true without doing
    anything else. */
-static bool
+static void
 proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
   struct dataset *ds = ds_;
@@ -266,73 +298,19 @@
   ds->ok = casereader_destroy (ds->source) && ds->ok;
   ds->source = NULL;
   proc_set_active_file_data (ds, NULL);
-
-  return ds->ok;
 }
 
+/* Must return false if the source casereader, a transformation,
+   or the sink casewriter signaled an error.  (If a temporary
+   transformation signals an error, then the return value is
+   false, but the replacement active file may still be
+   untainted.) */
 bool
 proc_commit (struct dataset *ds) 
 {
   assert (ds->proc_state == PROC_CLOSED);
   ds->proc_state = PROC_COMMITTED;
-  return close_active_file (ds) && ds->ok;
-}
-
-static struct casereader_class proc_casereader_class = 
-  {
-    proc_casereader_read,
-    proc_casereader_destroy,
-  };
 
-/* Updates last_proc_invocation. */
-static void
-update_last_proc_invocation (struct dataset *ds)
-{
-  ds->last_proc_invocation = time (NULL);
-}
-
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
-static void
-open_active_file (struct dataset *ds)
-{
-  caseinit_mark_for_init (ds->caseinit, ds->dict);
-
-  add_case_limit_trns (ds);
-  add_filter_trns (ds);
-
-  /* Finalize transformations. */
-  trns_chain_finalize (ds->cur_trns_chain);
-
-  /* Make permanent_dict refer to the dictionary right before
-     data reaches the sink. */
-  if (ds->permanent_dict == NULL)
-    ds->permanent_dict = ds->dict;
-
-  /* Prepare sink. */
-  if (!ds->discard_output) 
-    {
-      ds->compactor = 
-        (dict_compacting_would_shrink (ds->permanent_dict)
-         ? dict_make_compactor (ds->permanent_dict)
-         : NULL);
-      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
-                                             ds->permanent_dict)); 
-    }
-  else 
-    {
-      ds->compactor = NULL;
-      ds->sink = NULL;
-    }
-
-  /* Allocate memory for lagged cases. */
-  ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
-}
-
-/* Closes the active file. */
-static bool
-close_active_file (struct dataset *ds)
-{
   /* Free memory for lagged cases. */
   while (!deque_is_empty (&ds->lag))
     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
@@ -367,7 +345,20 @@
 
   dict_clear_vectors (ds->dict);
   ds->permanent_dict = NULL;
-  return proc_cancel_all_transformations (ds);
+  return proc_cancel_all_transformations (ds) && ds->ok;
+}
+
+static struct casereader_class proc_casereader_class = 
+  {
+    proc_casereader_read,
+    proc_casereader_destroy,
+  };
+
+/* Updates last_proc_invocation. */
+static void
+update_last_proc_invocation (struct dataset *ds)
+{
+  ds->last_proc_invocation = time (NULL);
 }
 
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
@@ -584,7 +575,7 @@
 
 /* Replaces the active file's data by READER without replacing
    the associated dictionary. */
-void
+bool
 proc_set_active_file_data (struct dataset *ds, struct casereader *reader) 
 {
   casereader_destroy (ds->source);
@@ -592,6 +583,8 @@
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+  return reader == NULL || !casereader_error (reader);
 }
 
 /* Returns true if an active file data source is available, false
@@ -602,6 +595,29 @@
   return ds->source != NULL;
 }
 
+/* Checks whether DS has a corrupted active file.  If so,
+   discards it and returns false.  If not, returns true without
+   doing anything. */
+bool
+dataset_end_of_command (struct dataset *ds) 
+{
+  if (ds->source != NULL) 
+    {
+      if (casereader_error (ds->source)) 
+        {
+          proc_discard_active_file (ds);
+          return false;
+        }
+      else 
+        {
+          struct taint *taint = casereader_get_taint (ds->source);
+          taint_reset_successor_taint (taint);
+          assert (!taint_has_tainted_successor (taint));
+        }
+    }
+  return true; 
+}
+
 static trns_proc_func case_limit_trns_proc;
 static trns_free_func case_limit_trns_free;
 

Index: data/procedure.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/procedure.h,v
retrieving revision 1.12.2.1
retrieving revision 1.12.2.2
diff -u -b -r1.12.2.1 -r1.12.2.2
--- data/procedure.h    19 Mar 2007 21:36:24 -0000      1.12.2.1
+++ data/procedure.h    4 May 2007 03:48:47 -0000       1.12.2.2
@@ -62,7 +62,7 @@
 void proc_discard_active_file (struct dataset *);
 void proc_set_active_file (struct dataset *,
                            struct casereader *, struct dictionary *);
-void proc_set_active_file_data (struct dataset *, struct casereader *);
+bool proc_set_active_file_data (struct dataset *, struct casereader *);
 bool proc_has_active_file (const struct dataset *ds);
 
 void proc_discard_output (struct dataset *ds);
@@ -72,7 +72,9 @@
 
 struct casereader *proc_open (struct dataset *);
 bool proc_is_open (const struct dataset *);
-bool proc_commit (struct dataset *);
+bool proc_commit (struct dataset *) WARN_UNUSED_RESULT;
+
+bool dataset_end_of_command (struct dataset *);
 
 struct dictionary *dataset_dict (const struct dataset *ds);
 

Index: data/scratch-writer.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/scratch-writer.c,v
retrieving revision 1.4.2.1
retrieving revision 1.4.2.2
diff -u -b -r1.4.2.1 -r1.4.2.2
--- data/scratch-writer.c       19 Mar 2007 21:36:24 -0000      1.4.2.1
+++ data/scratch-writer.c       4 May 2007 03:48:47 -0000       1.4.2.2
@@ -30,6 +30,7 @@
 #include <data/file-handle-def.h>
 #include <data/scratch-handle.h>
 #include <libpspp/compiler.h>
+#include <libpspp/taint.h>
 
 #include "xalloc.h"
 
@@ -39,7 +40,7 @@
     struct scratch_handle *handle;      /* Underlying scratch handle. */
     struct file_handle *fh;             /* Underlying file handle. */
     struct dict_compactor *compactor;   /* Compacts into handle->dictionary. */
-    struct casewriter *writer;          /* Data output. */
+    struct casewriter *subwriter;       /* Data output. */
   };
 
 static struct casewriter_class scratch_writer_casewriter_class;
@@ -56,6 +57,7 @@
   struct scratch_writer *writer;
   struct dictionary *scratch_dict;
   struct dict_compactor *compactor;
+  struct casewriter *casewriter;
 
   if (!fh_open (fh, FH_REF_SCRATCH, "scratch file", "we"))
     return NULL;
@@ -85,11 +87,14 @@
   writer->handle = sh;
   writer->fh = fh;
   writer->compactor = compactor;
-  writer->writer = autopaging_writer_create (dict_get_next_value_idx (
+  writer->subwriter = autopaging_writer_create (dict_get_next_value_idx (
                                                scratch_dict));
 
   fh_set_scratch_handle (fh, sh);
-  return casewriter_create (&scratch_writer_casewriter_class, writer);
+  casewriter = casewriter_create (&scratch_writer_casewriter_class, writer);
+  taint_propagate (casewriter_get_taint (writer->subwriter),
+                   casewriter_get_taint (casewriter));
+  return casewriter;
 }
 
 /* Writes case C to WRITER. */
@@ -108,36 +113,23 @@
     }
   else
     case_move (&tmp, c);
-  casewriter_write (writer->writer, &tmp);
+  casewriter_write (writer->subwriter, &tmp);
 }
 
-/* Returns true if an I/O error occurred on WRITER, false otherwise. */
-static bool
-scratch_writer_casewriter_error (const struct casewriter *w UNUSED,
-                                 void *writer_) 
-{
-  const struct scratch_writer *writer = writer_;
-  return casewriter_error (writer->writer);
-}
-
-/* Closes WRITER.
-   Returns true if successful, false if an I/O error occurred on WRITER. */
-static bool
+/* Closes WRITER. */
+static void
 scratch_writer_casewriter_destroy (struct casewriter *w UNUSED, void *writer_) 
 {
   struct scratch_writer *writer = writer_;
-  struct casereader *reader = casewriter_make_reader (writer->writer);
-  bool ok = !casereader_error (reader);
-  if (ok)
+  struct casereader *reader = casewriter_make_reader (writer->subwriter);
+  if (!casereader_error (reader))
     writer->handle->casereader = reader;
   fh_close (writer->fh, "scratch file", "we");
   free (writer);
-  return ok;
 }
 
 static struct casewriter_class scratch_writer_casewriter_class = 
   {
     scratch_writer_casewriter_write,
     scratch_writer_casewriter_destroy,
-    scratch_writer_casewriter_error,
   };

Index: data/sys-file-reader.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/sys-file-reader.c,v
retrieving revision 1.32.2.1
retrieving revision 1.32.2.2
diff -u -b -r1.32.2.1 -r1.32.2.2
--- data/sys-file-reader.c      19 Mar 2007 21:36:24 -0000      1.32.2.1
+++ data/sys-file-reader.c      4 May 2007 03:48:47 -0000       1.32.2.2
@@ -283,7 +283,7 @@
 
   pool_free (r->pool, var_by_value_idx);
   r->value_cnt = dict_get_next_value_idx (*dict);
-  return casereader_create (r->value_cnt, CASENUMBER_INVALID,
+  return casereader_create (NULL, r->value_cnt, CASENUMBER_MAX,
                             &sys_file_casereader_class, r);
 }
 
@@ -318,22 +318,12 @@
   return !error;
 }
 
-/* Returns true if an I/O error has occurred on READER, false
-   otherwise. */
-static bool
-sys_file_casereader_error (const struct casereader *reader UNUSED, void *r_) 
-{
-  struct sfm_reader *r = r_;
-  return r->error;
-}
-
-/* Destroys READER.  Returns true if successful, false if an I/O
-   error occurred, whether during destruction or earlier. */
-static bool
+/* Destroys READER. */
+static void
 sys_file_casereader_destroy (struct casereader *reader UNUSED, void *r_) 
 {
   struct sfm_reader *r = r_;
-  return close_reader (r);
+  close_reader (r);
 }
 
 /* Returns true if FILE is an SPSS system file,
@@ -1120,6 +1110,7 @@
   case_create (c, r->value_cnt);
   if (setjmp (r->bail_out)) 
     {
+      casereader_force_error (reader);
       case_destroy (c);
       return false; 
     }
@@ -1698,5 +1689,4 @@
   {
     sys_file_casereader_read,
     sys_file_casereader_destroy,
-    sys_file_casereader_error,
   };

Index: data/sys-file-writer.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/data/sys-file-writer.c,v
retrieving revision 1.22.2.1
retrieving revision 1.22.2.2
diff -u -b -r1.22.2.1 -r1.22.2.2
--- data/sys-file-writer.c      19 Mar 2007 21:36:24 -0000      1.22.2.1
+++ data/sys-file-writer.c      4 May 2007 03:48:47 -0000       1.22.2.2
@@ -935,12 +935,13 @@
 
 /* Writes case C to system file W. */
 static void
-sys_file_casewriter_write (struct casewriter *writer UNUSED, void *w_,
+sys_file_casewriter_write (struct casewriter *writer, void *w_,
                            struct ccase *c)
 {
   struct sfm_writer *w = w_;
   if (ferror (w->file)) 
     {
+      casewriter_force_error (writer);
       case_destroy (c);
       return; 
     }
@@ -1006,18 +1007,12 @@
   case_destroy (c);
 }
 
-static bool
-sys_file_casewriter_destroy (struct casewriter *writer UNUSED, void *w_) 
+static void
+sys_file_casewriter_destroy (struct casewriter *writer, void *w_) 
 {
   struct sfm_writer *w = w_;
-  return close_writer (w);
-}
-
-static bool
-sys_file_casewriter_error (const struct casewriter *writer UNUSED, void *w_) 
-{
-  const struct sfm_writer *w = w_;
-  return write_error (w);
+  if (!close_writer (w))
+    casewriter_force_error (writer);
 }
 
 static void
@@ -1143,5 +1138,4 @@
   {
     sys_file_casewriter_write,
     sys_file_casewriter_destroy,
-    sys_file_casewriter_error,
   };

Index: language/command.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/command.c,v
retrieving revision 1.23.2.1
retrieving revision 1.23.2.2
diff -u -b -r1.23.2.1 -r1.23.2.2
--- language/command.c  19 Mar 2007 21:36:24 -0000      1.23.2.1
+++ language/command.c  4 May 2007 03:48:47 -0000       1.23.2.2
@@ -151,6 +151,8 @@
   assert (!proc_is_open (ds));
   unset_cmd_algorithm ();
   dict_clear_aux (dataset_dict (ds));
+  if (!dataset_end_of_command (ds))
+    result = CMD_CASCADING_FAILURE;
 
   return result;
 }

Index: language/data-io/data-list.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/data-io/data-list.c,v
retrieving revision 1.30.2.1
retrieving revision 1.30.2.2
diff -u -b -r1.30.2.1 -r1.30.2.2
--- language/data-io/data-list.c        19 Mar 2007 21:36:24 -0000      1.30.2.1
+++ language/data-io/data-list.c        4 May 2007 03:48:47 -0000       1.30.2.2
@@ -279,7 +279,8 @@
     add_transformation (ds, data_list_trns_proc, data_list_trns_free, dls);
   else 
     proc_set_active_file (ds,
-                          casereader_create (dict_get_next_value_idx (dict),
+                          casereader_create (NULL,
+                                             dict_get_next_value_idx (dict),
                                              -1, &data_list_casereader_class,
                                              dls),
                           dict);
@@ -839,16 +840,14 @@
   return ok;
 }
 
-/* Destroys the casereader.
-   Returns true if successful read, false if an I/O occurred
-   during destruction or previously. */
-static bool
+/* Destroys the casereader. */
+static void
 data_list_casereader_destroy (struct casereader *reader UNUSED, void *dls_)
 {
   struct data_list_pgm *dls = dls_;
-  bool ok = !dfm_reader_error (dls->reader);
+  if (dfm_reader_error (dls->reader))
+    casereader_force_error (reader);
   data_list_trns_free (dls);
-  return ok;
 }
 
 static const struct casereader_class data_list_casereader_class =

Index: language/data-io/data-reader.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/data-io/data-reader.c,v
retrieving revision 1.20.2.1
retrieving revision 1.20.2.2
diff -u -b -r1.20.2.1 -r1.20.2.2
--- language/data-io/data-reader.c      19 Mar 2007 21:36:24 -0000      1.20.2.1
+++ language/data-io/data-reader.c      4 May 2007 03:48:47 -0000       1.20.2.2
@@ -445,8 +445,8 @@
 
   /* Input procedure reads from inline file. */
   prompt_set_style (PROMPT_DATA);
-  ok = casereader_destroy (proc_open (ds));
-  ok = proc_commit (ds) && ok;
+  casereader_destroy (proc_open (ds));
+  ok = proc_commit (ds);
   dfm_close_reader (r);
 
   return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;

Index: language/data-io/get.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/data-io/get.c,v
retrieving revision 1.29.2.2
retrieving revision 1.29.2.3
diff -u -b -r1.29.2.2 -r1.29.2.3
--- language/data-io/get.c      22 Apr 2007 20:40:51 -0000      1.29.2.2
+++ language/data-io/get.c      4 May 2007 03:48:47 -0000       1.29.2.3
@@ -43,9 +43,9 @@
 #include <libpspp/compiler.h>
 #include <libpspp/hash.h>
 #include <libpspp/message.h>
-#include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
@@ -382,12 +382,11 @@
 {
   bool retain_unselected;
   struct variable *saved_filter_variable;
-  struct casereader *input;
   struct casewriter *output;
-  struct ccase c;
-  bool ok = true;
+  bool ok;
 
-  output = parse_write_command (lexer, ds, writer_type, PROC_CMD, 
&retain_unselected);
+  output = parse_write_command (lexer, ds, writer_type, PROC_CMD,
+                                &retain_unselected);
   if (output == NULL) 
     return CMD_CASCADING_FAILURE;
 
@@ -395,11 +394,8 @@
   if (retain_unselected) 
     dict_set_filter (dataset_dict (ds), NULL);
 
-  input = proc_open (ds);
-  while (casereader_read (input, &c))
-    casewriter_write (output, &c);
-  ok = casereader_destroy (input) && ok;
-  ok = casewriter_destroy (output) && ok;
+  casereader_transfer (proc_open (ds), output);
+  ok = casewriter_destroy (output);
   ok = proc_commit (ds) && ok;
 
   dict_set_filter (dataset_dict (ds), saved_filter_variable);
@@ -708,7 +704,7 @@
     struct variable **prev_BY;  /* Last set of BY variables. */
   };
 
-static bool mtf_free (struct mtf_proc *);
+static void mtf_free (struct mtf_proc *);
 
 static bool mtf_close_all_files (struct mtf_proc *);
 static bool mtf_merge_dictionary (struct dictionary *const, struct mtf_file *);
@@ -734,6 +730,8 @@
   char first_name[LONG_NAME_LEN + 1] = "";
   char last_name[LONG_NAME_LEN + 1] = "";
 
+  struct taint *taint = NULL;
+
   size_t i;
 
   ll_init (&mtf.files);
@@ -985,7 +983,12 @@
       || !create_flag_var ("LAST", last_name, mtf.dict, &mtf.last))
     goto error;
 
+  dict_compact_values (mtf.dict);
+  mtf.output = autopaging_writer_create (dict_get_next_value_idx (mtf.dict));
+  taint = taint_clone (casewriter_get_taint (mtf.output));
+
   ll_for_each (file, struct mtf_file, ll, &mtf.files)
+    {
     if (file->reader == NULL) 
       {
         if (active_file == NULL) 
@@ -996,9 +999,8 @@
         else
           file->reader = casereader_clone (active_file);
       }
-
-  dict_compact_values (mtf.dict);
-  mtf.output = autopaging_writer_create (dict_get_next_value_idx (mtf.dict));
+      taint_propagate (casereader_get_taint (file->reader), taint);
+    }
 
   ll_for_each_safe (file, next, struct mtf_file, ll, &mtf.files)
     mtf_read_record (&mtf, file);
@@ -1011,8 +1013,7 @@
       casewriter_write (mtf.output, &mtf.buffered_case);
       case_nullify (&mtf.buffered_case);
     }
-  if (!mtf_close_all_files (&mtf))
-    goto error;
+  mtf_close_all_files (&mtf);
   if (active_file != NULL)
     proc_commit (ds);
 
@@ -1020,12 +1021,15 @@
   mtf.dict = NULL;
   mtf.output = NULL;
 
-  return mtf_free (&mtf) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
+  mtf_free (&mtf);
+
+  return taint_destroy (taint) ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
 
  error:
   if (active_file != NULL)
     proc_commit (ds);
   mtf_free (&mtf);
+  taint_destroy (taint);
   return CMD_CASCADING_FAILURE;
 }
 
@@ -1059,23 +1063,15 @@
   return true;
 }
 
-/* Return a string in a static buffer describing V's variable type and
-   width. */
+/* Return a string in an allocated buffer describing V's variable
+   type and width. */
 static char *
 var_type_description (struct variable *v)
 {
-  static char buf[2][32];
-  static int x = 0;
-  char *s;
-
-  x ^= 1;
-  s = buf[x];
-
   if (var_is_numeric (v))
-    strcpy (s, "numeric");
+    return xstrdup ("numeric");
   else
-    sprintf (s, "string with width %d", var_get_width (v));
-  return s;
+    return xasprintf ("string with width %d", var_get_width (v));
 }
 
 /* Closes all the files in MTF and frees their associated data.
@@ -1089,8 +1085,7 @@
 
   ll_for_each_preremove (file, struct mtf_file, ll, &mtf->files)
     {
-      if (!casereader_destroy (file->reader))
-        ok = false;
+      casereader_destroy (file->reader);
       free (file->by);
       dict_destroy (file->dict);
       free (file->in_name);
@@ -1102,22 +1097,15 @@
   return ok;
 }
 
-/* Free all the data for the MATCH FILES procedure.
-   Returns true if successful, false if an I/O error
-   occurred. */
-static bool
+/* Frees all the data for the MATCH FILES procedure. */
+static void
 mtf_free (struct mtf_proc *mtf)
 {
-  bool ok;
-
-  ok = mtf_close_all_files (mtf);
-
+  mtf_close_all_files (mtf);
   dict_destroy (mtf->dict);
   casewriter_destroy (mtf->output);
   case_destroy (&mtf->buffered_case);
   case_destroy (&mtf->prev_BY_case);
-
-  return ok;
 }
 
 /* Reads the next record into FILE, if possible, and update MTF's
@@ -1313,11 +1301,15 @@
         {
           if (var_get_width (mv) != var_get_width (dv)) 
             {
+              char *dv_description = var_type_description (dv);
+              char *mv_description = var_type_description (mv);
               msg (SE, _("Variable %s in file %s (%s) has different "
                          "type or width from the same variable in "
                          "earlier file (%s)."),
                    var_get_name (dv), fh_get_name (f->handle),
-                   var_type_description (dv), var_type_description (mv));
+                   dv_description, mv_description);
+              free (dv_description);
+              free (mv_description);
               return false;
             }
         

Index: language/data-io/inpt-pgm.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/data-io/inpt-pgm.c,v
retrieving revision 1.23.2.1
retrieving revision 1.23.2.2
diff -u -b -r1.23.2.1 -r1.23.2.2
--- language/data-io/inpt-pgm.c 19 Mar 2007 21:36:24 -0000      1.23.2.1
+++ language/data-io/inpt-pgm.c 4 May 2007 03:48:47 -0000       1.23.2.2
@@ -157,7 +157,7 @@
   inp->value_cnt = dict_get_next_value_idx (dataset_dict (ds));
   
   proc_set_active_file_data
-    (ds, casereader_create (inp->value_cnt, CASENUMBER_INVALID,
+    (ds, casereader_create (NULL, inp->value_cnt, CASENUMBER_MAX,
                             &input_program_casereader_class, inp));
 
   return CMD_SUCCESS;
@@ -224,16 +224,14 @@
     }
 }
 
-/* Destroys the casereader.
-   Returns true if successful read, false if an I/O occurred
-   during destruction or previously. */
-static bool
+/* Destroys the casereader. */
+static void
 input_program_casereader_destroy (struct casereader *reader UNUSED, void *inp_)
 {
   struct input_program_pgm *inp = inp_;
-  bool ok = inp->restart != TRNS_ERROR;
+  if (inp->restart == TRNS_ERROR)
+    casereader_force_error (reader);
   destroy_input_program (inp);
-  return ok;
 }
 
 static const struct casereader_class input_program_casereader_class =

Index: language/data-io/list.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/data-io/list.q,v
retrieving revision 1.24.2.1
retrieving revision 1.24.2.2
diff -u -b -r1.24.2.1 -r1.24.2.2
--- language/data-io/list.q     19 Mar 2007 21:36:24 -0000      1.24.2.1
+++ language/data-io/list.q     4 May 2007 03:48:47 -0000       1.24.2.2
@@ -248,7 +248,7 @@
         }
     }
   ok = casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = proc_commit (ds) && ok;
 
   ds_destroy(&line_buffer);
 
@@ -267,7 +267,8 @@
   struct outp_driver *d;
   struct ccase c;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 

Index: language/dictionary/apply-dictionary.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/dictionary/apply-dictionary.c,v
retrieving revision 1.13.2.1
retrieving revision 1.13.2.2
diff -u -b -r1.13.2.1 -r1.13.2.2
--- language/dictionary/apply-dictionary.c      19 Mar 2007 21:36:24 -0000      
1.13.2.1
+++ language/dictionary/apply-dictionary.c      4 May 2007 03:48:48 -0000       
1.13.2.2
@@ -137,7 +137,5 @@
         dict_set_weight (dataset_dict (ds), new_weight);
     }
   
-  casereader_destroy (reader);
-
   return lex_end_of_command (lexer);
 }

Index: language/dictionary/delete-variables.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/dictionary/delete-variables.c,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- language/dictionary/delete-variables.c      19 Mar 2007 21:36:24 -0000      
1.1.2.1
+++ language/dictionary/delete-variables.c      4 May 2007 03:48:48 -0000       
1.1.2.2
@@ -53,10 +53,10 @@
     }
 
   ok = casereader_destroy (proc_open (ds));
-  dict_delete_vars (dataset_dict (ds), vars, var_cnt);
   ok = proc_commit (ds) && ok;
   if (!ok)
     goto error;
+  dict_delete_vars (dataset_dict (ds), vars, var_cnt);
   
   free (vars);
   

Index: language/stats/aggregate.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/aggregate.c,v
retrieving revision 1.30.2.1
retrieving revision 1.30.2.2
diff -u -b -r1.30.2.1 -r1.30.2.2
--- language/stats/aggregate.c  19 Mar 2007 21:36:24 -0000      1.30.2.1
+++ language/stats/aggregate.c  4 May 2007 03:48:48 -0000       1.30.2.2
@@ -292,7 +292,8 @@
     {
       struct ccase c;
       
-      casereader_peek (group, 0, &c);
+      if (!casereader_peek (group, 0, &c))
+        continue;
       initialize_aggregate_info (&agr, &c);
       case_destroy (&c);
 
@@ -303,7 +304,12 @@
   if (!casegrouper_destroy (grouper))
     goto error;
 
-  proc_commit (ds);
+  if (!proc_commit (ds)) 
+    {
+      input = NULL;
+      goto error;
+    }
+  input = NULL;
 
   if (out_file == NULL) 
     {

Index: language/stats/autorecode.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/autorecode.c,v
retrieving revision 1.18.2.1
retrieving revision 1.18.2.2
diff -u -b -r1.18.2.1 -r1.18.2.2
--- language/stats/autorecode.c 19 Mar 2007 21:36:24 -0000      1.18.2.1
+++ language/stats/autorecode.c 4 May 2007 03:48:48 -0000       1.18.2.2
@@ -210,7 +210,7 @@
           }
       }
   ok = casereader_destroy (input);
-  proc_commit (ds);
+  ok = proc_commit (ds) && ok;
 
   for (i = 0; i < arc.var_cnt; i++)
     arc.dst_vars[i] = dict_create_var_assert (dataset_dict (ds),

Index: language/stats/binomial.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/binomial.c,v
retrieving revision 1.2.2.1
retrieving revision 1.2.2.2
diff -u -b -r1.2.2.1 -r1.2.2.2
--- language/stats/binomial.c   19 Mar 2007 21:36:24 -0000      1.2.2.1
+++ language/stats/binomial.c   4 May 2007 03:48:48 -0000       1.2.2.2
@@ -88,7 +88,7 @@
   return sig1tailed ;
 }
 
-static void
+static bool
 do_binomial (const struct dictionary *dict,
             struct casereader *input,
             const struct binomial_test *bst,
@@ -105,32 +105,30 @@
   while (casereader_read(input, &c))
     {
       int v;
-      double w =
-       dict_get_case_weight (dict, &c, &warn);
+      double w = dict_get_case_weight (dict, &c, &warn);
 
       for (v = 0 ; v < ost->n_vars ; ++v )
        {
          const struct variable *var = ost->vars[v];
          const union value *value = case_data (&c, var);
+          int width = var_get_width (var);
 
          if (var_is_value_missing (var, value, exclude))
            break;
 
          if ( NULL == cat1[v].value )
            {
-             cat1[v].value = value_dup (value, var_get_width (var));
+             cat1[v].value = value_dup (value, width);
              cat1[v].count = w;
            }
-         else if ( 0 == compare_values (cat1[v].value, value,
-                                        var_get_width (var)))
+         else if ( 0 == compare_values (cat1[v].value, value, width))
            cat1[v].count += w;
          else if ( NULL == cat2[v].value )
            {
-             cat2[v].value = value_dup (value, var_get_width (var));
+             cat2[v].value = value_dup (value, width);
              cat2[v].count = w;
            }
-         else if ( 0 == compare_values (cat2[v].value, value,
-                                        var_get_width (var)))
+         else if ( 0 == compare_values (cat2[v].value, value, width))
            cat2[v].count += w;
          else if ( bst->category1 == SYSMIS)
            msg (ME, _("Variable %s is not dichotomous"), var_get_name (var));
@@ -138,7 +136,7 @@
 
       case_destroy (&c);
     }
-  casereader_destroy (input);
+  return casereader_destroy (input);
 }
 
 
@@ -155,7 +153,6 @@
 
   struct freq_mutable *cat1 = xzalloc (sizeof (*cat1) * ost->n_vars);
   struct freq_mutable *cat2 = xzalloc (sizeof (*cat1) * ost->n_vars);
-  struct tab_table *table ;
 
   assert ((bst->category1 == SYSMIS) == (bst->category2 == SYSMIS) );
 
@@ -173,9 +170,9 @@
       cat2->value = value_dup (&v, 0);
     }
 
-  do_binomial (dataset_dict(ds), input, bst, cat1, cat2, exclude);
-
-  table = tab_create (7, ost->n_vars * 3 + 1, 0);
+  if (do_binomial (dataset_dict(ds), input, bst, cat1, cat2, exclude)) 
+    {
+      struct tab_table *table = tab_create (7, ost->n_vars * 3 + 1, 0);
 
   tab_dim (table, tab_natural_dimensions);
 
@@ -193,17 +190,10 @@
       tab_hline (table, TAL_1, 0, tab_nc (table) -1, 1 + v * 3);
 
       /* Titles */
-      tab_text (table, 0, 1 + v * 3, TAB_LEFT,
-               var_to_string (var));
-
-      tab_text (table, 1, 1 + v * 3, TAB_LEFT,
-               _("Group1"));
-
-      tab_text (table, 1, 2 + v * 3, TAB_LEFT,
-               _("Group2"));
-
-      tab_text (table, 1, 3 + v * 3, TAB_LEFT,
-               _("Total"));
+          tab_text (table, 0, 1 + v * 3, TAB_LEFT, var_to_string (var));
+          tab_text (table, 1, 1 + v * 3, TAB_LEFT, _("Group1"));
+          tab_text (table, 1, 2 + v * 3, TAB_LEFT, _("Group2"));
+          tab_text (table, 1, 3 + v * 3, TAB_LEFT, _("Total"));
 
       /* Test Prop */
       tab_float (table, 5, 1 + v * 3, TAB_NONE, bst->p, 8, 3);
@@ -211,41 +201,27 @@
       /* Category labels */
       tab_text (table, 2, 1 + v * 3, TAB_NONE,
                var_get_value_name (var, cat1[v].value));
-
       tab_text (table, 2, 2 + v * 3, TAB_NONE,
                var_get_value_name (var, cat2[v].value));
 
       /* Observed N */
-      tab_float (table, 3, 1 + v * 3, TAB_NONE,
-                cat1[v].count, 8, 0);
-
-      tab_float (table, 3, 2 + v * 3, TAB_NONE,
-                cat2[v].count, 8, 0);
+          tab_float (table, 3, 1 + v * 3, TAB_NONE, cat1[v].count, 8, 0);
+          tab_float (table, 3, 2 + v * 3, TAB_NONE, cat2[v].count, 8, 0);
 
       n_total = cat1[v].count + cat2[v].count;
-
-
-      tab_float (table, 3, 3 + v * 3, TAB_NONE,
-                n_total, 8, 0);
+          tab_float (table, 3, 3 + v * 3, TAB_NONE, n_total, 8, 0);
 
       /* Observed Proportions */
-
       tab_float (table, 4, 1 + v * 3, TAB_NONE,
                 cat1[v].count / n_total, 8, 3);
-
       tab_float (table, 4, 2 + v * 3, TAB_NONE,
                 cat2[v].count / n_total, 8, 3);
-
       tab_float (table, 4, 3 + v * 3, TAB_NONE,
                 (cat1[v].count + cat2[v].count) / n_total, 8, 2);
 
-
       /* Significance */
-      sig = calculate_binomial (cat1[v].count, cat2[v].count,
-                                      bst->p);
-
-      tab_float (table, 6, 1 + v * 3, TAB_NONE,
-                sig, 8, 3);
+          sig = calculate_binomial (cat1[v].count, cat2[v].count, bst->p);
+          tab_float (table, 6, 1 + v * 3, TAB_NONE, sig, 8, 3);
     }
 
   tab_text (table,  2, 0,  TAB_CENTER, _("Category"));
@@ -258,6 +234,8 @@
            bst->p == 0.5 ? 2: 1);
 
   tab_vline (table, TAL_2, 2, 0, tab_nr (table) -1);
+      tab_submit (table);
+    }
 
   for (v = 0; v < ost->n_vars; v++) 
     {
@@ -266,7 +244,4 @@
     }
   free (cat1);
   free (cat2); 
-
-  tab_submit (table);
-
 }

Index: language/stats/chisquare.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/chisquare.c,v
retrieving revision 1.3.2.1
retrieving revision 1.3.2.2
diff -u -b -r1.3.2.1 -r1.3.2.2
--- language/stats/chisquare.c  19 Mar 2007 21:36:24 -0000      1.3.2.1
+++ language/stats/chisquare.c  4 May 2007 03:48:48 -0000       1.3.2.2
@@ -17,38 +17,33 @@
    02110-1301, USA. */
 
 #include <config.h>
-#include <libpspp/compiler.h>
-#include <libpspp/assertion.h>
+
+#include <language/stats/chisquare.h>
 
 #include <stdlib.h>
+#include <math.h>
 
 #include <data/case.h>
 #include <data/casereader.h>
-#include <data/variable.h>
 #include <data/dictionary.h>
 #include <data/procedure.h>
-
-#include <libpspp/message.h>
-#include <libpspp/hash.h>
+#include <data/value-labels.h>
+#include <data/variable.h>
+#include <language/stats/freq.h>
+#include <language/stats/npar.h>
 #include <libpspp/alloc.h>
-
-#include <gsl/gsl_cdf.h>
-
+#include <libpspp/assertion.h>
+#include <libpspp/compiler.h>
+#include <libpspp/hash.h>
+#include <libpspp/message.h>
+#include <libpspp/taint.h>
 #include <output/table.h>
-#include <data/value-labels.h>
-
-#include "npar.h"
-#include "chisquare.h"
-#include "freq.h"
 
-#include <math.h>
+#include <gsl/gsl_cdf.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 
-
-
-
 /* Return a hash table containing the frequency counts of each 
    value of VAR in CF .
    It is the caller's responsibility to free the hash table when 
@@ -115,9 +110,13 @@
 
       case_destroy (&c);
     }
-  casereader_destroy (input);
-
+  if (casereader_destroy (input))
   return freq_hash;
+  else 
+    {
+      hsh_destroy (freq_hash);
+      return NULL;
+    }
 }
 
 
@@ -159,9 +158,13 @@
           fr->value = value_dup (fr->value, var_get_width (var));
        }
     }
-  casereader_destroy (input);
-
+  if (casereader_destroy (input))
   return freq_hash;
+  else
+    {
+      hsh_destroy (freq_hash);
+      return NULL;
+    }
 }
 
 
@@ -181,6 +184,8 @@
   const struct variable *var =  ost->vars[v];
 
   *freq_hash = create_freq_hash (dict, input, var);
+  if (*freq_hash == NULL)
+    return NULL;
       
   n_cells = hsh_count (*freq_hash);
 
@@ -320,12 +325,12 @@
   int v, i;
   struct one_sample_test *ost = (struct one_sample_test *) test;
   struct chisquare_test *cst = (struct chisquare_test *) test;
-  struct tab_table *stats_table = create_stats_table (cst);
   int n_cells = 0;
   double total_expected = 0.0;
 
   double *df = xzalloc (sizeof (*df) * ost->n_vars);
   double *xsq = xzalloc (sizeof (*df) * ost->n_vars);
+  bool ok;
   
   for ( i = 0 ; i < cst->n_expected ; ++i ) 
     total_expected += cst->expected[i];
@@ -342,13 +347,11 @@
          struct tab_table *freq_table = 
             create_variable_frequency_table(dict, reader, cst, v, &freq_hash);
 
-         struct freq **ff = (struct freq **) hsh_sort (freq_hash);
+         struct freq **ff;
 
          if ( NULL == freq_table ) 
-           {
-             hsh_destroy (freq_hash);
              continue;
-           }
+          ff = (struct freq **) hsh_sort (freq_hash);
 
          n_cells = hsh_count (freq_hash);
 
@@ -410,8 +413,12 @@
            create_freq_hash_with_range (dict, reader,
                                          ost->vars[v], cst->lo, cst->hi);
 
-         struct freq **ff = (struct freq **) hsh_sort (freq_hash);
+         struct freq **ff;
 
+          if (freq_hash == NULL)
+            continue;
+
+          ff = (struct freq **) hsh_sort (freq_hash);
          assert ( n_cells == hsh_count (freq_hash));
 
          for ( i = 0 ; i < hsh_count (freq_hash) ; ++i ) 
@@ -459,8 +466,13 @@
 
       tab_submit (freq_table);
     }
+  ok = !taint_has_tainted_successor (casereader_get_taint (input));
   casereader_destroy (input);
 
+  if (ok) 
+    {
+      struct tab_table *stats_table = create_stats_table (cst);
+      
   /* Populate the summary statistics table */
   for ( v = 0 ; v < ost->n_vars ; ++v ) 
     {
@@ -474,10 +486,10 @@
       tab_float (stats_table, 1 + v, 3, TAB_NONE, 
                 gsl_cdf_chisq_Q (xsq[v], df[v]), 8,3);
     }
+      tab_submit (stats_table);
+    }
 
   free (xsq);
   free (df);
-
-  tab_submit (stats_table);
 }
 

Index: language/stats/chisquare.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/chisquare.h,v
retrieving revision 1.2.2.1
retrieving revision 1.2.2.2
diff -u -b -r1.2.2.1 -r1.2.2.2
--- language/stats/chisquare.h  19 Mar 2007 21:36:24 -0000      1.2.2.1
+++ language/stats/chisquare.h  4 May 2007 03:48:48 -0000       1.2.2.2
@@ -19,11 +19,10 @@
 #if !chisquare_h
 #define chisquare_h 1
 
-#include <config.h>
 #include <stddef.h>
 #include <stdbool.h>
+#include <language/stats/npar.h>
 
-#include "npar.h"
 struct chisquare_test
 {
   struct one_sample_test parent;  
@@ -40,6 +39,7 @@
 struct casereader;
 struct dictionary;
 struct hsh_table;
+struct dataset;
 
 void chisquare_insert_variables (const struct npar_test *test,
                                 struct hsh_table *variables);

Index: language/stats/crosstabs.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/crosstabs.q,v
retrieving revision 1.27.2.2
retrieving revision 1.27.2.3
diff -u -b -r1.27.2.2 -r1.27.2.3
--- language/stats/crosstabs.q  20 Mar 2007 00:08:50 -0000      1.27.2.2
+++ language/stats/crosstabs.q  4 May 2007 03:48:48 -0000       1.27.2.3
@@ -207,6 +207,7 @@
 {
   struct casegrouper *grouper;
   struct casereader *input, *group;
+  bool ok;
   int i;
 
   variables = NULL;
@@ -317,10 +318,10 @@
 
       postcalc ();
     }
-  casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = casegrouper_destroy (grouper);
+  ok = proc_commit (ds) && ok;
 
-  return true ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
+  return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
 }
 
 /* Parses the TABLES subcommand. */
@@ -516,7 +517,8 @@
 {
   struct ccase c;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 

Index: language/stats/descriptives.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/descriptives.c,v
retrieving revision 1.22.2.1
retrieving revision 1.22.2.2
diff -u -b -r1.22.2.1 -r1.22.2.2
--- language/stats/descriptives.c       19 Mar 2007 21:36:24 -0000      1.22.2.1
+++ language/stats/descriptives.c       4 May 2007 03:48:48 -0000       1.22.2.2
@@ -700,7 +700,8 @@
   struct ccase c;
   size_t i;
 
-  casereader_peek (group, 0, &c);
+  if (!casereader_peek (group, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 
@@ -758,7 +759,8 @@
             dv->max = x;
         }
     }
-  casereader_destroy (pass1);
+  if (!casereader_destroy (pass1))
+    return;
 
   /* Second pass for higher-order moments. */
   if (dsc->max_moment > MOMENT_MEAN) 
@@ -783,7 +785,8 @@
                 moments_pass_two (dv->moments, x, weight);
             }
         }
-      casereader_destroy (pass2);
+      if (!casereader_destroy (pass2))
+        return;
     }
   
   /* Calculate results. */

Index: language/stats/examine.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/examine.q,v
retrieving revision 1.22.2.1
retrieving revision 1.22.2.2
diff -u -b -r1.22.2.1 -r1.22.2.2
--- language/stats/examine.q    19 Mar 2007 21:36:24 -0000      1.22.2.1
+++ language/stats/examine.q    4 May 2007 03:48:48 -0000       1.22.2.2
@@ -718,10 +718,12 @@
   casenumber case_no;
   struct ccase c;
   int v;
+  bool ok;
 
   struct factor *fctr;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 
@@ -789,7 +791,7 @@
 
       factor_calc (&c, case_no, weight, case_missing);
     }
-  casereader_destroy (input);
+  ok = casereader_destroy (input);
 
   for ( v = 0 ; v < n_dependent_vars ; ++v)
     {
@@ -885,6 +887,7 @@
       fctr = fctr->next;
     }
 
+  if (ok)
   output_examine ();
 
 

Index: language/stats/flip.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/flip.c,v
retrieving revision 1.22.2.1
retrieving revision 1.22.2.2
diff -u -b -r1.22.2.1 -r1.22.2.2
--- language/stats/flip.c       19 Mar 2007 21:36:24 -0000      1.22.2.1
+++ language/stats/flip.c       4 May 2007 03:48:48 -0000       1.22.2.2
@@ -96,6 +96,7 @@
   union value *output_buf;
   struct ccase c;
   size_t i;
+  bool ok;
 
   if (proc_make_temporary_transformations_permanent (ds))
     msg (SW, _("FLIP ignores TEMPORARY.  "
@@ -183,11 +184,11 @@
       write_flip_case (flip, &c);
       case_destroy (&c);
     }
-  casereader_destroy (input);
-  proc_commit (ds);
+  ok = casereader_destroy (input);
+  ok = proc_commit (ds) && ok;
 
   /* Flip the data we read. */
-  if (!flip_file (flip)) 
+  if (!ok || !flip_file (flip)) 
     {
       proc_discard_active_file (ds);
       goto error;
@@ -204,7 +205,8 @@
 
   /* Set up flipped data for reading. */
   proc_set_active_file_data (ds,
-                             casereader_create (dict_get_next_value_idx (dict),
+                             casereader_create (NULL,
+                                                dict_get_next_value_idx (dict),
                                                 flip->case_cnt,
                                                 &flip_casereader_class, flip));
   return lex_end_of_command (lexer);
@@ -531,13 +533,13 @@
 /* Destroys the source.
    Returns true if successful read, false if an I/O occurred
    during destruction or previously. */
-static bool
+static void
 flip_casereader_destroy (struct casereader *reader UNUSED, void *flip_)
 {
   struct flip_pgm *flip = flip_;
-  bool ok = !flip->error;
+  if (flip->error)
+    casereader_force_error (reader);
   destroy_flip_pgm (flip);
-  return ok;
 }
 
 static const struct casereader_class flip_casereader_class = 

Index: language/stats/frequencies.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/frequencies.q,v
retrieving revision 1.29.2.1
retrieving revision 1.29.2.2
diff -u -b -r1.29.2.1 -r1.29.2.2
--- language/stats/frequencies.q        19 Mar 2007 21:36:24 -0000      1.29.2.1
+++ language/stats/frequencies.q        4 May 2007 03:48:48 -0000       1.29.2.2
@@ -321,6 +321,7 @@
 {
   struct casegrouper *grouper;
   struct casereader *input, *group;
+  bool ok;
   int i;
 
   n_percentiles = 0;
@@ -388,7 +389,8 @@
   input = casereader_create_filter_weight (proc_open (ds), dataset_dict (ds),
                                            NULL, NULL);
   grouper = casegrouper_create_splits (input, dataset_dict (ds));
-  while (casegrouper_get_next_group (grouper, &group)) 
+  for (; casegrouper_get_next_group (grouper, &group);
+       casereader_destroy (group)) 
     {
       struct ccase c;
       
@@ -396,15 +398,13 @@
       for (; casereader_read (group, &c); case_destroy (&c)) 
         calc (&c, ds);
       postcalc ();
-
-      casereader_destroy (group);
     }
-  casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = casegrouper_destroy (grouper);
+  ok = proc_commit (ds) && ok;
 
   free_frequencies(&cmd);
 
-  return true ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
+  return ok ? CMD_SUCCESS : CMD_CASCADING_FAILURE;
 }
 
 /* Figure out which charts the user requested.  */
@@ -577,7 +577,8 @@
   struct ccase c;
   size_t i;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 

Index: language/stats/npar.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/npar.h,v
retrieving revision 1.2.2.1
retrieving revision 1.2.2.2
diff -u -b -r1.2.2.1 -r1.2.2.2
--- language/stats/npar.h       19 Mar 2007 21:36:24 -0000      1.2.2.1
+++ language/stats/npar.h       4 May 2007 03:48:48 -0000       1.2.2.2
@@ -19,11 +19,15 @@
 #if !npar_h
 #define npar_h 1
 
-typedef struct variable *var_ptr;
-typedef var_ptr variable_pair[2];
+#include <stddef.h>
+#include <data/missing-values.h>
+
+typedef struct variable *variable_pair[2];
 
 struct hsh_table;
-struct casefilter ;
+struct casefilter;
+struct casereader;
+struct dataset;
 
 struct npar_test
 {

Index: language/stats/npar.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/npar.q,v
retrieving revision 1.3.2.1
retrieving revision 1.3.2.2
diff -u -b -r1.3.2.1 -r1.3.2.2
--- language/stats/npar.q       19 Mar 2007 21:36:24 -0000      1.3.2.1
+++ language/stats/npar.q       4 May 2007 03:48:48 -0000       1.3.2.2
@@ -20,23 +20,25 @@
 
 #include <config.h>
 
-#include <language/lexer/lexer.h>
-#include <language/lexer/variable-parser.h>
-#include <language/command.h>
-#include <data/procedure.h>
-#include <libpspp/pool.h>
-#include <libpspp/hash.h>
+#include <language/stats/npar.h>
+
+#include <math.h>
 
 #include <data/case.h>
 #include <data/casegrouper.h>
 #include <data/casereader.h>
-#include <math/moments.h>
 #include <data/dictionary.h>
-#include <language/stats/chisquare.h>
+#include <data/procedure.h>
+#include <language/command.h>
+#include <language/lexer/lexer.h>
+#include <language/lexer/variable-parser.h>
 #include <language/stats/binomial.h>
-#include <math.h>
+#include <language/stats/chisquare.h>
+#include <libpspp/hash.h>
+#include <libpspp/pool.h>
+#include <libpspp/taint.h>
+#include <math/moments.h>
 
-#include "npar.h"
 #include "npar-summary.h"
 
 #include "gettext.h"
@@ -115,7 +117,8 @@
                                       specs->filter);
     }
 
-  if ( specs->descriptives || specs->quartiles ) 
+  if ( (specs->descriptives || specs->quartiles)
+       && !taint_has_tainted_successor (casereader_get_taint (input)) ) 
     do_summary_box (summary_descriptives, specs->vv, specs->n_vars );
 
   free (summary_descriptives);
@@ -193,7 +196,7 @@
   while (casegrouper_get_next_group (grouper, &group))
     npar_execute (group, &npar_specs, ds);
   ok = casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = proc_commit (ds) && ok;
 
   hsh_destroy (var_hash);
 

Index: language/stats/oneway.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/oneway.q,v
retrieving revision 1.21.2.1
retrieving revision 1.21.2.2
diff -u -b -r1.21.2.1 -r1.21.2.2
--- language/stats/oneway.q     19 Mar 2007 21:36:24 -0000      1.21.2.1
+++ language/stats/oneway.q     4 May 2007 03:48:48 -0000       1.21.2.2
@@ -39,9 +39,9 @@
 #include <libpspp/hash.h>
 #include <libpspp/magic.h>
 #include <libpspp/message.h>
-#include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
 #include <math/group-proc.h>
 #include <math/group.h>
 #include <math/levene.h>
@@ -141,7 +141,7 @@
   while (casegrouper_get_next_group (grouper, &group)) 
     run_oneway (&cmd, group, ds);
   ok = casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = proc_commit (ds) && ok;
 
   free (vars);
   free_oneway (&cmd);
@@ -895,15 +895,19 @@
             struct casereader *input, 
             const struct dataset *ds)
 {
+  struct taint *taint;
   struct dictionary *dict = dataset_dict (ds);
   enum mv_class exclude;
   struct casereader *reader;
   struct ccase c;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 
+  taint = taint_clone (casereader_get_taint (input));
+
   global_group_hash = hsh_create(4, 
                                 (hsh_compare_func *) compare_values,
                                 (hsh_hash_func *) hash_value,
@@ -1000,8 +1004,9 @@
 
   ostensible_number_of_groups = hsh_count (global_group_hash);
 
-
+  if (!taint_has_tainted_successor (taint))
   output_oneway();
+  taint_destroy (taint);
 }
 
 

Index: language/stats/rank.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/rank.q,v
retrieving revision 1.27.2.1
retrieving revision 1.27.2.2
diff -u -b -r1.27.2.1 -r1.27.2.2
--- language/stats/rank.q       19 Mar 2007 21:36:24 -0000      1.27.2.1
+++ language/stats/rank.q       4 May 2007 03:48:48 -0000       1.27.2.2
@@ -33,6 +33,7 @@
 #include <language/command.h>
 #include <language/stats/sort-criteria.h>
 #include <libpspp/compiler.h>
+#include <libpspp/taint.h>
 #include <math/ordering.h>
 #include <math/sort.h>
 #include <output/table.h>
@@ -237,7 +238,7 @@
          const struct rank_spec *rank_specs, int n_rank_specs)
 {
   struct case_ordering *base_ordering;
-  bool result = true;
+  bool ok = true;
   int i;
   const int n_splits = dict_get_split_cnt (dataset_dict (ds));
 
@@ -271,20 +272,16 @@
         rank_sorted_file (group, output, dataset_dict (ds),
                           rank_specs, n_rank_specs,
                           i, src_vars[i]); 
-      casegrouper_destroy (grouper);
-      proc_commit (ds);
-
+      ok = casegrouper_destroy (grouper);
+      ok = proc_commit (ds) && ok;
       ranked_file = casewriter_make_reader (output);
-      if (ranked_file == NULL) 
-       {
-         result = false;
+      ok = proc_set_active_file_data (ds, ranked_file) && ok;
+      if (!ok)
           break;
        }
-      proc_set_active_file_data (ds, ranked_file);
-    }
   case_ordering_destroy (base_ordering);
 
-  return result; 
+  return ok; 
 }
 
 /* Hardly a rank function !! */
@@ -494,11 +491,15 @@
   tie_grouper = casegrouper_create_vars (pass2, &rank_var, 1);
   while (casegrouper_get_next_group (tie_grouper, &pass2_1)) 
     {
-      struct casereader *pass2_2 = casereader_clone (pass2_1);
+      struct casereader *pass2_2;
       double cc_1 = cc;
       double tw = 0.0;
       int i;
 
+      pass2_2 = casereader_clone (pass2_1);
+      taint_propagate (casereader_get_taint (pass2_2),
+                       casewriter_get_taint (output));
+
       /* Pass 2.1: Sum up weight for tied cases. */
       for (; casereader_read (pass2_1, &c); case_destroy (&c)) 
         tw += dict_get_case_weight (dict, &c, NULL);
@@ -521,8 +522,6 @@
       tie_group++;
     }
   casegrouper_destroy (tie_grouper);
-
-  /* FIXME: needs to detect errors. */
 }
 
 /* Transformation function to enumerate all the cases */
@@ -776,10 +775,10 @@
     /* FIXME: loses error conditions. */
     proc_discard_output (ds);
     sorted = sort_execute (proc_open (ds), ordering);
-    proc_commit (ds);
+    result = proc_commit (ds) && result;
 
     dict_delete_var (dataset_dict (ds), order);
-    proc_set_active_file_data (ds, sorted);
+    result = proc_set_active_file_data (ds, sorted) && result;
   }
 
   rank_cleanup();

Index: language/stats/regression.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/regression.q,v
retrieving revision 1.45.2.1
retrieving revision 1.45.2.2
diff -u -b -r1.45.2.1 -r1.45.2.2
--- language/stats/regression.q 19 Mar 2007 21:36:24 -0000      1.45.2.1
+++ language/stats/regression.q 4 May 2007 03:48:48 -0000       1.45.2.2
@@ -43,12 +43,14 @@
 #include <libpspp/alloc.h>
 #include <libpspp/compiler.h>
 #include <libpspp/message.h>
+#include <libpspp/taint.h>
 #include <math/design-matrix.h>
 #include <math/coefficient.h>
 #include <math/linreg/linreg.h>
 #include <output/table.h>
 
 #include "gettext.h"
+#define _(msgid) gettext (msgid)
 
 #define REG_LARGE_DATA 1000
 
@@ -112,12 +114,7 @@
  */
 static struct file_handle *model_file;
 
-/*
-  Return value for the procedure.
- */
-static int pspp_reg_rc = CMD_SUCCESS;
-
-static void run_regression (struct casereader *, struct cmd_regression *,
+static bool run_regression (struct casereader *, struct cmd_regression *,
                             struct dataset *);
 
 /* 
@@ -940,23 +937,24 @@
 {
   struct casegrouper *grouper;
   struct casereader *group;
+  bool ok;
 
   if (!parse_regression (lexer, ds, &cmd, NULL))
     return CMD_FAILURE;
 
   models = xnmalloc (cmd.n_dependent, sizeof *models);
 
-  /* Data pass.  FIXME: error handling. */
+  /* Data pass. */
   grouper = casegrouper_create_splits (proc_open (ds), dataset_dict (ds));
   while (casegrouper_get_next_group (grouper, &group))
-    run_regression (group, &cmd, ds);
-  casegrouper_destroy (grouper);
-  proc_commit (ds);
+    ok = run_regression (group, &cmd, ds) && ok;
+  ok = casegrouper_destroy (grouper);
+  ok = proc_commit (ds) && ok;
 
   subcommand_save (ds, cmd.sbc_save, models);
   free (v_variables);
   free (models);
-  return pspp_reg_rc;
+  return ok ? CMD_SUCCESS : CMD_FAILURE;
 }
 
 /*
@@ -1047,7 +1045,7 @@
   pspp_coeff_init (c->coeff + 1, dm);
 }
 
-static void
+static bool
 run_regression (struct casereader *input, struct cmd_regression *cmd,
                 struct dataset *ds)
 {
@@ -1063,7 +1061,8 @@
 
   assert (models != NULL);
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return true;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 
@@ -1077,9 +1076,8 @@
     {
       if (!var_is_numeric (cmd->v_dependent[i]))
        {
-         msg (SE, gettext ("Dependent variable must be numeric."));
-         pspp_reg_rc = CMD_FAILURE;
-         return;
+         msg (SE, _("Dependent variable must be numeric."));
+         return false;
        }
     }
 
@@ -1090,18 +1088,22 @@
 
   for (k = 0; k < cmd->n_dependent; k++)
     {
-      struct casereader *reader = casereader_clone (input);
-      struct variable *dep_var = cmd->v_dependent[k];
+      struct variable *dep_var;
+      struct casereader *reader;
       casenumber row;
       struct ccase c;
       size_t n_data;           /* Number of valid cases. */
       
+      dep_var = cmd->v_dependent[k];
       n_indep = identify_indep_vars (indep_vars, dep_var);
+
+      reader = casereader_clone (input);
       reader = casereader_create_filter_missing (reader, indep_vars, n_indep,
                                                  MV_ANY, NULL);
       reader = casereader_create_filter_missing (reader, &dep_var, 1,
                                                  MV_ANY, NULL);
-      n_data = prepare_categories (casereader_clone (reader), indep_vars, 
n_indep);
+      n_data = prepare_categories (casereader_clone (reader),
+                                   indep_vars, n_indep);
 
       Y = gsl_vector_alloc (n_data);
 
@@ -1129,18 +1131,16 @@
       reader = casereader_create_counter (reader, &row, -1);
       for (; casereader_read (reader, &c); case_destroy (&c))
        {
-          const union value *val;
           for (i = 0; i < n_indep; ++i)
             {
               struct variable *v = indep_vars[i];
-              val = case_data (&c, v);
+              const union value *val = case_data (&c, v);
               if (var_is_alpha (v))
                 design_matrix_set_categorical (X, row, v, val);
               else
                 design_matrix_set_numeric (X, row, v, val);
             }
-          val = case_data (&c, dep_var);
-          gsl_vector_set (Y, row, val->f);
+          gsl_vector_set (Y, row, case_num (&c, dep_var));
         }
       casereader_destroy (reader);
       /*
@@ -1154,8 +1154,11 @@
          Find the least-squares estimates and other statistics.
       */
       pspp_linreg ((const gsl_vector *) Y, X->m, &lopts, models[k]);
+      if (!taint_has_tainted_successor (casereader_get_taint (input)))
+        {
       subcommand_statistics (cmd->a_statistics, models[k]);
       subcommand_export (cmd->sbc_export, models[k]);
+        }
 
       gsl_vector_free (Y);
       design_matrix_destroy (X);
@@ -1163,6 +1166,8 @@
   free (indep_vars);
   free (lopts.get_indep_mean_std);
   casereader_destroy (input);
+
+  return true;
 }
 
 /*

Index: language/stats/sort-cases.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/sort-cases.c,v
retrieving revision 1.8.2.1
retrieving revision 1.8.2.2
diff -u -b -r1.8.2.1 -r1.8.2.2
--- language/stats/sort-cases.c 19 Mar 2007 21:36:24 -0000      1.8.2.1
+++ language/stats/sort-cases.c 4 May 2007 03:48:48 -0000       1.8.2.2
@@ -43,12 +43,12 @@
 cmd_sort_cases (struct lexer *lexer, struct dataset *ds)
 {
   struct case_ordering *ordering;
-  struct casereader *input;
   struct casereader *output;
-  bool success = false;
+  bool ok = false;
 
   lex_match (lexer, T_BY);
 
+  proc_cancel_temporary_transformations (ds);
   ordering = parse_case_ordering (lexer, dataset_dict (ds), NULL);
   if (ordering == NULL)
     return CMD_CASCADING_FAILURE;
@@ -69,25 +69,17 @@
       lex_get (lexer);
     }
 
-  /* FIXME: the case_ordering was created for dataset_dict(ds),
-     but proc_cancel_temporary_transformations(ds) causes the
-     size of cases to shrink... */
-  proc_cancel_temporary_transformations (ds);
   proc_discard_output (ds);
-  input = proc_open (ds);
-  output = sort_execute (input, ordering);
-  proc_commit (ds);
+  output = sort_execute (proc_open (ds), ordering);
   ordering = NULL;
-  if (output == NULL) 
-    goto done;
-  proc_set_active_file_data (ds, output);
-  success = true;
+  ok = proc_commit (ds);
+  ok = proc_set_active_file_data (ds, output) && ok;
 
  done:
   min_buffers = 64;
   max_buffers = INT_MAX;
   
   case_ordering_destroy (ordering);
-  return success ? lex_end_of_command (lexer) : CMD_CASCADING_FAILURE;
+  return ok ? lex_end_of_command (lexer) : CMD_CASCADING_FAILURE;
 }
 

Index: language/stats/t-test.q
===================================================================
RCS file: /cvsroot/pspp/pspp/src/language/stats/t-test.q,v
retrieving revision 1.20.2.1
retrieving revision 1.20.2.2
diff -u -b -r1.20.2.1 -r1.20.2.2
--- language/stats/t-test.q     19 Mar 2007 21:36:24 -0000      1.20.2.1
+++ language/stats/t-test.q     4 May 2007 03:48:48 -0000       1.20.2.2
@@ -31,7 +31,6 @@
 #include <data/procedure.h>
 #include <data/value-labels.h>
 #include <data/variable.h>
-
 #include <language/command.h>
 #include <language/dictionary/split-file.h>
 #include <language/lexer/lexer.h>
@@ -41,9 +40,9 @@
 #include <libpspp/hash.h>
 #include <libpspp/magic.h>
 #include <libpspp/message.h>
-#include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
 #include <math/group-proc.h>
 #include <math/levene.h>
 #include <output/manager.h>
@@ -234,7 +233,7 @@
 static void group_postcalc (struct cmd_t_test *);
 
 
-static bool calculate(struct cmd_t_test *,
+static void calculate(struct cmd_t_test *,
                       struct casereader *,
                      const struct dataset *);
 
@@ -341,12 +340,12 @@
 
   bad_weight_warn = true;
 
-  /* Data pass.  FIXME: error handling. */
+  /* Data pass. */
   grouper = casegrouper_create_splits (proc_open (ds), dataset_dict (ds));
   while (casegrouper_get_next_group (grouper, &group)) 
     calculate (&cmd, group, ds);
   ok = casegrouper_destroy (grouper);
-  proc_commit (ds);
+  ok = proc_commit (ds) && ok;
 
   n_pairs=0;
   free(pairs);
@@ -1778,7 +1777,7 @@
 
 
 
-static bool
+static void
 calculate(struct cmd_t_test *cmd,
           struct casereader *input, const struct dataset *ds)
 {
@@ -1787,11 +1786,13 @@
   struct trbox test_results_box;
 
   struct casereader *pass1, *pass2, *pass3;
+  struct taint *taint;
   struct ccase c;
 
   enum mv_class exclude = cmd->miss != TTS_INCLUDE ? MV_ANY : MV_SYSTEM;
 
-  casereader_peek (input, 0, &c);
+  if (!casereader_peek (input, 0, &c))
+    return;
   output_split_file_values (ds, &c);
   case_destroy (&c);
 
@@ -1803,6 +1804,7 @@
 
   input = casereader_create_filter_weight (input, dict, NULL, NULL);
 
+  taint = taint_clone (casereader_get_taint (input));
   casereader_split (input, &pass1, &pass2);
                                
   common_precalc (cmd);
@@ -1839,19 +1841,19 @@
     }
   casereader_destroy (pass2);
  
-
+  if (!taint_has_tainted_successor (taint)) 
+    {
   ssbox_create(&stat_summary_box,cmd,mode);
   ssbox_populate(&stat_summary_box,cmd);
   ssbox_finalize(&stat_summary_box);
 
-  if ( mode == T_PAIRED) 
+      if ( mode == T_PAIRED ) 
     pscbox();
 
   trbox_create(&test_results_box,cmd,mode);
   trbox_populate(&test_results_box,cmd);
   trbox_finalize(&test_results_box);
-
-  return true;
+    }
 }
 
 short which_group(const struct group_statistics *g,

Index: libpspp/automake.mk
===================================================================
RCS file: /cvsroot/pspp/pspp/src/libpspp/automake.mk,v
retrieving revision 1.22.2.8
retrieving revision 1.22.2.9
diff -u -b -r1.22.2.8 -r1.22.2.9
--- libpspp/automake.mk 14 Apr 2007 05:04:23 -0000      1.22.2.8
+++ libpspp/automake.mk 4 May 2007 03:48:48 -0000       1.22.2.9
@@ -63,6 +63,8 @@
        src/libpspp/start-date.h \
        src/libpspp/str.c \
        src/libpspp/str.h \
+       src/libpspp/taint.c \
+       src/libpspp/taint.h \
        src/libpspp/tower.c \
        src/libpspp/tower.h \
        src/libpspp/verbose-msg.c \

Index: math/levene.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/math/levene.c,v
retrieving revision 1.9.2.1
retrieving revision 1.9.2.2
diff -u -b -r1.9.2.1 -r1.9.2.2
--- math/levene.c       19 Mar 2007 21:36:24 -0000      1.9.2.1
+++ math/levene.c       4 May 2007 03:48:48 -0000       1.9.2.2
@@ -74,20 +74,43 @@
 
   /* Filter for missing values */
   enum mv_class exclude;
+
+  /* An array of lz_stats for each variable */
+  struct lz_stats *lz;
+
+  /* The denominator for the expression for the Levene */
+  double *lz_denominator;
+
+};
+
+/* Per variable statistics */
+struct lz_stats
+{
+  /* Total of all lz */
+  double grand_total;
+
+  /* Mean of all lz */
+  double grand_mean;
+
+  /* The total number of cases */
+  double total_n ; 
+
+  /* Number of groups */
+  int n_groups;
 };
 
 /* First pass */
 static void  levene_precalc (const struct levene_info *l);
 static int levene_calc (const struct dictionary *dict, const struct ccase *, 
                        const struct levene_info *l);
-static void levene_postcalc (void *);
+static void levene_postcalc (struct levene_info *);
 
 
 /* Second pass */
 static void levene2_precalc (struct levene_info *l);
 static int levene2_calc (const struct dictionary *, const struct ccase *, 
                         struct levene_info *l);
-static void levene2_postcalc (void *);
+static void levene2_postcalc (struct levene_info *);
 
 
 void  
@@ -104,58 +127,32 @@
   l.v_indep    = v_indep;
   l.v_dep      = v_dep;
   l.exclude    = exclude;
+  l.lz         = xnmalloc (l.n_dep, sizeof *l.lz);
+  l.lz_denominator = xnmalloc (l.n_dep, sizeof *l.lz_denominator);
 
   casereader_split (reader, &pass1, &pass2);
 
   levene_precalc (&l);
-  for(;
-      casereader_read (pass1, &c) ;
-      case_destroy (&c)) 
-    {
+  for (; casereader_read (pass1, &c); case_destroy (&c)) 
       levene_calc (dict, &c, &l);
-    }
   casereader_destroy (pass1);
   levene_postcalc (&l);
 
   levene2_precalc(&l);
-  for(;
-      casereader_read (pass2, &c) ;
-      case_destroy (&c)) 
-    {
-      levene2_calc (dict, &c,&l);
-    }
+  for (; casereader_read (pass2, &c); case_destroy (&c)) 
+    levene2_calc (dict, &c, &l);
   casereader_destroy (pass2);
   levene2_postcalc (&l);
-}
-
-/* Internal variables used in calculating the Levene statistic */
-
-/* Per variable statistics */
-struct lz_stats
-{
-  /* Total of all lz */
-  double grand_total;
-
-  /* Mean of all lz */
-  double grand_mean;
-
-  /* The total number of cases */
-  double total_n ; 
-
-  /* Number of groups */
-  int n_groups;
-};
-
-/* An array of lz_stats for each variable */
-static struct lz_stats *lz;
 
+  free (l.lz_denominator);
+  free (l.lz);
+}
 
 static void 
 levene_precalc (const struct levene_info *l)
 {
   size_t i;
 
-  lz = xnmalloc (l->n_dep, sizeof *lz);
 
   for(i = 0; i < l->n_dep ; ++i ) 
     {
@@ -164,9 +161,9 @@
       struct group_statistics *gs;
       struct hsh_iterator hi;
 
-      lz[i].grand_total = 0;
-      lz[i].total_n = 0;
-      lz[i].n_groups = gp->n_groups ; 
+      l->lz[i].grand_total = 0;
+      l->lz[i].total_n = 0;
+      l->lz[i].n_groups = gp->n_groups ; 
 
       
       for ( gs = hsh_first(gp->group_hash, &hi);
@@ -208,8 +205,8 @@
       if ( !var_is_value_missing (var, v, l->exclude))
        {
          levene_z= fabs(v->f - gs->mean);
-         lz[i].grand_total += levene_z * weight;
-         lz[i].total_n += weight; 
+         l->lz[i].grand_total += levene_z * weight;
+         l->lz[i].total_n += weight; 
 
          gs->lz_total += levene_z * weight;
        }
@@ -219,16 +216,14 @@
 
 
 static void 
-levene_postcalc (void *_l)
+levene_postcalc (struct levene_info *l)
 {
   size_t v;
 
-  struct levene_info *l = (struct levene_info *) _l;
-
   for (v = 0; v < l->n_dep; ++v) 
     {
       /* This is Z_LL */
-      lz[v].grand_mean = lz[v].grand_total / lz[v].total_n ;
+      l->lz[v].grand_mean = l->lz[v].grand_total / l->lz[v].total_n ;
     }
 
   
@@ -236,15 +231,11 @@
 
 
 
-/* The denominator for the expression for the Levene */
-static double *lz_denominator = 0;
-
 static void 
 levene2_precalc (struct levene_info *l)
 {
   size_t v;
 
-  lz_denominator = xnmalloc (l->n_dep, sizeof *lz_denominator);
 
   /* This stuff could go in the first post calc . . . */
   for (v = 0; 
@@ -264,7 +255,7 @@
        {
          g->lz_mean = g->lz_total / g->n ;
        }
-      lz_denominator[v] = 0;
+      l->lz_denominator[v] = 0;
   }
 }
 
@@ -297,7 +288,7 @@
       if ( !var_is_value_missing (var, v, l->exclude))
        {
          levene_z = fabs(v->f - gs->mean); 
-         lz_denominator[i] += weight * pow2 (levene_z - gs->lz_mean);
+         l->lz_denominator[i] += weight * pow2 (levene_z - gs->lz_mean);
        }
     }
 
@@ -306,12 +297,10 @@
 
 
 static void 
-levene2_postcalc (void *_l)
+levene2_postcalc (struct levene_info *l)
 {
   size_t v;
 
-  struct levene_info *l = (struct levene_info *) _l;
-
   for (v = 0; v < l->n_dep; ++v) 
     {
       double lz_numerator = 0;
@@ -326,18 +315,14 @@
          g != 0 ;
          g = (struct group_statistics *) hsh_next(hash,&hi) )
        {
-         lz_numerator += g->n * pow2(g->lz_mean - lz[v].grand_mean );
+         lz_numerator += g->n * pow2(g->lz_mean - l->lz[v].grand_mean );
        }
       lz_numerator *= ( gp->ugs.n - gp->n_groups );
 
-      lz_denominator[v] *= (gp->n_groups - 1);
+      l->lz_denominator[v] *= (gp->n_groups - 1);
 
-      gp->levene = lz_numerator / lz_denominator[v] ;
+      gp->levene = lz_numerator / l->lz_denominator[v] ;
 
     }
-
-  /* Now clear up after ourselves */
-  free(lz_denominator);
-  free(lz);
 }
 

Index: math/merge.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/math/Attic/merge.c,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- math/merge.c        19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ math/merge.c        4 May 2007 03:48:48 -0000       1.1.2.2
@@ -28,6 +28,7 @@
 #include <data/casewriter.h>
 #include <libpspp/array.h>
 #include <libpspp/assertion.h>
+#include <libpspp/taint.h>
 #include <math/ordering.h>
 
 #include "xalloc.h"
@@ -45,7 +46,6 @@
     struct case_ordering *ordering;
     struct merge_input inputs[MAX_MERGE_ORDER];
     size_t input_cnt;
-    bool ok;
   };
 
 static void do_merge (struct merge *m);
@@ -56,33 +56,28 @@
   struct merge *m = xmalloc (sizeof *m);
   m->ordering = case_ordering_clone (ordering);
   m->input_cnt = 0;
-  m->ok = true;
   return m;
 }
 
-bool
+void
 merge_destroy (struct merge *m) 
 {
   if (m != NULL) 
     {
-      bool ok = m->ok;
       size_t i;
       
       case_ordering_destroy (m->ordering);
       for (i = 0; i < m->input_cnt; i++)
-        m->ok = casereader_destroy (m->inputs[i].reader) && m->ok;
+        casereader_destroy (m->inputs[i].reader);
       free (m);
-      return ok;
     }
-  else
-    return true;
 }
 
 void
 merge_append (struct merge *m, struct casereader *r) 
 {
-  assert (m->input_cnt < MAX_MERGE_ORDER);
-  m->inputs[m->input_cnt++].reader = casereader_rename (r);
+  r = casereader_rename (r);
+  m->inputs[m->input_cnt++].reader = r;
   if (m->input_cnt >= MAX_MERGE_ORDER)
     do_merge (m);
 }
@@ -106,8 +101,6 @@
       struct casewriter *writer = mem_writer_create (value_cnt);
       r = casewriter_make_reader (writer);
     }
-  else if (!m->ok)
-    r = NULL;
   else
     NOT_REACHED ();
 
@@ -139,11 +132,13 @@
   assert (m->input_cnt > 1);
 
   w = tmpfile_writer_create (case_ordering_get_value_cnt (m->ordering));
+  for (i = 0; i < m->input_cnt; i++) 
+    taint_propagate (casereader_get_taint (m->inputs[i].reader),
+                     casewriter_get_taint (w));
 
   for (i = 0; i < m->input_cnt; )
     if (read_input_case (m, i))
       i++;
-  
   while (m->input_cnt > 0) 
     {
       size_t min;

Index: math/merge.h
===================================================================
RCS file: /cvsroot/pspp/pspp/src/math/Attic/merge.h,v
retrieving revision 1.1.2.1
retrieving revision 1.1.2.2
diff -u -b -r1.1.2.1 -r1.1.2.2
--- math/merge.h        19 Mar 2007 21:36:24 -0000      1.1.2.1
+++ math/merge.h        4 May 2007 03:48:48 -0000       1.1.2.2
@@ -25,7 +25,7 @@
 struct casereader;
 
 struct merge *merge_create (const struct case_ordering *);
-bool merge_destroy (struct merge *);
+void merge_destroy (struct merge *);
 void merge_append (struct merge *, struct casereader *);
 struct casereader *merge_make_reader (struct merge *);
 

Index: math/sort.c
===================================================================
RCS file: /cvsroot/pspp/pspp/src/math/sort.c,v
retrieving revision 1.23.2.1
retrieving revision 1.23.2.2
diff -u -b -r1.23.2.1 -r1.23.2.2
--- math/sort.c 19 Mar 2007 21:36:24 -0000      1.23.2.1
+++ math/sort.c 4 May 2007 03:48:48 -0000       1.23.2.2
@@ -95,20 +95,17 @@
   pqueue_push (sort->pqueue, c, sort->run_id + (next_run ? 1 : 0));
 }
 
-static bool
+static void
 sort_casewriter_destroy (struct casewriter *writer UNUSED, void *sort_) 
 {
   struct sort_writer *sort = sort_;
-  bool ok;
   
   case_ordering_destroy (sort->ordering);
-  ok = merge_destroy (sort->merge);
+  merge_destroy (sort->merge);
   pqueue_destroy (sort->pqueue);
-  ok = casewriter_destroy (sort->run) && ok;
+  casewriter_destroy (sort->run);
   case_destroy (&sort->run_end);
   free (sort);
-
-  return ok;
 }
 
 static struct casereader *
@@ -168,7 +165,6 @@
   {
     sort_casewriter_write,
     sort_casewriter_destroy,
-    NULL,
     sort_casewriter_convert_to_reader,
   };
 
@@ -179,13 +175,8 @@
 struct casereader *
 sort_execute (struct casereader *input, struct case_ordering *ordering)
 {
-  struct casewriter *output;
-  struct ccase c;
-
-  output = sort_create_writer (ordering);
-  while (casereader_read (input, &c))
-    casewriter_write (output, &c);
-  casereader_destroy (input);
+  struct casewriter *output = sort_create_writer (ordering);
+  casereader_transfer (input, output);
   return casewriter_make_reader (output);
 }
 

Index: libpspp/taint.c
===================================================================
RCS file: libpspp/taint.c
diff -N libpspp/taint.c
--- /dev/null   1 Jan 1970 00:00:00 -0000
+++ libpspp/taint.c     4 May 2007 03:48:48 -0000       1.1.2.1
@@ -0,0 +1,306 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#include <config.h>
+
+#include <libpspp/taint.h>
+
+#include <stddef.h>
+
+#include <libpspp/array.h>
+#include <libpspp/assertion.h>
+
+#include "xalloc.h"
+
+/* This code maintains two invariants:
+
+   1. If a node is tainted, then all of its successors are
+      tainted.
+
+   2. If a node is tainted, then it and all of its predecessors are
+      successor-tainted. */
+
+/* A list of pointers to taint structures. */
+struct taint_list 
+  {
+    size_t cnt;
+    struct taint **taints;
+  };
+
+static void taint_list_init (struct taint_list *);
+static void taint_list_destroy (struct taint_list *);
+static void taint_list_add (struct taint_list *, struct taint *);
+static void taint_list_remove (struct taint_list *, const struct taint *);
+
+/* A taint. */
+struct taint 
+  {
+    size_t ref_cnt;                     /* Number of owners. */
+    struct taint_list successors;       /* Successors in graph. */
+    struct taint_list predecessors;     /* Predecessors in graph. */
+    bool tainted;                       /* Is this node tainted? */
+    bool tainted_successor;             /* Is/was any derived taint tainted? */
+  };
+
+static void recursively_set_taint (struct taint *);
+static void recursively_set_tainted_successor (struct taint *);
+
+/* Creates and returns a new taint object. */
+struct taint *
+taint_create (void) 
+{
+  struct taint *taint = xmalloc (sizeof *taint);
+  taint->ref_cnt = 1;
+  taint_list_init (&taint->successors);
+  taint_list_init (&taint->predecessors);
+  taint->tainted = false;
+  taint->tainted_successor = false;
+  return taint;
+}
+
+/* Returns a clone of the given TAINT.
+   The new and old taint objects are logically indistinguishable,
+   as if they were the same object.  (In this implementation,
+   they are in fact the same object, but this is not a guarantee
+   made by the interface.) */
+struct taint *
+taint_clone (const struct taint *taint_) 
+{
+  struct taint *taint = (struct taint *) taint_;
+
+  assert (taint->ref_cnt > 0);
+  taint->ref_cnt++;
+  return taint;
+}
+
+/* Destroys the given TAINT.
+   Returns false if TAINT was tainted, true otherwise.
+   Any propagation relationships through TAINT are preserved.
+   That is, if A taints B and B taints C, then destroying B will
+   preserve the transitive relationship, so that tainting A will
+   still taint C. */
+bool
+taint_destroy (struct taint *taint) 
+{
+  bool was_tainted = taint_is_tainted (taint);
+  if (--taint->ref_cnt == 0)
+    {
+      size_t i, j;
+
+      for (i = 0; i < taint->predecessors.cnt; i++)
+        for (j = 0; j < taint->successors.cnt; j++)
+          taint_propagate (taint->predecessors.taints[i],
+                           taint->successors.taints[j]);
+
+      for (i = 0; i < taint->predecessors.cnt; i++)
+        taint_list_remove (&taint->predecessors.taints[i]->successors, taint);
+      for (i = 0; i < taint->successors.cnt; i++)
+        taint_list_remove (&taint->successors.taints[i]->predecessors, taint);
+
+      taint_list_destroy (&taint->successors);
+      taint_list_destroy (&taint->predecessors);
+      free (taint);
+    }
+  return !was_tainted;
+}
+
+/* Adds a propagation relationship from FROM to TO.  This means
+   that, should FROM ever become tainted, then TO will
+   automatically be marked tainted as well.  This takes effect
+   immediately: if FROM is currently tainted, then TO will be
+   tainted after the call completes.
+
+   Taint propagation is transitive: if A propagates to B and B
+   propagates to C, then tainting A taints both B and C.  Taint
+   propagation is not commutative: propagation from A to B does
+   not imply propagation from B to A.  Taint propagation is
+   robust against loops, so that if A propagates to B and vice
+   versa, whether directly or indirectly, then tainting either A
+   or B will cause the other to be tainted, without producing an
+   infinite loop. */
+void
+taint_propagate (const struct taint *from_, const struct taint *to_) 
+{
+  struct taint *from = (struct taint *) from_;
+  struct taint *to = (struct taint *) to_;
+  
+  if (from != to) 
+    {
+      taint_list_add (&from->successors, to);
+      taint_list_add (&to->predecessors, from);
+      if (from->tainted && !to->tainted)
+        recursively_set_taint (to);
+      else if (to->tainted_successor && !from->tainted_successor) 
+        recursively_set_tainted_successor (from);
+    }
+}
+
+/* Returns true if TAINT is tainted, false otherwise. */
+bool
+taint_is_tainted (const struct taint *taint) 
+{
+  return taint->tainted;
+}
+
+/* Marks TAINT tainted and propagates the taint to all of its
+   successors. */
+void
+taint_set_taint (struct taint *taint) 
+{
+  if (!taint->tainted)
+    recursively_set_taint (taint);
+}
+
+/* Returns true if TAINT is successor-tainted, that is, if it or
+   any of its successors is or ever has been tainted.  (A
+   "successor" of a taint object X is any taint object that can
+   be reached by following propagation relationships starting
+   from X.) */
+bool
+taint_has_tainted_successor (const struct taint *taint) 
+{
+  return taint->tainted_successor;
+}
+
+/* Attempts to reset the successor-taint on TAINT.  This is
+   successful only if TAINT currently has no tainted successor. */
+void
+taint_reset_successor_taint (struct taint *taint) 
+{
+  if (taint->tainted_successor) 
+    {
+      size_t i;
+
+      for (i = 0; i < taint->successors.cnt; i++) 
+        if (taint->successors.taints[i]->tainted_successor)
+          return;
+
+      taint->tainted_successor = false;
+    }
+}
+
+/* Initializes LIST as an empty list of taints. */
+static void
+taint_list_init (struct taint_list *list) 
+{
+  list->cnt = 0;
+  list->taints = NULL;
+}
+
+/* Destroys LIST. */
+static void
+taint_list_destroy (struct taint_list *list) 
+{
+  free (list->taints);
+}
+
+/* Returns true if TAINT is in LIST, false otherwise. */
+static bool
+taint_list_contains (const struct taint_list *list, const struct taint *taint) 
+{
+  size_t i;
+
+  for (i = 0; i < list->cnt; i++) 
+    if (list->taints[i] == taint)
+      return true;
+
+  return false;
+}
+
+/* Returns true if X is zero or a power of 2, false otherwise. */
+static bool
+is_zero_or_power_of_2 (size_t x) 
+{
+  return (x & (x - 1)) == 0;
+}
+
+/* Adds TAINT to LIST, if it isn't already in the list. */
+static void
+taint_list_add (struct taint_list *list, struct taint *taint) 
+{
+  if (!taint_list_contains (list, taint)) 
+    {
+      /* To save a few bytes of memory per list, we don't store
+         the list capacity as a separate member.  Instead, the
+         list capacity is always zero or a power of 2.  Thus, if
+         the list count is one of these threshold values, we need
+         to allocate more memory. */
+      if (is_zero_or_power_of_2 (list->cnt)) 
+        list->taints = xnrealloc (list->taints,
+                                  list->cnt == 0 ? 1 : 2 * list->cnt,
+                                  sizeof *list->taints);
+      list->taints[list->cnt++] = taint;
+    }
+}
+
+/* Removes TAINT from LIST (which must contain it). */
+static void
+taint_list_remove (struct taint_list *list, const struct taint *taint) 
+{
+  size_t i;
+
+  for (i = 0; i < list->cnt; i++) 
+    if (list->taints[i] == taint)
+      {
+        remove_element (list->taints, list->cnt, sizeof *list->taints, i);
+        list->cnt--;
+        return;
+      }
+
+  NOT_REACHED ();
+}
+
+/* Marks TAINT as tainted, as well as all of its successors
+   recursively.  Also marks TAINT's predecessors as
+   successor-tainted, recursively. */
+static void
+recursively_set_taint (struct taint *taint) 
+{
+  size_t i;
+  
+  taint->tainted = taint->tainted_successor = true;
+   for (i = 0; i < taint->successors.cnt; i++) 
+    {
+      struct taint *s = taint->successors.taints[i];
+      if (!s->tainted)
+        recursively_set_taint (s);
+    }
+  for (i = 0; i < taint->predecessors.cnt; i++)
+    {
+      struct taint *p = taint->predecessors.taints[i];
+      if (!p->tainted_successor)
+        recursively_set_tainted_successor (p);
+    }
+}
+
+/* Marks TAINT as successor-tainted, as well as all of its
+   predecessors recursively. */
+static void
+recursively_set_tainted_successor (struct taint *taint) 
+{
+  size_t i;
+  
+  taint->tainted_successor = true;
+  for (i = 0; i < taint->predecessors.cnt; i++)
+    {
+      struct taint *p = taint->predecessors.taints[i];
+      if (!p->tainted_successor)
+        recursively_set_tainted_successor (p);
+    }
+}
+

Index: libpspp/taint.h
===================================================================
RCS file: libpspp/taint.h
diff -N libpspp/taint.h
--- /dev/null   1 Jan 1970 00:00:00 -0000
+++ libpspp/taint.h     4 May 2007 03:48:48 -0000       1.1.2.1
@@ -0,0 +1,132 @@
+/* PSPP - computes sample statistics.
+   Copyright (C) 2007 Free Software Foundation, Inc.
+
+   This program is free software; you can redistribute it and/or
+   modify it under the terms of the GNU General Public License as
+   published by the Free Software Foundation; either version 2 of the
+   License, or (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful, but
+   WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
+
+#ifndef LIBPSPP_TAINT_H
+#define LIBPSPP_TAINT_H 1
+
+/* Tainting and taint propagation.
+
+   Properly handling I/O errors and other hard errors in data
+   handling is important.  At a minimum, we must notify the user
+   that an error occurred and refrain from presenting possibly
+   corrupted output.  It is unacceptable, however, to simply
+   terminate PSPP when an I/O error occurs, because of the
+   unfriendliness of that approach, especially in a GUI
+   environment.  We should also propagate the error to the top
+   level of command execution; that is, ensure that the command
+   procedure returns CMD_CASCADING_FAILURE to its caller.
+
+   Usually in C we propagate errors via return values, or by
+   maintaining an error state on an object (e.g. the error state
+   that the ferror function tests on C streams).  But neither
+   approach is ideal for PSPP.  Using return values requires the
+   programmer to pay more attention to error handling than one
+   would like, especially given how difficult it can be to test
+   error paths.  Maintaining error states on important PSPP
+   objects (e.g. casereaders, casewriters) is a step up, but it
+   still requires more attention than one would like, because
+   quite often there are many such objects in use at any given
+   time, and an I/O error encountered by any of them indicates
+   that the final result of any computation that depends on that
+   object is incorrect.
+
+   The solution implemented here is an attempt to automate as
+   much as possible of PSPP's error-detection problem.  It is
+   based on use of "taint" objects, created with taint_create or
+   taint_clone.  Each taint object represents a state of
+   correctness or corruption (taint) in an associated object
+   whose correctness must be established.  The taint_set_taint
+   function is used to mark a taint object as tainted.  The taint
+   status of a taint object can be queried with taint_is_tainted.
+
+   The benefit of taint objects lies in the ability to connect
+   them together in propagation relationships, using
+   taint_propagate.  The existence of a propagation relationship
+   from taint object A to taint object B means that, should
+   object A ever become tainted, then object B will automatically
+   be marked tainted as well.  This models the situation where
+   the data represented by B are derived from data obtained from
+   A.  This is a common situation in PSPP; for example, the data
+   in one casereader or casewriter are often derived from data in
+   another casereader or casewriter.
+
+   Taint propagation is transitive: if A propagates to B and B
+   propagates to C, then tainting A taints both B and C.  Taint
+   propagation is not commutative: propagation from A to B does
+   not imply propagation from B to A.  However, taint propagation
+   is robust against loops, so that if A propagates to B and vice
+   versa, whether directly or indirectly, then tainting either A
+   or B will cause the other to be tainted, without producing an
+   infinite loop.  
+
+   The implementation is robust against destruction of taints in
+   propagation relationships.  When this happens, taint
+   propagation through the destroyed taint object is preserved,
+   that is, if A taints B and B taints C, then destroying B will
+   preserve the transitive relationship, so that tainting A will
+   still taint C.
+
+   Taint objects actually propagate two different types of taints
+   across the taint graph.  The first type of taint is the one
+   already described, which indicates that an associated object
+   has corrupted state.  The second type of taint, called a
+   "successor-taint" does not necessarily indicate that the
+   associated object is corrupted.  Rather, it indicates some
+   successor of the associated object is corrupted, or was
+   corrupted some time in the past before it was destroyed.  (A
+   "successor" of a taint object X is any taint object that can
+   be reached by following propagation relationships starting
+   from X.)  Stated another way, when a taint object is marked
+   tainted, all the taint objects that are reachable by following
+   propagation relationships *backward* are marked with a
+   successor-taint.  In addition, any object that is marked
+   tainted is also marked successor-tainted.
+
+   The value of a successor-taint is in summarizing the history
+   of the taint objects derived from a common parent.  For
+   example, consider a casereader that represents the active
+   file.  A statistical procedure can clone this casereader any
+   number of times and pass it to analysis functions, which may
+   themselves in turn clone it themselves, pass it to sort or
+   merge functions, etc.  Conventionally, all of these functions
+   would have to carefully check for I/O errors and propagate
+   them upward, which is error-prone and inconvenient.  However,
+   given the successor-taint feature, the statistical procedure
+   may simply check the successor-taint on the top-level
+   casereader after calling the analysis functions and, if a
+   successor-taint is present, skip displaying the procedure's
+   output.  Thus, error checking is centralized, simplified, and
+   made convenient.  This feature is now used in a number of the
+   PSPP statistical procedures; search the source tree for
+   "taint_has_tainted_successor" for details. */
+
+#include <stdbool.h>
+
+struct taint *taint_create (void);
+struct taint *taint_clone (const struct taint *);
+bool taint_destroy (struct taint *);
+
+void taint_propagate (const struct taint *from, const struct taint *to);
+
+bool taint_is_tainted (const struct taint *);
+void taint_set_taint (struct taint *);
+
+bool taint_has_tainted_successor (const struct taint *);
+void taint_reset_successor_taint (struct taint *);
+
+#endif /* libpspp/taint.h */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]