现在Streaming的可用度如何了?

60 views
Skip to first unread message

D.Y Feng

unread,
Feb 7, 2014, 1:55:25 AM2/7/14
to dpark...@googlegroups.com
现在Streaming的可用度如何了?貌似api跟spark还是有些差距,而且没有example出来。

Windreamer

unread,
Feb 7, 2014, 2:42:18 AM2/7/14
to dpark-users
streaming 的使用可以参考test case 也欢迎pr

---
Windreamer
On 2014年2月7日 GMT+8下午2:55:25, D.Y Feng wrote:
现在Streaming的可用度如何了?貌似api跟spark还是有些差距,而且没有example出来。

--
You received this message because you are subscribed to the Google Groups "DPark Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to dpark-users...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

D.Y Feng

unread,
Feb 7, 2014, 2:56:40 AM2/7/14
to dpark...@googlegroups.com
test case,始终只是test case。例如程序最后spark用的是ssc.awaitTermination(),但是dpark貌似没有类似的东西。当然你弄个while 1:sleep(1)也没事,但我需要的是一个“最佳实践”。

例如我举个场景,我要实时统计ip数,并写入数据库,虽然问题很简单,但是用Streaming应该怎么入手?

谢谢

Windreamer

unread,
Feb 7, 2014, 3:11:17 AM2/7/14
to dpark-users
限于我们的精力和使用场景,streaming并没有很成熟的应用,目前依旧是原型状态

目前streaming只支持从文件读入并输出到文件,你可以用其他程序同步到数据库或自己实现到数据库的输出部分
---
Windreamer

D.Y Feng

unread,
Feb 7, 2014, 3:27:01 AM2/7/14
to dpark...@googlegroups.com
“目前streaming只支持从文件读入并输出到文件” 也没有这么弱吧,自己实现compute继承InputDStream应该也没问题吧。不过貌似就必须要有OutputStream,程序是OutputStream开始向上回溯的。

个人的想法是FileInputDStream读取文件,中间处理,用ForEachDStream做OutputStream汇总每次的数量,再update到数据库。

代码没有深入看过,只是跑着test case,跟了一遍,所以对流程的理解可能有误。

个人感觉,如果少了streaming,那流计算spark就最多只能算“计算”...



2014-02-07 Windreamer <windr...@gmail.com>:

--
You received this message because you are subscribed to a topic in the Google Groups "DPark Users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/dpark-users/v2R-hKYB22o/unsubscribe.
To unsubscribe from this group and all its topics, send an email to dpark-users...@googlegroups.com.

For more options, visit https://groups.google.com/groups/opt_out.



--
                                                       

DY.Feng(叶毅锋)
yyfeng88625@twitter
Department of Applied Mathematics
Guangzhou University,China
dyf...@stu.gzhu.edu.cn
                                                       

Windreamer

unread,
Feb 7, 2014, 3:33:51 AM2/7/14
to dpark-users
这个流程大致没有问题,通过自己实现dstream的确可以做到。不过实现的时候要注意,output dstream需要支持重算,甚至并发的重算,这对于写入数据库的dstream来说有点麻烦。所以目前dpark的输出结果往往是文件的原因

---
Windreamer

D.Y Feng

unread,
Feb 7, 2014, 5:05:22 AM2/7/14
to dpark...@googlegroups.com
重算这个指的是什么?貌似没见到有重算的逻辑,难道是因为我用的是 StreamingContext(1, "local")。

而且如果重算的话,无论输出文件还是输出数据库,还不是都有重复。




2014-02-07 Windreamer <windr...@gmail.com>:

Windreamer

unread,
Feb 7, 2014, 5:12:59 AM2/7/14
to dpark-users
在spark/dpark的计算模型里面,如果一个任务计算时间过长/计算失败会另外启动一个任务计算。

对于文件,因为每个任务都是先写临时文件再用rename替换目标文件,这样可以保证重算的正确性

---
Windreamer

D.Y Feng

unread,
Feb 10, 2014, 3:11:04 AM2/10/14
to dpark...@googlegroups.com
Streaming支不支持多分流的功能,例如我有一批日志S0,先经过一次计算P0,然后再分成两个流S1 S2,经过两个不同的计算(P1 P2)后汇总成两条流。但貌似dpark只是把从P0到P1 P2的计算过程记录,然后两条流处理的时候都会重新计算一次P0。能不能在P0计算完后,保存下结果,再供给S1 S2,而不是在重复计算。貌似跟此有关的是checkpoint,但不知道应该怎么做。

Davies Liu

unread,
Feb 10, 2014, 12:54:47 PM2/10/14
to dpark...@googlegroups.com
2014-02-10 0:11 GMT-08:00 D.Y Feng <yyfen...@gmail.com>:
> Streaming支不支持多分流的功能,例如我有一批日志S0,先经过一次计算P0,然后再分成两个流S1 S2,经过两个不同的计算(P1
> P2)后汇总成两条流。但貌似dpark只是把从P0到P1
> P2的计算过程记录,然后两条流处理的时候都会重新计算一次P0。能不能在P0计算完后,保存下结果,再供给S1
> S2,而不是在重复计算。貌似跟此有关的是checkpoint,但不知道应该怎么做。

把P0保存到磁盘,P1 和 P2 分别从磁盘读取,Checkpoint 差不多也是这么做的
- Davies

D.Y Feng

unread,
Feb 12, 2014, 2:31:51 AM2/12/14
to dpark...@googlegroups.com
我现在的做法是在分流之前汇总一下
new=old_stream.transform(lambda rdd,t:ssc.sc.parallelize(rdd.collect()))

如果old_stream是小数据,那还行,但是如果来个一次性几G日志分析,那collect就不行了。

个人觉得如果有分流的情况,解决方法有两种:

一种是从rdd层面解决,让他支持分支逻辑,这种复杂度太大了,数据共享在分布式系统一向都不好解决。

另一种就是从Streaming的层面解决,汇总一下再继续,因为Streaming的理念就是每次来点小RDD,一次次来计算。问题是如果汇总的数据过于庞大,那就后继乏力了。(一般不会出现这种情况,除非像我那样来个一次性分析)
又或者我们更改Streaming的api,让他compute出来的不再只是一个RDD,而是[rdd,rdd],把generateJob改成generateJobs。这时候我们就可以在汇总的地方,把那个汇总的大RDD给分割成N个小RDD。

spark的streaming我只看了api,在这个问题上他是怎么应对的我也不大了解,个人对java比较不感冒。


不知道dpark的目标是什么?是做一个python版的spark,还是一个借鉴rdd理念的python流计算系统。

田忠博

unread,
Feb 12, 2014, 3:21:31 AM2/12/14
to dpark-users
现在checkpoint仍然是todo状态
你可以试试
new = old.map(lambda rdd:rdd.snapshot())

这样在rdd计算时会把中间结果保存在磁盘,下次使用这个rdd的时候会把中间结果读出来,而不用再次计算

Reply all
Reply to author
Forward
0 new messages