/* ---- definitions ---- */

/* S: counter matrix of the sketch; index1 is the hashed bucket, index2 the hash-function id (matches hs.h) */
table S(index1 int, index2 int, cnt int) memory;
/* itemsIds: the item ids whose frequencies are reported */
table itemsIds(ids int) memory;
table tupleCnt(cnt int) memory;
/* hs: one row per hash function: id h and coefficients ah, bh */
table hs(h int, ah int, bh int) memory;
/* mPMValues: the moduli P and m used in ((ah*k+bh)%P)%m, plus M */
table mPMValues(m int, P int, M int) memory;

stream items(item int) source 'port4435';

/* ---- initialization of tables, at system reboot ---- */

/* S: every (index1, index2) counter starts at 0 */
insert into S values(1, 1, 0);
insert into S values(1, 2, 0);
insert into S values(1, 3, 0);
insert into S values(1, 4, 0);
insert into S values(1, 5, 0);
insert into S values(2, 1, 0);
insert into S values(2, 2, 0);
insert into S values(2, 3, 0);
insert into S values(2, 4, 0);
insert into S values(2, 5, 0);
insert into S values(3, 1, 0);
insert into S values(3, 2, 0);
insert into S values(3, 3, 0);
insert into S values(3, 4, 0);
insert into S values(3, 5, 0);
insert into S values(0, 1, 0);
insert into S values(0, 2, 0);
insert into S values(0, 3, 0);
insert into S values(0, 4, 0);
insert into S values(0, 5, 0);

/* itemsIds: ids 0 through 15 */
insert into itemsIds values (0);
insert into itemsIds values (1);
insert into itemsIds values (2);
insert into itemsIds values (3);
insert into itemsIds values (4);
insert into itemsIds values (5);
insert into itemsIds values (6);
insert into itemsIds values (7);
insert into itemsIds values (8);
insert into itemsIds values (9);
insert into itemsIds values (10);
insert into itemsIds values (11);
insert into itemsIds values (12);
insert into itemsIds values (13);
insert into itemsIds values (14);
insert into itemsIds values (15);

/* hs: five hash functions (h, ah, bh) */
insert into hs values(1, 7, 13);
insert into hs values(2, 22, 6);
insert into hs values(3, 24, 11);
insert into hs values(4, 14, 27);
insert into hs values(5, 17, 31);

/* mPMValues: m = 4, P = 31, M = 16 */
insert into mPMValues values(4, 31, 16);

insert into tupleCnt values(0);
/*
 * hcount: delta computation using an expire state.
 *
 * For every tuple entering the window, the counter S(index1, index2) selected
 * by each hash function in hs is incremented by 1; for every tuple leaving the
 * window, the same counters are decremented by 1.
 */
window aggregate hcount(k int):int {

    /*
     * updateCnt: adds delta to the single counter of S addressed by one hash
     * function (hashId, coefA, coefB) applied to key:
     *   index1 = ((coefA*key + coefB) % P) % m,  index2 = hashId
     * with P and m read from mPMValues.
     */
    aggregate updateCnt(key int, hashId int, coefA int, coefB int, delta int):int {
        initialize:iterate: {
            update S
               set cnt = cnt+delta
             where index2 = hashId
               and index1 = ((coefA*key+coefB)%(select P from mPMValues))%(select m from mPMValues);
        }
    };

    /* tuple enters the window: +1 for every hash function listed in hs */
    initialize:iterate: {
        select updateCnt(k, h, ah, bh, 1)
          from hs;
    }

    /* tuple leaves the window: -1 for every hash function listed in hs */
    expire: {
        select updateCnt(k, h, ah, bh, -1)
          from hs;
    }
};
/*
 * getWinFreqs: periodic frequency report.
 *
 * Counts the tuples it receives and, each time the count is a multiple of
 * win, emits one (index, item, minC) row per id in itemsIds, where
 *   index = sequence number of the report (starts at 1, +1 after each report)
 *   item  = the id taken from itemsIds
 *   minC  = minimum of the S counters addressed by the hash functions in hs
 * The value of the item argument is not read; the aggregate is driven only by
 * tuple arrivals.
 * NOTE(review): the initialize state does not emit, so with win = 1 the very
 * first tuple produces no report -- confirm this is intended.
 */
aggregate getWinFreqs(item int, win int):(index int, item int, minC int) {
    /* Declared locally under the same name as the global tupleCnt table;
       presumably this local table is the one used below -- TODO confirm. */
    table tupleCnt(cnt int);
    table tIndex(index int);

    /*
     * getItemFreqs: for each input row, emits (index, item, minC) for that item.
     * The declared return tuple now lists the columns in the order they are
     * actually inserted, which is also the order of getWinFreqs' return tuple.
     * The M argument is accepted but not used in the body.
     */
    aggregate getItemFreqs(item int, M int, index int):(index int, item int, minC int) {
        table minCTable(minC int);

        /*
         * getMinC: collects, for every input row (one per hash function), the
         * counter S(((ah*item1+bh) % P) % m, h) and returns the minimum of the
         * collected values on terminate.
         */
        aggregate getMinC(item1 int, h int, ah int, bh int):int {
            table minTable(val int);
            /* the first and the following rows are handled identically */
            initialize:iterate: {
                insert into minTable
                select cnt
                from S
                where index1 = ((ah*item1+bh)%(select P from mPMValues))%(select m from mPMValues)
                and index2 = h;
            }
            terminate: {
                insert into return select min(val) from minTable;
            }
        };

        initialize:iterate: {
            /* minCTable holds only the current item's result */
            delete from minCTable where 1=1;
            insert into minCTable select getMinC(item, h, ah, bh) from hs;
            insert into return select index, item, minC from minCTable;
        }
    };

    initialize: {
        /* the first tuple counts as tuple 1 of report 1 */
        insert into tupleCnt values(1);
        insert into tIndex values(1);
    }
    iterate: {
        update tupleCnt set cnt = cnt+1;
        /* report only when the tuple count is a multiple of win */
        insert into return
        select getItemFreqs(ids, M, index) from tIndex, itemsIds, mPMValues
        where (select cnt from tupleCnt)%win = 0;
        /* advance the report number only after a report was produced */
        update tIndex set index = index+1 where (select cnt from tupleCnt)%win = 0;
    }
};
/* Keep the counters in S up to date over the window "rows 999 preceding" of the items stream. */
select hcount(item) over (rows 999 preceding)
  from items;

/* Emit a frequency report for the ids in itemsIds every 100 tuples of the items stream. */
select getWinFreqs(item, 100)
  from items;
/* Reference: "Dynamically Maintaining Frequent Items over a Data Stream" by Cheqing Jin et al. */