We should estimate the count to within a factor given by a user-defined error bound. To do this, we can maintain an exponential histogram:
If a new element is 0, ignore it. If a new element is 1, create a tuple with value 1. If there are k/2+2 neighboring tuples with the same value, merge the oldest two of them into a single tuple with double the value (k depends on the error bound e given by the user: k = 1/e). Merges can cascade from the youngest tuples to the oldest tuples. Stream:
/* Input stream for the version without window constructs: v is the element value
   (presumably 0 or 1, per the description above) and t is its timestamp. */
Stream s(v Int, t Timestamp);
We implemented two versions: one without window constructs (listed first) and one with window constructs (listed second).
/*
 * basicCount (version without window constructs).
 * Stores histogram tuples in the in-memory table hist. For every input it
 * inserts the value if it is positive, deletes old tuples by hand, and then
 * runs the nested merge aggregate over hist, youngest tuple first; the value
 * produced by merge is what basicCount returns.
 *   next - incoming value; only values > 0 are stored
 *   t    - timestamp of the incoming value
 *   k    - merge parameter: a run of equal-valued tuples is merged once it
 *          holds k/2+2 tuples
 */
Aggregate basicCount(next Int, t Timestamp, k Int):{
/* One row per histogram tuple: h is the tuple's value, t its timestamp. */
Table hist(h Int, t timestamp) Memory;
/*
 * merge: a single pass over the tuples of hist (the caller feeds them in
 * descending timestamp order). It tracks the current run of equal-valued
 * tuples and, when a run of k/2+2 tuples is followed by a tuple with a
 * different value, deletes the last tuple of the run from hist and doubles
 * the one before it. It also sums all values it sees and returns the sum
 * minus half of next.
 *   next - value (h) of the hist tuple being visited
 *   t    - timestamp of that tuple
 *   k    - merge parameter, see basicCount
 */
Aggregate merge(next Int, t Timestamp, k Int):{
/* state table stores the running total, the value (h) and length (cnt) of the
   current run of equal-valued tuples, and the last two timestamps seen in that
   run (t1 = most recently visited, t2 = the one visited before it) */
Table state(total Int, h Int, cnt Int, t1 timestamp, t2 timestamp) Memory;
Initialize:{
/* First tuple: it starts both the total and a run of length 1; t2 is unknown. */
Insert Into state values(next, next, 1, t, NULL);
}
Iterate:{
/* Accumulate the sum of all visited tuple values. */
Update state Set total = total + next;
/* Same value as the current run: extend the run and shift the timestamps. */
Update state Set cnt = cnt + 1, t2 = t1, t1 = t Where h = next; /* should early return if true */
/* Different value and the run is still short: start a new run at this tuple. */
Update state Set h = next, cnt = 1, t1 = t Where h <> next AND (select cnt from state) < k/2 +2; /* should early return if true */
/* if current count is k/2+2, delete the last histogram, and double the next-to-last one */
/* NOTE(review): no statement here matches a run longer than k/2+2 followed by a
   different value (the update above needs cnt < k/2+2, the delete needs
   cnt = k/2+2) - confirm such a run cannot occur. */
Delete from hist h Where h.t = (select t1 from state)
AND h <> next
AND (select cnt from state) = k/2 +2;
/* The next two statements are guarded by SQLCODE = 0.
   NOTE(review): presumably SQLCODE is 0 only when the preceding statement
   affected a row, so they run only after a successful delete - confirm. */
Update hist h Set h = h * 2 Where SQLCODE = 0 AND h.t = (select t2 from state);
/* After a merge the run is restarted with cnt = 2; t2 is left unchanged, so it
   still holds the timestamp of the tuple that was just doubled. */
Update state Set h = next, cnt = 2, t1 = t Where SQLCODE = 0;
}
Terminate:{
/* Return the accumulated total minus half of next.
   NOTE(review): presumably next here is the last tuple visited, i.e. the oldest
   one given the caller's Order By t Desc - confirm ESL exposes it in Terminate. */
insert into return select total - next/2 from state;
}
}
/* Initialize has no body of its own.
   NOTE(review): presumably it shares the Iterate block that follows - confirm. */
Initialize:
Iterate:{
Insert Into hist Values(next, t) where next >0; /* ignore 0 */
/* Manually drop tuples older than t - T.
   NOTE(review): T is not declared anywhere in the visible source; presumably it
   is the window length - confirm where it is defined. */
Delete from hist h where h.t < t - T;
/* Run merge over hist from youngest to oldest tuple and return its result. */
Insert Into Return
Select merge(h, t, k) Over (Order By t Desc) from hist;
}
}
/* Apply basicCount to every tuple of stream s with k = 2 (merge threshold k/2+2 = 3). */
Select basicCount(v, t, 2)
From s;
/* Input stream for the version with window constructs: v is the element value
   (presumably 0 or 1, per the description above) and t is its timestamp. */
