Programování

Jak vytvářet stavové streamovací aplikace s Apache Flink

Fabian Hueske je redaktorem a členem PMC projektu Apache Flink a spoluzakladatelem Data Artisans.

Apache Flink je rámec pro implementaci stavových aplikací pro zpracování streamů a jejich spouštění v měřítku na výpočetním klastru. V předchozím článku jsme zkoumali, co je stavové zpracování streamu, jaké případy použití řeší a proč byste měli implementovat a spouštět své streamovací aplikace s Apache Flink.

V tomto článku představím příklady dvou běžných případů použití stavového zpracování proudu a proberu, jak je lze implementovat pomocí Flinku. Prvním případem použití jsou aplikace řízené událostmi, tj. Aplikace, které přijímají nepřetržité proudy událostí a na tyto události používají určitou obchodní logiku. Druhým je případ použití streamovací analýzy, kde představím dva analytické dotazy implementované pomocí Flink's SQL API, které agregují streamovaná data v reálném čase. My ve společnosti Data Artisans poskytujeme zdrojový kód všech našich příkladů ve veřejném úložišti GitHub.

Než se ponoříme do podrobností příkladů, představím proud událostí, který je přijímán ukázkovými aplikacemi, a vysvětlím, jak můžete spustit kód, který poskytujeme.

Proud událostí jízdy taxíkem

Naše ukázkové aplikace jsou založeny na veřejném datovém souboru o jízdách taxíkem, ke kterému došlo v New Yorku v roce 2013. Organizátoři Grand Challenge DEBS (ACM International Conference on Distributed Event-Based Systems) 2015 uspořádali původní datový soubor a převedli jej na jediný soubor CSV, ze kterého čteme následujících devět polí.

  • Medailon - ID součtu taxíku MD5
  • Hack_license - identifikační číslo MD5 taxislužby
  • Pickup_datetime - čas, kdy byli cestující vyzvednuti
  • Dropoff_datetime - čas, kdy byli cestující vysazeni
  • Pickup_longitude - zeměpisná délka místa vyzvednutí
  • Pickup_latitude - zeměpisná šířka místa vyzvednutí
  • Dropoff_longitude - zeměpisná délka místa předání
  • Dropoff_latitude - zeměpisná šířka místa předání
  • Total_amount - celková částka zaplacená v dolarech

Soubor CSV ukládá záznamy ve vzestupném pořadí podle atributu doby odchodu. Se souborem lze tedy zacházet jako se seřazeným protokolem událostí, které byly publikovány po ukončení cesty. Chcete-li spustit příklady, které poskytujeme na GitHubu, musíte si stáhnout datovou sadu výzvy DEBS z Disku Google.

Všechny ukázkové aplikace postupně načítají soubor CSV a přijímají jej jako proud událostí jízdy taxíkem. Od té doby aplikace zpracovávají události stejně jako jakýkoli jiný stream, tj. Jako stream, který je přijímán ze systému publikování a odběru založeného na protokolu, jako je Apache Kafka nebo Kinesis. Ve skutečnosti je čtení souboru (nebo jiného typu trvalých dat) a jeho zacházení s datovým proudem základním kamenem přístupu společnosti Flink ke sjednocení dávkového a streamového zpracování.

Spuštění příkladů Flink

Jak již bylo zmíněno dříve, zdrojový kód našich ukázkových aplikací jsme publikovali v úložišti GitHub. Doporučujeme vám rozvětvit a naklonovat úložiště. Příklady lze snadno spustit z vašeho IDE podle vašeho výběru; pro jejich spouštění nemusíte nastavovat a konfigurovat klastr Flink. Nejprve importujte zdrojový kód příkladů jako projekt Maven. Poté spusťte hlavní třídu aplikace a jako programový parametr zadejte umístění datového souboru (odkaz na stažení dat viz výše).

Jakmile spustíte aplikaci, spustí místní, vloženou instanci Flink uvnitř procesu JVM aplikace a odešle aplikaci k jejímu provedení. Během spouštění Flink a plánování úkolů úlohy uvidíte spoustu výpisů z protokolu. Jakmile je aplikace spuštěna, její výstup bude zapsán na standardní výstup.

Vytváření aplikace řízené událostmi ve Flinku

