@App:name('TestExecutionPlan')
define stream FooStream (symbol string, price float, volume long);
@sink(type='log')
@source(type='rabbitmq', uri = 'amqp://guest:guest@localhost:5672', queue.name='hello', routing_key='hello',
exchange.name = 'direct',
exchange.autodelete.enabled = 'true',
@map(type='json'))
Define stream BarStream1 (symbol string, price float, volume long);
@info(name = 'query1')
@sink(type ='rabbitmq', uri = 'amqp://guest:guest@localhost:5672', queue.name='hello', routing_key='hello',
exchange.name = 'direct', @map(type='json'))
define stream BarStream (symbol string, price float, volume long);
from FooStream
select symbol, price, volume
insert into BarStream;