/* Base input streams. Each one reads from the named source and is
   ordered by its own timestamp column. */
stream person(
    person_id   int,
    name        char(20),
    email       char(20),
    credit_card int,
    city        char(20),
    state       char(2),
    reg_time    timestamp
) source 'person.so' order by reg_time;

stream bid(
    auction_id int,
    price      int,
    bidder_id  int,
    bid_time   timestamp
) source 'bid.so' order by bid_time;

stream auction(
    auction_id    int,
    item_name     char(10),
    seller_id     int,
    initial_price int,
    category_id   int,
    expire_date   timestamp,
    input_time    timestamp
) source 'auction.so' order by input_time;
Query 1 (scale every bid price by a fixed rate of 0.8239) — Stream Mill implementation:
/* Emit every bid with its price multiplied by 0.8239
   (presumably a fixed exchange rate; confirm against the spec). */
insert into stdout
select auction_id,
       0.8239 * price,
       bidder_id,
       bid_time
from bid;
Query 2 (select the bids on auctions 1, 2 and 3) — Stream Mill implementation:
/* Emit (auction_id, price) only for bids on auctions 1, 2 and 3. */
insert into stdout
select auction_id,
       price
from bid
where auction_id = 1
   or auction_id = 2
   or auction_id = 3;
Query 3 — Stream Mill implementation: the query is split into two parts. The first part unions the auction and person streams into a single stream, query3. This avoids a blocking join, and no window is needed. The second part applies the user-defined aggregate local_items to the unioned stream to produce the matching tuples.
/* Merge auctions (tuple_type 0) and persons (tuple_type 1) into query3.
   Columns that a tuple type does not use are padded with -1 or ''. */
insert into query3
select 0, auction_id, seller_id, initial_price,
       -1, '', '', '',
       input_time
from auction
union
select 1, -1, -1, -1,
       person_id, name, city, state,
       reg_time
from person;
/* Feed local_items every auction tuple (tuple_type 0), plus only the
   person tuples (tuple_type 1) whose state is KS, NY or SD.
   The state alternatives are parenthesized together. Without that, AND
   binds tighter than OR, and the tuple_type = 1 guard would apply only
   to the 'KS' comparison. */
insert into stdout
select local_items(tuple_type, auction_id, seller_id, person_id, name, city, state)
from query3
where tuple_type = 0
   or (tuple_type = 1
       and (state = 'KS' or state = 'NY' or state = 'SD'));
The definition for unioned stream "query3"
/* Unioned auction/person stream: tuple_type 0 rows come from auction,
   tuple_type 1 rows come from person. */
stream query3(
    tuple_type    int,
    auction_id    int,
    seller_id     int,
    initial_price int,
    person_id     int,
    name          char(20),
    city          char(20),
    state         char(2),
    in_time       timestamp
);
The definition for user-defined aggregate "local_items"
/* local_items: stores person tuples and, for each auction tuple, emits
   (name, city, state, auction_id) for every stored person whose pid
   equals the auction's seller_id. Persons that arrive after the auction
   are not matched against it. */
AGGREGATE local_items(tuple_type int, auction_id int, seller_id int, person_id int,
                      name char(20), city char(20), state char(2))
    : (name char(20), city char(20), state char(2), auction_id int)
{
    /* Every person tuple passed in so far. Nothing is ever deleted,
       so this table grows with the number of person tuples. */
    TABLE persons(pid int, pname char(20), pcity char(20), pstate char(2)) MEMORY;

    /* The first tuple and every later tuple run the same body. */
    INITIALIZE : ITERATE :
    {
        /* Person tuple: remember it for auctions that arrive later. */
        insert into persons
        values (person_id, name, city, state)
        where tuple_type = 1;

        /* Auction tuple: return the details of its seller, if already stored. */
        insert into return
        select p.pname, p.pcity, p.pstate, auction_id
        from persons p
        where p.pid = seller_id
          and tuple_type = 0;
    }
};
/* Merge bids (tuple_type 1) and auctions (tuple_type 0) into query4.
   Bid rows carry bid_time in the expire_date slot and put their own
   auction_id/price in b_auc_id/bid_price. Auction rows pad the bid
   columns with -1. */
insert into query4
select 1, -1, -1, -1, -1,
       bid_time,
       auction_id, price,
       bid_time
from bid
union
select 0, auction_id, initial_price, category_id, seller_id,
       expire_date,
       -1, -1,
       input_time
from auction;

/* Run the per-category averaging aggregate over the merged stream. */
insert into stdout
select cat_avg(tuple_type, auction_id, initial_price, category_id,
               expire_date, b_auc_id, bid_price, in_time)
from query4;
The definition for unioned stream "query4"
/* Unioned auction/bid stream: tuple_type 0 rows come from auction,
   tuple_type 1 rows come from bid. */
stream query4(
    tuple_type    int,
    auction_id    int,
    initial_price int,
    category_id   int,
    seller_id     int,
    expire_date   timestamp,
    b_auc_id      int,
    bid_price     int,
    in_time       timestamp
);
The definition for user-defined aggregate "cat_avg"
/* cat_avg: tracks open auctions and their current price (the initial
   price, raised by higher bids). When auctions expire (expire < in_time),
   their final prices are folded into per-category totals, and the running
   average for every category with at least one closed auction is returned. */
AGGREGATE cat_avg(tuple_type int, auction_id int, initial_price int, category_id int,
                  expire_date timestamp, b_auc_id int, bid_price int, in_time timestamp)
    : (cat_id int, avg_price int)
{
    /* Auctions that have not expired yet, with their current price. */
    TABLE open_auctions(aid int, price int, cid int, expire timestamp) MEMORY;
    /* Per-category running total and count of closed auctions. */
    TABLE cat_avg_tmp(category int, cur_total int, cur_count int) MEMORY;
    INITIALIZE :
    ITERATE :
    {
        /* Auction tuple: open it at its initial price. */
        insert into open_auctions
        values(auction_id, initial_price, category_id, expire_date)
        where tuple_type = 0;

        /* Auction tuple in a category not seen before: create a zeroed accumulator. */
        insert into cat_avg_tmp
        values(category_id, 0, 0)
        where tuple_type = 0
          and not exists (select category from cat_avg_tmp where category = category_id);

        /* Fold the final prices of auctions that expired before in_time into
           their category's total and count. Both subqueries are restricted to
           expired auctions. Otherwise still-open auctions in the same category
           would be added now and again when they actually expire. */
        update cat_avg_tmp
        set cur_total = cur_total +
                (select sum(o.price) from open_auctions o
                 where o.cid = cat_avg_tmp.category and o.expire < in_time),
            cur_count = cur_count +
                (select count(o2.price) from open_auctions o2
                 where o2.cid = cat_avg_tmp.category and o2.expire < in_time)
        where exists
            (select * from open_auctions o3
             where o3.expire < in_time and o3.cid = cat_avg_tmp.category);

        /* If any auction closed on this tuple, return the (integer-division)
           average of every category that has at least one closed auction. */
        insert into return
        select category, cur_total/cur_count
        from cat_avg_tmp where cur_count > 0
        and exists (select * from open_auctions where expire < in_time);

        /* Closed auctions are now accounted for; drop them. */
        delete from open_auctions where expire < in_time;

        /* Bid tuple: raise the matching open auction's price if the bid is higher. */
        update open_auctions
        set price = bid_price
        where tuple_type = 1 and expire >= in_time and price < bid_price and aid = b_auc_id;
    }
};