原創|使用教程|編輯:陳俊吉|2016-07-13 10:17:20.000|閱讀 498 次
概述:IBM InfoSphere Streams是一個高級計算平臺,幫助用戶開發的應用程序快速攝取、分析和關聯來自數千個實時源的信息。該解決方案可處理非常高的數據吞吐率,最高可達每秒數百萬個事件或消息。
# 界面/圖表報表/文檔/IDE等千款熱門軟控件火熱銷售中 >>
相關鏈接:
是一個高級計算平臺,幫助用戶開發的應用程序快速攝取、分析和關聯來自數千個實時源的信息。該解決方案可處理非常高的數據吞吐率,最高可達每秒數百萬個事件或消息。該平臺支持流數據的實時處理,支持不斷更新持續查詢的結果,可在仍在移動的數據流中檢測洞察。Streams旨在從一個幾分鐘到幾小時的窗口中的移動信息(數據流)中揭示有意義的模式。該平臺能夠獲取低延遲洞察,并為注重時效的應用程序(比如欺詐檢測或網絡管理)獲取更好的成果,從而提供業務價值。流處理的演示如下圖所示:
Streams 的主要設計目的是:
提供了一種編程模型和 IDE 來定義數據來源,還提供了已融合到處理執行單元中的稱為運算符的軟件分析模塊。它還提供了基礎架構來支持從這些組件合成可擴展的流處理應用程序。主要平臺組件包括:
Streams Processing Language (SPL),Streams 的編程語言,是一種分布式數據流合成語言。它是一種類似 C++ 或 Java™ 的可擴展且全功能的語言,支持用戶定義的數據類型。您可以使用 SPL 或原生語言(C++ 或Java)編寫自定義函數。也可以使用 C++ 或 Java 編寫用戶定義的運算符。
Streams 通過SPL將應用程序會描述一個導向圖,該圖由各個互聯且處理多個數據流的運算符組成。數據流可來自系統外部,或者在應用程序內部生成。SPL 程序的基本構建塊包括:
Java 作為面向對象的高級編程語言,以其使用簡單、完全面象對象、平臺可移植性、健壯的沙盒安全機制、動態性,以及大量可用的開發包等一系列優勢,在互聯網分布式環境下得到了極其廣泛的應用,具有廣泛的用戶基礎。為了Streams用戶重用已有的Java開發技能、保護已有的Java資產,IBM Streams平臺提供了使用 Java 編程語言來構建 Streams 應用程序的框架,具體包括 Java 運算符模型描述文件以及 Java 運算符 API(JavaOp)兩種方式。這兩種方式在一定程度上讓開發人員集成Java功能模塊。
雖然Streams所提供的Java運算符模型描述文件以及Java運算符API(JavaOp)方式支持了Java代碼調用,但是,傳統的Java是面向對象的編程語言,它只能幫助開發人員實現業務邏輯或重用Java代碼,但它無法以“流處理”的思維,直接進行類似SPL的流應用開發。
streamsx.topology開源項目的出現,豐富了Streams的開發方式,為流應用的開發者提供更多的語言選擇。streamsx.topology項目提供Java Application API,面向流處理應用的將Java封裝成一套類庫,使得開發者完全使用Java和Scala語言并按照“流處理”的思維創建IBM Streams流處理應用。
streamsx.topology開源項目參考網址:
//ibmstreams.github.io/streamsx.topology/
1. 從www.ibm.com/software/data/infosphere/stream-computing/trials.html下載“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一個免費的、可下載的非生產版本,它沒有數據或時間限制,支持您在自己的獨特環境中試驗流計算,構建一個強大的分析平臺。該平臺能夠處理難以置信的高數據吞吐量,高達每秒數百萬個事件或消息。InfoSphere Streams QuickStart Edition 沒有提供支持選項,僅適用于非生產用途。要獲得相應的支持,請購買 InfoSphereStreams。
2.解壓VM鏡像,并在VMPlayer啟動VM。
該VM已經安裝com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:
1)在桌面雙擊InfoSphere Streams Studio (Eclipse)圖標啟動Streams Studio.
2)指定workspace為:/home/streamsadmin/Workspaces/topology/
3) 運行"Hello World" 示例程序:在Project Explorer標簽, 打開src->simple->HelloWorld->HelloWorld.java,代碼如下:
package simple; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; publicclass HelloWorld { publicstaticvoid main(String[] args) throws Exception { /* * Create the container for the topology that will * hold the streams of tuples. */ Topology topology = new Topology("HelloWorld"); /* * Declare a source stream (hw) with String tuples containing two tuples, * "Hello" and "World!". */ TStream<String> hw = topology.strings("Hello", "World!"); /* * Sink hw by printing each of its tuples to System.out. */ hw.print(); if (args.length == 0) StreamsContextFactory.getEmbedded().submit(topology).get(); else StreamsContextFactory.getStreamsContext(args[0]).submit(topology) .get(); } } |
4) 運行"Hello World" 示例程序:右擊HelloWorld.java,選擇Run As-> Run Configurations. 在Run Configurations 'Main' 標簽頁面,確保Main class填 simple.HelloWorld. 在 arguments標簽頁面, 設置Program arguments為EMBEDDED (EMBEDDED表示程序獨立編譯并嵌入到JVM運行,而不依賴Streams運行時環境)。
5) 設置必要參數后,運行該應用您會看到以下的輸出:
Hello
world!
我們創建一個名叫MyGrep的Sample應用,用于指導關鍵字搜索某個文件夾下的文件,搜索到則顯示相應內容所在的行數和內容。具體步驟如下:
1)創建Java項目: File->New->Project->Java->JavaProject,點擊Next,在Create a Java Project填寫MySamples,點擊Next。
2)在Libraies標簽頁:
點擊External Jar按鈕,選擇com.ibm.streams.topology.jar
點擊Add Library按鈕,選擇IBM InfoSphere Streams
點擊Next和Finish完成項目的創建。新創建項目視圖如下圖所示:
3)創建命名空間:右擊src->New->Package->JavaPackage的Name填寫:mysapce
4)創建Java主類:src->右擊myspace->New->Class,在Name填寫:mysapce,確保勾選“public static void main(String[]args)”。確定后生成MyGrep.java。
5)創建Java類:src->右擊myspace->New->Class,在Name填寫:GrepInfo,不要勾選“public static void main(String[]args)”,確定后生成GrepInfo.java。
6)MyGrep.java和GrepInfo.java的代碼內容如下:
MyGrep.java
package myspace; import java.io.ObjectStreamException; import java.util.Arrays; import java.util.concurrent.Future; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; import com.ibm.streamsx.topology.file.FileStreams; import com.ibm.streamsx.topology.function7.Function; publicclass MyGrep { publicstaticvoid main(String[] args) throws Exception { String contextType = args[0]; String directory = args[1]; final String term = args[2]; Topology topology = new Topology("MyGrep"); TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory); TStream<String> lines = FileStreams.textFileReader(filePaths); TStream<GrepInfo> grepInfo = lines.multiTransform( new Function<String, Iterable<GrepInfo>>() { privatestaticfinallongserialVersionUID = 1L; privateintlineNum = 0; @Override public Iterable<GrepInfo> apply(String line) { ++lineNum; if(line.contains(term)){ return Arrays.asList(new GrepInfo(lineNum, line)); } else returnnull; } private Object readResolve() throws ObjectStreamException { returnthis; } }, GrepInfo.class); grepInfo.print(); Future<?> future = StreamsContextFactory.getStreamsContext(contextType) .submit(topology); Thread.sleep(30 * 1000); future.cancel(true); } } |
GrepInfo.java
package myspace; import java.io.Serializable; import com.ibm.streamsx.topology.tuple.Keyable; publicclass GrepInfo implements Keyable<GrepInfo>, Serializable { privatestaticfinallongserialVersionUID = 1L; intlineNum; String lineStr; public GrepInfo(int ln, String ls) { this.lineNum = ln; this.lineStr = ls; } @Override public String toString() { return"Line Num " + lineNum + " : " + lineStr; } @Override public GrepInfo getKey() { // TODO Auto-generated method stub returnnull; } } |
7)運行MyGrep之前,請確保Streams Instance已經啟動,并在/home/streamsadmin/test創建一個文本文件并寫如若干內容。
8)運行程序:右擊MyGrep.java,選擇Run As -> RunConfigurations. 在Run Configurations 'Main' 標簽頁面,確保Project填寫MySamples和Main class填 myspace.MyGrep。
在 arguments標簽頁面, 設置Program arguments為DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams運行時環境,/home/streamsadmin/test是程序搜索關鍵的目錄;China是搜索關鍵字)。
9)查看結果:
在Streams Exploere -> StreamsInstances ->右擊default:<instance>@<Domain>,選擇Show Instance Graph
在Instance Graph窗口,我們能看到MyGrep最終運行圖。右擊最后的Print PE->Show Log->Show PEConsole
在Console將會顯現MyGrep運行的結果
streams.topology開源項目所提供的Java Application API使得Streams開發者對流應用的編程語言有了新的選擇,它能幫助開發者重用Java編程能力,并按照“流處理”的思路簡化流應用的開發過程,讓開發者更專注于業務的處理邏輯而不是流處理的框架。然而,該項目還處于早期階段,很多功能和接口尚未實現;對比成熟的、完善的SPL,Java Application API的功能和成熟性還有很大差距。相信在不久的將來,streams.topology將會逐漸完善并成為IBM Streams平臺的一個重要補充。
詳情請咨詢!
客服熱線:023-66090381
本站文章除注明轉載外,均為本站原創或翻譯。歡迎任何形式的轉載,但請務必注明出處、不得修改原文相關鏈接,如果存在內容上的異議請郵件反饋至chenjj@fc6vip.cn