SQL Server 2008 R2 StreamInsight, Reactive Programming och IObservable

Med releasen av SQL Server 2008 R2 följer en ny plattform för att monitorera och
agera på förändringar i dataströmmar och att hitta nyckeltal i historisk data. Plattformen heter StreamInsight och är en fristående del av SQL Server 2008 R2. I StreamInsight kan vi skapa applikationer som identifierar och analyserar händelser ur data och därefter startar processer för att agera på dem. Exempel på data kan vara aktiekurser, försäljningsdata, klick på en website, data från tillverkningsmaskiner, data från RFIDsensorer eller annan data som skapas kontinuerligt.

System för att hantera dataströmmar har funnits länge men har traditionellt varit specifika lösningar för varje typ av dataström, t ex styrsystem för tillverkningsindustri och system för automatisk trading i finansvärlden. En gemensam nämnare för dessa system är att man vill fånga och agera på vissa specifika händelser. Complex Event Processing (CEP) kallas processen som generaliserat denna idé att man ur en stor mängd händelser destillerar ut ett mindre antal som är just de vi är intresserade av. StreamInsight är en plattform för Complex Event Processing.

StreamInsight

Med StreamInsight får vi en möjlighet att hantera stora dataströmmar i nära realtid, något vi inte tidigare kunnat göra i relationsdatabaser. Hur går det till? Jo, i StreamInsight har man vänt på hur vi accessar data. I traditionella databaser så ställer vi enskilda frågor (”Pull”) mot en databas som svarar för varje fråga vi ställer. I StreamInsight har vi istället en stående fråga som besvaras kontinuerligt ( ”Push” ) ifrån strömmen av data som vi monitorerar. Detta gör att vi får mycket mindre latency, typiskt millisekunder eller mindre, i våra svar och att vi kan hantera mycket snabbare strömmar av data, typiskt tiotusentals värden i sekunden eller mer.

Detta öppnar möjligheter för nya typer av applikationer, t ex för personalisering  av webcontent baserat på användningsmönster, fraud detection för kreditkort och andra betalningsmedel, business activity monitoring, säkerhetsmonitorering, processtyrning eller hur många andra som helst. Ju mer av tillvaron som digitaliseras ju mer data skapas kontinuerligt som vi kan monitorera, förädla och agera utifrån.

StreamInsight använder adaptrar eller IObservable interfacet för att kommunicera. Input-adaptrar för att ta in dataströmmar och Output-adaptrar för att skicka ut dataströmmar.
I ett system som använder StreamInsight vill vi normalt spara originaldataströmmarna ner i ett Stream Data Store. Strömmarna skickas sedan in i en Event Processing Engine i vilken analysen av dataströmmen görs. När vi når vissa värden( händelser ) i vår analys så skickar vi ut ett nytt värde ( händelse ) som kan trigga en process. Ett vanligt mönster i analysen är att vi letar efter vissa KPI ( Key Performance Indicators ) och jämför med tröskelvärden för dessa för att bestämma en ny händelse som vi vill kommunicera vidare. Denna programmeringsmodell kallas Event Driven Programming eller Reactive Programming.

Koden som skrivs för att analysera strömmen skrivs i LINQ. Ett exempel är:

var outputStream = from e in inputStream
                   where e.value < 10
                   select e;

för att i strömmen outputStream ta ut händelserna att värdet på e i en inputStream är mindre än 10. Vi kan givetvis ha mycket mer komplexa frågor än den ovan. Ett exempel:

var sevenDayAveragePriceStream = from e in priceStream
                              group e by e.FieldID into eGroup
                              from w in