Nyní pojďme diskutovat o našem prvním případu použití, kterým je aplikace řízená událostmi. Aplikace založené na událostech přijímají proudy událostí, provádějí výpočty při příjmu událostí a mohou vydávat nové události nebo spouštět externí akce. Více aplikací řízených událostmi lze skládat jejich vzájemným propojením prostřednictvím systémů protokolu událostí, podobně jako je možné skládat velké systémy z mikroslužeb. Aplikace založené na událostech, protokoly událostí a snímky stavu aplikace (známé jako body uložení ve Flinku) obsahují velmi výkonný vzor návrhu, protože můžete obnovit jejich stav a přehrát jejich vstup, abyste se zotavili ze selhání, opravili chybu nebo migrovali aplikace do jiného klastru.

V tomto článku budeme zkoumat aplikaci řízenou událostmi, která podporuje službu, která sleduje pracovní dobu taxikářů. V roce 2016 se NYC Taxi and Limousine Commission rozhodla omezit pracovní dobu taxikářů na 12hodinové směny a vyžadovat přestávku nejméně osm hodin před zahájením další směny. Posun začíná začátkem první jízdy. Od té doby může řidič zahájit nové jízdy do 12 hodin. Naše aplikace sleduje jízdy řidičů, označuje čas ukončení jejich 12hodinového okna (tj. Čas, kdy mohou zahájit poslední jízdu) a označuje jízdy, které porušily nařízení. Celý zdrojový kód tohoto příkladu najdete v našem úložišti GitHub.

Naše aplikace je implementována pomocí rozhraní Flink DataStream API a KeyedProcessFunction. DataStream API je funkční API a je založeno na konceptu typovaných datových proudů. A Datový tok je logická reprezentace proudu událostí typu T. Proud se zpracovává aplikací funkce, která vytváří jiný datový proud, případně jiného typu. Flink zpracovává proudy paralelně distribucí událostí do proudových oddílů a aplikací různých instancí funkcí na každý oddíl.

Následující fragment kódu ukazuje tok na vysoké úrovni naší monitorovací aplikace.

// nasávat proud jízd taxíkem.

DataStream rides = TaxiRides.getRides (env, inputPath);

Datový tok upozornění = jízdy

// stream oddílu podle ID řidičského průkazu

.keyBy (r -> r.licenseId)

// sledovat události jízdy a generovat oznámení

.process (new MonitorWorkTime ());

// tisk oznámení

notificationss.print ();

Aplikace začne přijímat proud událostí jízdy taxíkem. V našem příkladu jsou události čteny z textového souboru, analyzovány a uloženy v TaxiRide POJO objekty. Aplikace v reálném světě by obvykle přijímala události z fronty zpráv nebo protokolu událostí, jako je Apache Kafka nebo Pravega. Dalším krokem je zadání klíče TaxiRide události od licenseId řidiče. The keyBy operace rozděluje proud na deklarované pole, takže všechny události se stejným klíčem jsou zpracovávány stejnou paralelní instancí následující funkce. V našem případě rozdělíme na licenseId protože chceme sledovat pracovní dobu každého jednotlivého řidiče.

Dále použijeme MonitorWorkTime funkce na oddílu TaxiRide Události. Tato funkce sleduje jízdy na řidiče a sleduje jejich posuny a doby přestávek. Vydává události typu Tuple2, kde každá n-tice představuje oznámení skládající se z ID licence ovladače a zprávy. Nakonec naše aplikace vydá zprávy tak, že je vytiskne na standardní výstup. Aplikace v reálném světě by zapisovala oznámení do externí zprávy nebo do úložného systému, jako je Apache Kafka, HDFS nebo do databázového systému, nebo by spustila externí volání, aby je okamžitě vytlačila.

Nyní, když jsme diskutovali o celkovém toku aplikace, pojďme se podívat na MonitorWorkTime funkce, která obsahuje většinu skutečné obchodní logiky aplikace. The MonitorWorkTime funkce je stavová KeyedProcessFunction které přijímají TaxiRide události a emise Tuple2 evidence. The KeyedProcessFunction rozhraní obsahuje dvě metody zpracování dat: processElement () a na časovači(). The processElement () metoda je volána pro každou příchozí událost. The na časovači() metoda se volá, když se spustí dříve registrovaný časovač. Následující úryvek zobrazuje kostru souboru MonitorWorkTime funkce a vše, co je deklarováno mimo metody zpracování.

veřejná statická třída MonitorWorkTime

