如今IT行业最热门的流行语之一是“大数据”,但“大”这个词有点用词不当:大数据不仅关乎体量,还关乎速度和多样性:4
如果我们沿着体积、速度和多样性这三个维度画出大数据设计空间的图,那么我们就得到了如图所示的大数据立方体图1.立方体的八个角分别对应一种(众所周知的)数据库技术。例如,传统的RDBMS位于带坐标(small, pull, fk/pk)的顶部/后角,这意味着数据集很小;它假设一个由数据库完全控制的封闭世界,客户端在发出查询后同步地从数据库中拉出行,数据模型基于Codd的关系模型。基于hadoop的系统如HBase位于左上角,带有坐标(big, pull, fk/pk)。数据模型基本上仍然是由行、列和主键组成的矩形,结果由客户机从存储中提取,但数据存储在使用某种分区方案的计算机集群上。
当从顶部平面移动到底部平面时,数据模型从具有主键和外键的行更改为对象和指针。在左下角坐标(small, pull, k/v)处是传统的O/R(对象/关系)映射解决方案,如LINQ to SQL、实体框架和Hibernate,它们将OO(面向对象)贴面置于关系数据库之上。在立方体的前面是带有坐标(大,拉,k/v)的对象的LINQ。方法对实际数据源进行虚拟化IEnumerable < T >
接口,该接口允许动态生成无限个项目集合。在右边,多维数据集从批处理变为流数据,其中数据源异步地向其客户机推送项流。具有行和列数据模型的流数据库系统(如Percolator、StreamBase和StreamInsight)占据了右上轴。
最后,在右下方的坐标(大,推,k/v)处,是Rx(反应扩展),或者有时称为LINQ to Events,这是本文的主题。
Rx的目标是利用标准的面向对象编程语言(如Java、c#或Visual Basic)来协调和编排基于事件和异步计算,如低延迟传感器流、Twitter和社交媒体状态更新、SMS消息、GPS坐标、鼠标移动和其他UI事件、Web套接字以及对Web服务的高延迟调用。
有许多方法可以派生Rx,其中一些涉及到类别理论并诉诸于数学对偶性,但本文展示了每个开发人员如何通过跨越标准JDK (Java开发工具包)来发明Rx。未来的< T >
使用GWT(谷歌Web Toolkit)的接口AsyncCallBack < T >
接口,创建一对接口IObservable < T >
而且IObserver < T >
它对值为类型的异步数据流建模T
.这与众所周知的主体/观察者设计模式相对应。然后,本文将展示如何通过将UI事件和Web服务公开为异步数据流并使用流畅的API组合它们来编写一个简单的ajax风格的应用程序。
GWT开发人员文档包含一个略显歉意的部分,名为“习惯于异步调用”,3.这解释了虽然异步调用乍一看对开发人员来说是残酷和不自然的,但它们是防止UI锁定和允许客户端有多个并发未完成的服务器请求的必要之恶。
在GWT中,同步方法的异步对应物人[]getPeople(…)
,它会进行同步跨网络调用并阻塞,直到返回Person数组为止,它将返回void并接受一个额外的回调参数空白getPeople(…, AsyncCallback
.回调接口有两个方法:void onFailure(可抛出错误)
,当异步调用抛出异常时调用该函数;而且void onSuccess(T result)
,当异步调用成功返回结果值时调用该函数。给定一个异步函数,例如getPeople
,调用通常传递一个匿名接口实现,该接口实现分别处理成功和失败回调,如中所示图2.
虽然GWT中对异步的承诺是值得称赞的,但由于没有进一步细化和统一整个框架中的异步编程模型,它错失了一个巨大的机会。例如,RequestBuilder
类用于直接HTTP调用用于
接口,它有两个方法onError
而且onResponseReceived
这些方法实际上是同构的AsyncCallback
前面讨论过的接口。
这两个AsyncCallback
而且用于
接口假定异步调用一次性交付它们的结果。然而,在本例中,以流的方式增量地返回Person数组的元素非常有意义,特别是在结果集很大甚至无穷大的情况下。方法可以异步流回结果调用onSuccess
方法将被多次调用,一次用于结果数组的每个额外块,另一次是通过添加一个方法空白oncomplete ()
,当所有块都已成功交付时将调用该函数。让我们称之为派生接口观察者< T >
表明它可以观察到多个T
值,以反映标准的Java非泛型观察者
接口。
与观察者< T >
而不是AsyncCallback < T >
,异步计算和客户端之间可能的交互顺序为:成功终止后我0的值;终止失败后j值;或者无穷无尽的值流,永远不会完成,如图所示图3.
将回调直接作为参数传递给异步方法的另一个缺点是,一旦调用完成,要从被调用的回调中撤销回调是很棘手的,只会给您留下一个空白。假设,例如,函数getPeople
流每分钟都返回那些注册了营销推广的人的名字,但你对收到超过前1000个名字的人不感兴趣。如果您在进行调用和接收返回时没有预料到这种模式,那么您以后如何实现这一点呢无效
.即使异步调用最多交付一个值,在一定时间间隔内没有收到结果后,您也可以选择稍后通过超时来忽略或取消调用。同样,如果在传递回调到时预期到这一点,这是可能的getPeople
,但你以后不能改变主意。
这些问题表明,异步计算和流没有被视为可以从方法返回的、存储在变量中的一类值,等等。下一节将介绍如何通过引入一个额外的容器接口来将异步数据流转换为适当的值,该容器接口表示异步计算本身,并在其上注册回调函数,以通知计算的结果。现在异步方法可以返回一个表示暂挂的异步计算或流的值,而不仅仅是无效
.特别是,这允许您在调用之后改变主意,并随意过滤、操作或转换计算。
Java SDK已经提供了(单次)异步计算,作为类值的形式未来的< T >
接口,其主体方法T get ()
获取计算的结果,并在底层计算尚未结束时阻塞:
请注意,原则上,未来的< T >
可用于产生多个值。在这种情况下,每个调用get ()
是否阻塞并返回下一个可用的值,只要结束()
不是真的。这类似于可迭代接口。在本文的其余部分中,假定异步计算返回多个结果流。
虽然期货提供了异步计算的一流表示,但get方法是阻塞的。幸运的是,您可以创建JDK接口未来的< T >
的非阻塞T get ()
方法,使用类型的回调观察者< T >
的接口AsyncCallback < T >
GWT接口)。注意,阻塞isCancelled
而且结束
方法不再需要,因为信息也是通过回调传递的。为了简单起见,忽略get的第二个重载,因为稍后可以很容易地重构它。类的非阻塞版本应用这些更改未来的< T >
接口看起来像这样:
您还没有完成重构。与通过cancel方法取消整个future相比,更有意义的是取消
只是一个特别的未完成的电话得到
每一个观察者。这可以通过出租来实现得到
返回一个表示可取消资源的接口。而且,既然你已经打过电话了得到
,不需要指定mayInterruptIfRunning
参数,因为计算已经在那个点上运行,您可以通过决定是否调用cancel来对布尔值进行编码。最后,你可以做取消
方法的非阻塞性无效
而不是布尔
.你可以试着做取消
返回一个未来的布尔> <
相反,但这样您就会陷入异步的无限递归兔子洞。事实证明java。io。可闭
接口恰好符合要求,导致以下的变异未来的< T >
:
注意,调用close ()
的方法可闭
接口可能会取消底层计算,也可能不会,因为一个可观察对象可能有多个观察者(处理,例如,鼠标移动的订阅,这不应该阻止你的鼠标工作)。但是,由于该观察者没有得到任何进一步值的通知,从它的角度来看,计算已经终止。如果需要,实现的类IObservable < T >
可以用其他方法取消计算。
而不是未来的< T >
而且观察者< T >
, .NET有标准IObservable < T >
而且IObserver < T >
接口;而不是可闭
,它有IDisposable
.类型的值IObservable < T >
(或观察者< T >
取决于您首选的编程语言)表示具有类型值的异步数据流或事件流T
.
仔细观察由此产生的三位一体界面,可以发现经典的主体/观察者界面的通用变体2对于发布/订阅模式,这是面向对象程序员数十年来处理基于事件的系统的主要工具。JDK 1.0已经通过(非泛型)可观测的
类和观察者
接口。在。net中,Rx库支持这种模式。
Rx库做了一些附加的行为假设IObserver < T >
而且IObservable < T >
不通过其(语法)类型签名表示的接口:
IObserver < T >
接口应遵循正则表达式OnNext(t)* (OnCompleted() | OnError(e))?
.换句话说,在0或更多之后OnNext
打电话,随便一个oncomplete
或OnError
将可选地调用。IObserver < T >
可以假定是同步的;从概念上讲,它们运行在一个锁下,类似于常规的。net事件处理程序或反应器模式。10OnError
或oncomplete
被称为。对象返回的订阅订阅
观察者的调用将在流完成时被可观察对象处理。在实践中,这是通过关闭IDisposable
返回的订阅
在实施中OnError
而且oncomplete
方法。IObservable < T >
流应该生成的最优尝试停止该订阅的所有未完成工作。任何已经在进行的工作仍然可以完成,因为终止正在进行的工作并不总是安全的,但不应该向未订阅的观察员发出信号。该契约确保了易于推理和证明操作人员和用户代码的正确性。类的实例可见< T >
在Java中,您将使用匿名内部类并定义抽象基类ObservableBase < T >
负责执行Rx合同。它通过提供subscribe方法的实现而专门化:
由于.NET缺少匿名接口,所以它使用了工厂方法可观察到的。创建
这将从类型的匿名委托创建一个新的可观察实例Func < IObservable < T >, IDisposable >
实现了订阅
功能:
方法返回的具体类型与Java解决方案中一样创建
方法强制执行所需的Rx行为。
一旦有了表示异步数据流的单个接口,就可以将现有的基于事件和回调的抽象(如GUI控件)公开为异步数据流的源。的文本更改事件文本框
控件作为异步数据流,使用中所示的美味令牌沙拉图4.
您可以将UI控件、鼠标、文本字段或按钮想象成一个流数据库,它为底层控件每次触发事件生成一个无限的值集合。相反,具有可设置属性(如列表和标签)的对象可以用作此类异步数据流的观察器。
控件表示的异步数据流IObservable < T >
接口(或可见< T >
在Java中)表现为类型的值的常规集合T
,只不过它们是基于推的或流的,而不是通常的基于拉的集合,如数组和列表IEnumerable < T >
接口(或在Java iterable < T >
).这意味着您可以使用标准查询操作符的流畅API将异步数据流连接在一起,以高度可组合和声明的方式创建复杂的事件处理系统。
例如,在哪里
运算符接受类型的谓词Func <年代,bool >
并过滤掉谓词不包含类型为输入可观察集合的所有值IObservable <年代>
这和它的表兄弟在拉力基础上的工作原理完全一样IEnumerable < T >
集合。图5说明了这一点。
使用此操作符,可以清除公开为的文本字段输入IObservable <字符串>
使用查询表达式输入输入并删除所有空字符串和空字符串。在哪里(s = > ! string.IsNullOrEmpty (s))
.
在带有lambda表达式和防御器方法的Java 8中,代码看起来非常类似于这里所示的c#代码,只是使用->
而不是= >
对于lambda和变量名的不同大小写。但是,即使没有这些即将到来的Java语言特性,您也可以在Flume-Java中近似地获得流畅的Javaas接口1或Reactive4Java9使用标准查询运算符操作事件流。例如,通过将操作符作为方法ObservableBase < T >
,你可以这样写过滤器的例子:
然而,为了避免我们进行太多的输入,下面的两个示例仅用c#提供,尽管没有什么是c#或。net特定的。
的选择
算子取一个变换函数Func < S T >
转换类型的输入数据流中的每个值IObservable <年代>
.这将产生一个新的类型的异步结果流IObservable < T >
就像IEnumerable < T >
的版本,如所见图6.
的SelectMany
运算符通常用于连接两个数据流,基于拉或基于推。SelectMany
取类型的源流IObservable <年代>
和一个类型的充气函数Func <年代,IObservable < T > >
,并从原始源流中的每个元素生成一个包含零个、一个或多个元素的新嵌套流。然后,它将所有中间异步数据流合并为单个类型的输出流IObservable < T >
,如图7.
的SelectMany
运算符清楚地显示了异步性质之间的区别IObservable < T >
和同步的本质IEnumerable < T >
.作为图7显示,源流上的值是异步出现的,即使您仍然从前面的充气函数产生值。在IEnumerable < T >
例中,只有在生成膨胀器函数的所有值之后,才从源流中提取下一个值(也就是说,输出流是所有后续膨胀器生成的流的拼接,而不是不确定的交错),如图8.
有时,使用更有顺序的模式来生成异步流的输出流是很方便的。如图9,开关
运算符接受嵌套的异步数据流IObservable < IObservable < T > >
并生成到该点为止已接收到的最新内部异步数据流的元素。这会产生一个新的非嵌套异步数据流IObservable < T >
.它允许后面的流覆盖前面的流,总是产生“最新的可能结果”,就像滚动的新闻提要。
的实现,有决心的读者可以尝试创建自己的实现开关
,结果证明,这是惊人的棘手,特别是处理所有的边缘条件,同时也满足Rx契约。
使用前面介绍的流畅API,可以用几行代码编写原型Ajax“字典建议”程序。假设您有一个字典查找Web服务,给定一个单词,它通过以下方法异步返回该单词的补全数组:
还假设使用这些帮助程序,您已经将GUI文本字段输入公开为IObservable <字符串>
以便在每次输入更改时生成文本字段的值,并且您已经将标签输出包装为IObserver < string [] >
显示字符串数组的异步数据流。然后可以连接一个管道,该管道异步调用完成
服务为输入到文本字段的每个部分的单词,但只显示标签上的最新结果:
的影响开关
操作符是每次进行另一个异步调用时完成
在响应输入中的更改时,结果被切换为接收最新调用的输出,如图10,之前所有未执行的调用的结果将被忽略。
这不仅仅是性能优化。不使用开关
,将有多个未处理的请求完成
服务,因为流是异步的,结果可能以任意顺序返回,可能用旧请求的结果更新UI。
通过在查询中插入更多的操作符,可以改进这个基本的字典示例。第一个运算符是DistinctUntilChanged(IObservable
,它确保异步数据流只包含不同的相邻元素——换句话说,它删除了等价的附加元素。对于本例,这确保了完成
只在输入实际发生更改时调用。
第二,如果用户输入的速度比您调用Web服务的速度快,那么大量的工作就会浪费掉,因为您发出了许多请求,只是在输入再次更改时,在返回前一个结果之前立即取消它们。相反,你至少要等一等N
使用运算符进行最后一次更改后的毫秒数节流(IObservable
.节流操作符通过忽略后面紧跟在指定延迟之前的另一个值对异步数据流进行采样,如所示图11.
节流阀操作员丢弃了以过高速率进入的事件;但是,可以很容易地定义其他操作符,在(翻转)窗口中聚合事件或以特定间隔采样输入流。
这里给出的最后一个Ajax程序是一个数据流管道,它只在最近10毫秒内输入没有触发新的不同值时调用字典服务;如果一个新的输入值出现在前一个值返回完成之前,它会忽略服务的结果:
当然,IObservable < T >
接口并不局限于UI事件和异步计算,它同样适用于任何基于推送的数据流,如tweet、股票贴纸和GPS位置,当然还适用于我们一开始使用的GWT的异步服务调用。例如,你可以将筛选给定标签的Twitter流建模为方法:
这将向调用函数的客户端推送(无限)推文流。此外,向内看(而不是向外看),观察者是异步I/O和协处理器操作(如来自dsp(数字信号处理器)和gpu(图形处理器)的操作)的“完成端口”的自然表达。
到目前为止,我们已经能够避免“M”这个词(以及“L”这个词),但没有更多的隐藏它。如果我们忽略操作方面的问题,如异常、终止和取消订阅,并将事情归结为它们的本质,那么IObservable < T >
而且IObserver < T >
接口表示类型的函数(T - > ()) - > ()
,这是延续单子,所有单子的母亲,和一个联合单子。
在过去,我们并没有通过本文中执行的重构发现Rx接口。相反,我们应用的定义绝对的二元性从维基百科的字面意思到IEnumerable < T >
而且IEnumerator < T >
接口用于拉基集合,从而派生出IObservable < T >
而且IObserver < T >
接口完全机械地通过交换所有方法签名的参数和结果来实现,而不受过程中任何操作直觉的指导。
注意,我们的异步数据流模型没有对时间做特殊的假设。这使得该方法不同于函数式编程中的典型响应式编程方法,如Fran或FlapJax,它们强调(连续的)时变值(称为行为),以及基于sql的复杂事件处理系统,如StreamBase和StreamInsight,它们在语义模型中也强调时间。相反,时钟和计时器被视为常规的异步数据流类型IObservable < DateTimeOffset >
.我们通过另一个接口参数化并发性和逻辑时钟IScheduler
(这里稍微简化了一点),它表示一个执行上下文,它有一个本地的时间概念,可以在未来安排工作:
Java程序员将立即看到与执行程序接口的对应关系,该执行程序接口在Java SDK中扮演着与精确引入并发性相同的抽象角色。
Web和移动应用程序越来越多地由异步和流服务组成,强调大数据的三个V的速度方面。本文展示了如何将异步数据流公开为基于推的类型集合IObservable < T >
(与基于拉的类型集合相反IEnumerable < T >
)以及如何使用Rx库提供的流畅API操作符查询这些异步数据流。这个流行的库可用于。net和JavaScript(包括针对流行框架的绑定,如JQuery和Node),也附带在Windows Phone的ROM中。f#的一级事件基于Rx和其他语言的替代实现,比如Dart8或Haskell,7都是由社区创造的。
要了解更多关于LINQ,特别是Rx的知识,请阅读简短的教科书响应式扩展和LINQ编程.5
相关文章
在queue.acm.org
九个即时通讯帐户和计数
乔·希尔德布兰德
http://queue.acm.org/detail.cfm?id=966720
在异步世界中调试
迈克尔Donat说
http://queue.acm.org/detail.cfm?id=945134
脚本化Web服务原型
克里斯托弗·文森特
http://queue.acm.org/detail.cfm?id=640158
1.C. Chambers, Raniwala, A. Perry, F. Adams, S. Henry, R. R. Bradshaw, R. Weizenbaum, N. FlumeJava:简单、高效、数据并行管道。ACM SIGPLAN编程设计与实现会议论文集(2010);https://dl.acm.org/citation.cfm?id=1806638.
2.尤斯特,p。菲尔伯,P. A.,格雷欧,R., Kermarrec, A. m .。发布/订阅的多种形式。ACM计算调查35, 2 (2003), 114131;https://dl.acm.org/citation.cfm?id=857076.857078.
3.谷歌Web Toolkit。习惯异步调用(2007);http://www.gwtapps.com/doc/html/com.google.gwt.doc.DeveloperGuide.RemoteProcedureCalls.GettingUsedToAsyncCalls.html.
4.3D数据管理:控制数据量、速度和种类。应用交付策略(2001);http://blogs.gartner.com/douglaney/files/2012/01/ad949-3D-Data-Management-Controlling-Data-Volume-Velocity-and-Variety.pdf.
5.利伯蒂,J,贝茨,P。响应式扩展和LINQ编程.Apress,纽约,2011;http://www.apress.com/9781430237471.
6.Meijer, E.和Bierman, G.大型共享数据库的数据协同关系模型。Commun。ACM 54(2011年4月),4958。
7.Reactive-bacon;https://github.com/raimohanska/reactive-bacon.
8.Reactive-Dart;https://github.com/prujohn/Reactive-Dart.
9.Reactive4java;https://code.google.com/p/reactive4java/.
10.维基百科。反应器模式;https://en.wikipedia.org/wiki/Reactor_pattern.
©2012 acm 0001-0782/12/0500 $10.00
允许为个人或课堂使用部分或全部作品制作数字或硬拷贝,但不得为盈利或商业利益而复制或分发,且副本在首页上附有本通知和完整的引用。除ACM外,本作品的其他组件的版权必须受到尊重。允许有信用的文摘。以其他方式复制、重新发布、在服务器上发布或重新分发到列表,都需要事先获得特定的许可和/或费用。请求发布的权限permissions@acm.org传真(212)869-0481。
数字图书馆是由计算机协会出版的。版权所有©2012 ACM, Inc.
没有找到条目