Stream Mill Example -- BasicCount


Problem Statement

We have a stream in which each element is either 0 or 1 (a stream of bits), and we place a sliding window of size N over it. How can we estimate the number of 1s in the window without storing the window itself? Without the stored window, the value of the bit that is expiring is unknown.
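For contrast, an exact answer has to remember every bit in the window, because it must know which bit is expiring. The minimal Python sketch below shows that baseline for a count-based window of N bits; the helper name `exact_window_counts` is hypothetical. It keeps O(N) state, which is exactly what the exponential histogram avoids.

```python
from collections import deque


def exact_window_counts(bits, n):
    """Exact number of 1s among the last n bits, reported after each arrival."""
    window = deque()
    ones = 0
    counts = []
    for b in bits:
        window.append(b)
        ones += b
        if len(window) > n:
            ones -= window.popleft()  # the expiring bit must have been stored
        counts.append(ones)
    return counts


if __name__ == "__main__":
    print(exact_window_counts([1, 0, 1, 1, 0, 1], 3))
```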

The estimate should stay within a user-defined relative error bound e, that is, within a factor of (1 ± e) of the true count. To achieve this, we maintain an exponential histogram:

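If a new element is 0, ignore it. If it is 1, create a new tuple with value 1 and the element's timestamp. Whenever k/2+2 neighboring tuples have the same value, merge the two oldest of them into a single tuple with twice that value, keeping the more recent of their two timestamps. Here k depends on the user's error bound e: k = 1/e (rounded up to an integer). A merge produces a tuple of the next larger value, which can in turn trigger another merge, so merges cascade from the youngest tuples toward the oldest. A tuple is dropped once its timestamp leaves the window, and the count is estimated as the sum of all tuple values minus half the value of the oldest tuple.

Stream: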

Stream s(v Int, t Timestamp);
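The exponential histogram described above can be sketched in Python as follows. This is an illustrative sketch, not the Stream Mill implementation: the class `ExpHistogram`, the count-based window of `n` positions, and the use of integer stream positions as timestamps are all assumptions. Tuples are kept youngest first, so the oldest tuple sits at the end of the list, and integer division mirrors the `Int` arithmetic of the queries.

```python
class ExpHistogram:
    """Sketch of an exponential histogram counting 1s in the last n positions.

    Tuples are (value, timestamp) pairs stored youngest first; values are
    powers of two that never decrease from youngest to oldest.
    """

    def __init__(self, n, k):
        self.n = n              # window size (number of stream positions)
        self.k = k              # k = 1/e, rounded up
        self.tuples = []        # [(value, timestamp), ...], youngest first
        self.total = 0          # sum of all tuple values

    def _expire(self, now):
        # Drop the oldest tuple while its (most recent) timestamp is outside
        # the window (now - n, now].
        while self.tuples and self.tuples[-1][1] <= now - self.n:
            value, _ = self.tuples.pop()
            self.total -= value

    def _merge(self):
        # Cascade from youngest to oldest: when k/2 + 2 neighboring tuples share
        # a value, merge the two oldest into one tuple of twice the value that
        # keeps the more recent timestamp.
        limit = self.k // 2 + 2
        i = 0
        while i < len(self.tuples):
            value = self.tuples[i][0]
            j = i
            while j < len(self.tuples) and self.tuples[j][0] == value:
                j += 1
            if j - i >= limit:
                younger_ts = self.tuples[j - 2][1]
                self.tuples[j - 2:j] = [(2 * value, younger_ts)]
                i = j - 2       # the merged tuple may start a new full run
            else:
                i = j

    def add(self, bit, now):
        self._expire(now)
        if bit == 1:            # 0s are ignored
            self.tuples.insert(0, (1, now))
            self.total += 1
            self._merge()

    def estimate(self):
        # Sum of all tuples minus half of the oldest, partially expired one.
        if not self.tuples:
            return 0
        return self.total - self.tuples[-1][0] // 2


if __name__ == "__main__":
    h = ExpHistogram(n=8, k=2)
    for t, bit in enumerate([1, 1, 1, 0, 1, 1]):
        h.add(bit, t)
    print(h.tuples, h.estimate())
```

With k = 2, as in the queries, a merge fires as soon as three tuples share a value. Because every tuple except the oldest lies entirely inside the window, the true count is between TOTAL - LAST + 1 and TOTAL, so this estimate is off by at most half the oldest tuple's value.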

Query:

We implemented two versions: one without window constructs and one with them.

  1. Without window constructs:
    Aggregate basicCount(next Int, t Timestamp, k Int):{
      Table hist(h Int, t timestamp) Memory;
      Aggregate merge(next Int, t Timestamp, k Int):{  
        /* state stores the running total, the value h of the current run of equal tuples, its length cnt, and the timestamps t1, t2 of its last two tuples */
        Table state(total Int, h Int, cnt Int, t1 timestamp, t2 timestamp) Memory;  
        Initialize:{
          Insert Into state values(next, next, 1, t, NULL);
        }
        Iterate:{
          Update state Set total = total + next;
          Update state Set cnt = cnt + 1, t2 = t1, t1 = t Where h = next;  /* ideally, return early if this update fires */
          Update state Set h = next, cnt = 1, t1 = t Where h <> next AND (select cnt from state) < k/2 +2; /* ideally, return early if this update fires */
    
          /* if the current run has reached k/2+2 tuples, delete its last (oldest) tuple and double the next-to-last one */
          Delete from hist h Where h.t = (select t1 from state) 
                                    AND h <> next 
                                    AND (select cnt from state) = k/2 +2;
          Update hist h Set h = h * 2 Where SQLCODE = 0 AND h.t = (select t2 from state);
          Update state Set h = next, cnt = 2, t1 = t Where SQLCODE = 0;
        }
        Terminate:{
          insert into return select total - next/2 from state;
        }
        
      }
      Initialize:
      Iterate:{
        Insert Into hist Values(next, t) where next >0;    /* ignore 0 */
        Delete from hist h where h.t < t - T;    /* expire tuples older than the window span T */
        Insert Into Return 
        Select merge(h, t, k) Over (Order By t Desc) from hist;
      }
    }
    Select basicCount(v, t, 2) 
    From s;
    
    
  2. With window constructs:
    Stream s(v Int, t Timestamp);
    Aggregate basicCount(next Int, t Timestamp, k Int):{
      Window hist(h Int, t timestamp) Order By t;
      Table memo(last Int, total Int) Memory Values (0,0);
      Aggregate merge(next Int, t Timestamp, k Int):{  
        /* state stores the value h of the current run of equal tuples, its length cnt, and the timestamps t1, t2 of its last two tuples */
        Table state(h Int, cnt Int, t1 timestamp, t2 timestamp) Memory;  
        Initialize:{
          Insert Into state values(next, 1, t, NULL);
        }
        Iterate:{
          Update state Set cnt = cnt + 1, t2 = t1, t1 = t Where h = next;  /* ideally, return early if this update fires */
          Update state Set h = next, cnt = 1, t1 = t Where h <> next AND (select cnt from state) < k/2 +2; /* ideally, return early if this update fires */
    
          /* if the current run has reached k/2+2 tuples, delete its last (oldest) tuple and double the next-to-last one */
          Delete from hist h Where h.t = (select t1 from state) 
                                    AND h <> next 
                                    AND (select cnt from state) = k/2 +2;
          Update hist h Set h = h * 2 Where SQLCODE = 0 AND h.t = (select t2 from state);
          Update state Set h = next, cnt = 2, t1 = t Where SQLCODE = 0;
        }
        
      }
      Initialize:
      Iterate:{
        Insert Into hist Values(next, t) where next >0;    /* ignore 0 */
        Update memo Set total = total + next;
        Select merge(h, t, k) Over (Order By t Desc) from hist;
        /* refresh last, the value of the oldest tuple, since a merge may have changed it */
        Update memo Set last = (select max(h) from hist);
        Insert Into Return Select total From memo;
      }
      Expire:{
        /* update last: since it is hard to reference the next-to-last tuple directly,
            we use an indirect rule: last stays unchanged if another tuple with the
            same value remains, otherwise it is halved.
        */
        Update memo Set last = h/2 Where (select count(1) from hist h where h.h = last) =1;
    
        /* update total, which holds the estimate TOTAL - LAST/2: dropping the expired tuple h gives total - h/2 - last/2 */
        Update memo Set total = total - h/2 - last/2;
      }
    }
    Select basicCount(v, t, 2) 
    Over (Range 10 Minute)
    From s;
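The first version rebuilds the histogram on every arrival: it inserts the new tuple, deletes expired ones, and rescans the table from youngest to oldest with the merge aggregate. The Python sketch below mirrors that rescanning approach under stated assumptions: the names `merge_pass` and `basic_count` are hypothetical, a dictionary keyed by timestamp stands in for the hist table, and the window is taken to be [t - T, t] to match the `h.t < t - T` deletion. The loop variables h, cnt, t1, and t2 play the role of the state table's columns.

```python
def merge_pass(hist, k):
    """Scan hist from youngest to oldest, cascading merges; return the estimate.

    hist maps timestamp -> tuple value and is modified in place. The loop
    variables mirror the state table: h (value of the current run), cnt
    (run length), t1 (timestamp of the run's oldest tuple so far) and t2
    (the tuple scanned just before it).
    """
    limit = k // 2 + 2
    total, h, cnt, t1, t2 = 0, None, 0, None, None
    for t in sorted(hist, reverse=True):        # youngest first
        value = hist[t]
        total += value
        if value == h:
            cnt, t2, t1 = cnt + 1, t1, t
        else:
            h, cnt, t1, t2 = value, 1, t, None
        if cnt == limit:
            # Delete the oldest tuple of the run, double the next-to-oldest;
            # the merged tuple then starts a run of value 2h (cascade).
            del hist[t1]
            hist[t2] *= 2
            h, cnt, t1, t2 = 2 * h, 1, t2, None
    if h is None:
        return 0                                # empty histogram
    return total - h // 2                       # h is now the oldest value


def basic_count(stream, T, k):
    """Per-arrival driver in the style of the first version."""
    hist = {}
    estimates = []
    for v, t in stream:
        if v > 0:                               # ignore 0
            hist[t] = v
        for old in [ts for ts in hist if ts < t - T]:
            del hist[old]                       # expire tuples older than t - T
        estimates.append(merge_pass(hist, k))
    return estimates


if __name__ == "__main__":
    print(basic_count([(1, 0), (1, 1), (0, 2), (1, 3)], T=10, k=2))
```

Because a merge preserves the sum of tuple values, the running total accumulated during the scan equals the sum of the tuples that remain afterwards.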
    
    
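In the second version, memo.total holds the estimate itself, so it is adjusted incrementally. If the expiring oldest tuple has value h and the new oldest tuple has value last, the estimate TOTAL - h/2 becomes (TOTAL - h) - last/2, a change of -h/2 - last/2. The Python sketch below checks that bookkeeping against a full recomputation. It is an illustration under assumptions: the class name `IncrementalCount` and the count-based window are hypothetical, and it uses exact fractions because integer division truncates h/2 to 0 when h = 1. It also adjusts the estimate when a merge doubles the oldest tuple, since that changes LAST.

```python
from fractions import Fraction


class IncrementalCount:
    """Keeps estimate = TOTAL - LAST/2 current without recomputing the sum.

    tuples holds (value, timestamp) pairs youngest first; last mirrors the
    memo table's last column and is 0 when the histogram is empty.
    """

    def __init__(self, n, k):
        self.n = n                      # window size in stream positions
        self.limit = k // 2 + 2         # merge threshold
        self.tuples = []
        self.last = 0
        self.estimate = Fraction(0)

    def _merge(self):
        # Same cascading merge as the exponential histogram described above.
        i = 0
        while i < len(self.tuples):
            value, j = self.tuples[i][0], i
            while j < len(self.tuples) and self.tuples[j][0] == value:
                j += 1
            if j - i >= self.limit:
                self.tuples[j - 2:j] = [(2 * value, self.tuples[j - 2][1])]
                i = j - 2
            else:
                i = j

    def add(self, bit, now):
        # Expire: the estimate drops by h/2 + last/2 (last = new oldest value).
        while self.tuples and self.tuples[-1][1] <= now - self.n:
            h, _ = self.tuples.pop()
            same_value_left = any(v == h for v, _ in self.tuples)
            self.last = h if same_value_left else h // 2   # halving rule
            self.estimate -= Fraction(h, 2) + Fraction(self.last, 2)
        if bit == 1:
            self.tuples.insert(0, (1, now))
            self.estimate += 1          # TOTAL grows by one
            self._merge()
            new_last = self.tuples[-1][0]
            # If a merge doubled the oldest tuple, LAST/2 grew as well.
            self.estimate -= Fraction(new_last - self.last, 2)
            self.last = new_last
```

In this sketch the halving rule for `last` holds because, once a tuple of value 2s exists, at least k/2 tuples of value s remain younger than it, so the next-oldest value after the last 2s tuple expires is s.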
    

Reference

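Mayur Datar, Aristides Gionis, Piotr Indyk, and Rajeev Motwani. "Maintaining Stream Statistics over Sliding Windows." Proceedings of the 13th Annual ACM-SIAM Symposium on Discrete Algorithms (SODA), 2002.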


Copyright © 2001-2002 UCLA Web Information System Laboratory. All Rights Reserved.
Maintained by Chang Luo.