rozšiřuje KeyedProcessFunction {

// časové konstanty v milisekundách

soukromé statické konečné dlouhé ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 hodin

soukromá statická konečná dlouhá REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 hodin

soukromá statická konečná dlouhá CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 hodin

soukromý přechodný formátovač DateTimeFormatter;

// popisovač stavu pro uložení počátečního času směny

ValueState shiftStart;

@ Přepis

public void open (Configuration conf) {

// popisovač stavu registrace

shiftStart = getRuntimeContext (). getState (

nový ValueStateDescriptor („shiftStart“, Types.LONG));

// inicializuje formátovač času

this.formatter = DateTimeFormat.forPattern („rrrr-MM-dd HH: mm: ss“);

  }

// processElement () a onTimer () jsou podrobně popsány níže.

}

Funkce deklaruje několik konstant pro časové intervaly v milisekundách, formátovač času a popisovač stavu pro klíčovaný stav, který spravuje Flink. Spravovaný stav je pravidelně kontrolován a automaticky obnoven v případě selhání. Stav klíče je uspořádán podle klíče, což znamená, že funkce bude udržovat jednu hodnotu na popisovač a klíč. V našem případě MonitorWorkTime funkce udržuje a Dlouho hodnota pro každý klíč, tj. pro každý licenseId. The shiftStart stav ukládá počáteční čas směny řidiče. Stavový popisovač je inicializován v otevřeno() metoda, která je volána jednou před zpracováním první události.

Nyní se podívejme na processElement () metoda.

@ Přepis

public void processElement (

Jízda taxíkem,

Kontext ctx,

Kolektor out) vyvolá výjimku {

// vyhledá čas začátku poslední směny

Dlouhý startTs = shiftStart.value ();

if (startTs == null ||

startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

// toto je první jízda nové směny.

startTs = ride.pickUpTime;

shiftStart.update (startTs);

long endTs = startTs + ALLOWED_WORK_TIME;

out.collect (Tuple2.of (ride.licenseId,

„Můžete přijímat nové cestující do„ + formatter.print (endTs)));

// registrační časovač k vyčištění stavu za 24 hodin

ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

} else if (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

// tato jízda začala po skončení povolené pracovní doby.

// je to porušení předpisů!

out.collect (Tuple2.of (ride.licenseId,

„Tato jízda porušila předpisy o pracovní době.“));

  }

}

The processElement () metoda je volána pro každého TaxiRide událost. Nejprve metoda načte počáteční čas posunu řidiče ze stavového ovladače. Pokud stát neobsahuje čas zahájení (startTs == null) nebo pokud poslední směna začala více než 20 hodin (ALLOWED_WORK_TIME + REQ_BREAK_TIME) dříve než aktuální jízda, je aktuální jízda první jízdou nové směny. V obou případech funkce zahájí novou směnu aktualizací počátečního času směny na počáteční čas aktuální jízdy, vydá řidiči zprávu s časem ukončení nové směny a zaregistruje časovač pro vyčištění stav do 24 hodin.

Pokud aktuální jízda není první jízdou nové směny, funkce zkontroluje, zda neporušuje regulaci pracovní doby, tj. Zda začala o více než 12 hodin později než začátek aktuální směny řidiče. V takovém případě funkce vydá zprávu, která informuje řidiče o porušení.

The processElement () metoda MonitorWorkTime funkce registruje časovač k vyčištění stavu 24 hodin po začátku směny. Odebrání stavu, který již není potřeba, je důležité, aby se zabránilo narůstající velikosti stavu kvůli netěsnému stavu. Časovač se spustí, když čas aplikace projde časovým razítkem časovače. V tomto bodě na časovači() metoda se nazývá. Podobně jako u stavu, časovače jsou udržovány na klíč a funkce je uvedena do kontextu přidruženého klíče před na časovači() metoda se nazývá. Proto je veškerý přístup ke stavu směrován na klíč, který byl aktivní při registraci časovače.

Pojďme se podívat na na časovači() metoda MonitorWorkTime.

@ Přepis

public void onTimer (

dlouhý časovač,

OnTimerContext ctx,

Kolektor out) vyvolá výjimku {

// odstranit stav posunu, pokud již nebyl zahájen nový posun.

Dlouhý startTs = shiftStart.value ();

if (startTs == timerTs - CLEAN_UP_INTERVAL) {

shiftStart.clear ();

  }

}

The processElement () metoda registruje časovače po dobu 24 hodin poté, co směna začala vyčistit stav, který již není potřeba. Vyčištění stavu je jedinou logikou, kterou na časovači() metoda implementuje. Když se spustí časovač, zkontrolujeme, zda řidič mezitím zahájil novou směnu, tj. Zda se změnil čas zahájení řazení. Pokud tomu tak není, zrušíme stav řazení řidiče.

$config[zx-auto] not found$config[zx-overlay] not found