eGroup.HoppingWindow(TimeSpan.FromDays(7), TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
                                        select new Price()
                                        {
                                            StockID = "7-day avg",
                                            FieldID = eGroup.Key,
                                            Value = w.Avg(e => e.Value)
                                        };

i vilket vi tar ut en ström av 7 dagars löpande medelvärden av ett pris. Genom att spara originalströmmen kan vi analysera den igen om vi har ändrat på analyskoden och vi kan på detta sätt iterativt förbättra vår analyskod.

Reactive Programming

Reactive Programming, med LINQ på detta sätt, har sina rötter i Functional Programming och har det sköna komponerbara flytet att data strömmar genom koden som blir väldigt deklarativ. Reactive Programming kan inte bara användas för CEP i StreamInsight. Det har också stor potential för att vi lättare skall kunna programmera asynkront på alla ställen det behövs. T ex i ett MVC-baserat GUI för att få händelser i modellen ( data ) att reflekteras i vyn. Eller att kunna asynkront anropa tjänster över internet ( mycket användbart i Windows Azure ) och sedan kunna reagera på den efterfrågade datan alltefter som den kommer inströmmande. Reactive Programming har stora likheter med Observer Pattern i objektorienterad programmering men strömmandet av data skiljer.

Microsofts initiativ för Reactive Programming kommer inte bara från StreamInsight. Samtidigt och parallellt utvecklades också Reactive Framework ( Rx ) för att, som de själva skriver:
”Rx is a library for composing asynchronous and event-based programs using observable collections.”

Grundstommen i Reactive Framework är IObservable-interfacet som Erik Meijer, som också skapade LINQ, hittade på. Hur han löste de sista detaljerna är ett bra exempel på något vi nog känner igen, nämligen att saker klarnar när vi skall förklara det för andra. Hur ofta kommer vi inte själva på lösningen till ett problem då vi skall förklara det när vi ber om hjälp? För Erik Meijer föll de sista bitarna på plats när han skulle förklara sin nya idé om ”The Dual” av IEnumerable för en kollega.

IObservable

IObservable är “the Dual” av IEnumerable, vilket betyder att det funkar likt IEnumerable fast tvärtom. På samma sätt som vi sade att StreamInsight-frågor vänder på databasfrågor från ”Pull” till ”Push”, så vänder IObservable från ”Pull” i IEnumerble ( IEnumerator.Current ) till ”Push” i IObservable ( IObserver.OnNext() ). Data strömmar till en Observer allt eftersom och dyker upp i OnNext metoden. Interfacet definieras:

interface IObservable<out T> {
   IDisposable Subscribe( IObserver o );
}

interface IObserver<in T> {
   void OnCompleted();
   void OnNext( T v );
   void OnError( Exception e );
}
 

Där metoden OnNext används för att ta emot nya värden, OnCompleted för att avsluta en ström och OnError för att fånga Exceptions i flödet. Ett enkelt exempel på hur man kan använda det för att fånga musklick och skriva dem i Debug-strömmen är:

var mouseLeftDown = Observable.FromEvent<MouseButtonEventArgs>( sender, "MouseLeftButtonDown" );
      mouseLeftDown.Subscribe( mouseButtonEventArg => Debug.WriteLine( mouseButtonEventArg.EventArgs.ButtonState.ToString()));

Eller lite mer komplext för Auto Suggestions i en webbapplikation, dvs. att föreslå uttryck efterhand som vi matar in bokstäver i en textbox. De inmatade bokstäverna fångas i ett fragment och slås upp mot ett dictionary av förslag:

IObservable<Html> q = from fragment in textBox
               from definitions in dictionary.Lookup(fragment, 10).Until(textBox)
               select definitions.FormatAsHtml();
q.Subscribe( suggestions => { div.InnerHtml = suggestions; })

  Vi sa tidigare att IObservable är ett sätt att koppla ihop in - och utdataströmmar i StreamInsight. Men i RTM-releasen av StreamInsight så är stödet för IObservable borttaget pga. av att man bara ville ha beroenden till .NET 3.5 i releasen av StreamInsight medan IObservable först är officiellt releasad i .NET 4.0. Det skall komma en fix för detta i StreamInsight inom kort.

  Reactive Programming är en mycket spännande programmeringsmodell som jag tror kommer användas mycket i framtiden. Speciellt den asynkrona aspekten och komponerbarheten ger unika möjligheter. StreamInsight låter oss analysera data i nära realtid och att agera från det, vilket jag tror kommer bli viktigare i framtiden med fler och fler dataflöden att lyssna in.


Per Eklund, konsult och kursledare inom arkitektur och systemutveckling.


Totalt

0SEK
Till Kassan
exkl. moms
Addskills
Stockholm
Olof Palmes gata 31
Telefon: +46 8 440 11 00
E-post: info@addskills.se
Göteborg
Lindholmspiren 5
Telefon: +46 31 46 72 00
E-post: info@addskills.se
Malmö
Västergatan 38
Telefon: +46 40 92 38 00
E-post: info@addskills.se
Uppsala
Bangårdsgatan 13
Telefon: +46 18 50 00 30
E-post: info@addskills.se
Linköping
Teknikringen 1E
Telefon: +46 13 37 67 50
E-post: info@addskills.se

Addskills AB, Säte: Stockholm, Org.nr: 556404-6133

webmaster@addskills.se
Clear