stream s(v Int, t Timestamp);
/*
 * basicCount (version with window constructs).
 * Stores histogram tuples in the window hist and keeps a running total in the
 * one-row table memo. Iterate inserts positive inputs, adds them to the total,
 * runs the nested merge aggregate over hist and returns memo.total; Expire
 * adjusts memo when a tuple leaves the window.
 *   next - incoming value; only values > 0 are stored in hist
 *   t    - timestamp of the incoming value
 *   k    - merge parameter: a run of equal-valued tuples is merged once it
 *          holds k/2+2 tuples
 */
Aggregate basicCount(next Int, t Timestamp, k Int):{
/* One row per histogram tuple (h = value, t = timestamp), ordered by t. */
Window hist(h Int, t timestamp) Order By t;
/* Single row, initialised to (0,0): last is set from max(h) of hist in Iterate
   and adjusted in Expire; total is the running sum of the inputs, corrected in
   Expire. */
Table memo(last Int, total Int) Memory Values (0,0);
/*
 * merge: a single pass over the tuples of hist (the caller feeds them in
 * descending timestamp order). It tracks the current run of equal-valued
 * tuples and, when a run of k/2+2 tuples is followed by a tuple with a
 * different value, deletes the last tuple of the run from hist and doubles
 * the one before it. It has no Terminate block and returns nothing; it is
 * used only for its effect on hist.
 *   next - value (h) of the hist tuple being visited
 *   t    - timestamp of that tuple
 *   k    - merge parameter, see basicCount
 */
Aggregate merge(next Int, t Timestamp, k Int):{
/* state table stores the value (h) and length (cnt) of the current run of
   equal-valued tuples, and the last two timestamps seen in that run
   (t1 = most recently visited, t2 = the one visited before it) */
Table state(h Int, cnt Int, t1 timestamp, t2 timestamp) Memory;
Initialize:{
/* First tuple starts a run of length 1; t2 is unknown. */
Insert Into state values(next, 1, t, NULL);
}
Iterate:{
/* Same value as the current run: extend the run and shift the timestamps. */
Update state Set cnt = cnt + 1, t2 = t1, t1 = t Where h = next; /* should early return if true */
/* Different value and the run is still short: start a new run at this tuple. */
Update state Set h = next, cnt = 1, t1 = t Where h <> next AND (select cnt from state) < k/2 +2; /* should early return if true */
/* if current count is k/2+2, delete the last histogram, and double the next-to-last one */
/* NOTE(review): no statement here matches a run longer than k/2+2 followed by a
   different value (the update above needs cnt < k/2+2, the delete needs
   cnt = k/2+2) - confirm such a run cannot occur. */
Delete from hist h Where h.t = (select t1 from state)
AND h <> next
AND (select cnt from state) = k/2 +2;
/* The next two statements are guarded by SQLCODE = 0.
   NOTE(review): presumably SQLCODE is 0 only when the preceding statement
   affected a row, so they run only after a successful delete - confirm. */
Update hist h Set h = h * 2 Where SQLCODE = 0 AND h.t = (select t2 from state);
/* After a merge the run is restarted with cnt = 2; t2 is left unchanged, so it
   still holds the timestamp of the tuple that was just doubled. */
Update state Set h = next, cnt = 2, t1 = t Where SQLCODE = 0;
}
}
/* Initialize has no body of its own.
   NOTE(review): presumably it shares the Iterate block that follows - confirm. */
Initialize:
Iterate:{
Insert Into hist Values(next, t) where next >0; /* ignore 0 */
/* Keep the running sum of all inputs. */
Update memo Set total = total + next;
/* Run merge over hist from youngest to oldest tuple; its result is not used. */
Select merge(h, t, k) Over (Order By t Desc) from hist;
/* Update last pointer due to merge: last becomes the largest tuple value in hist */
Update memo Set last = (select max(h) from hist);
/* Return the running total. */
Insert Into Return Select total From memo;
}
Expire:{
/* update last pointer: since it's hard to reference next-to-last histogram,
we use an indirect way. i.e. unchanged if the last two histograms are same,
otherwise set to half.
*/
/* NOTE(review): the unqualified h here and below presumably refers to the value
   of the tuple expiring from hist - confirm against ESL Expire semantics. */
Update memo Set last = h/2 Where (select count(1) from hist h where h.h = last) =1;
/* update total pointer */
Update memo Set total = total - h/2 - last/2;
}
}
/* Apply basicCount with k = 2 (merge threshold k/2+2 = 3) to stream s over a
   10-minute range window. */
Select basicCount(v, t, 2)
Over (Range 10 Minute)
From s;
Reference: "Maintaining Stream Statistics over Sliding Windows" by Mayur Datar et al.