Programování

Jak používat Redis pro zpracování streamů v reálném čase

Roshan Kumar je senior produktový manažer ve společnosti Redis Labs.

Přijímání datových proudů v reálném čase je běžným požadavkem pro mnoho případů použití velkých dat. V oborech, jako je IoT, elektronický obchod, bezpečnost, komunikace, zábava, finance a maloobchod, kde tolik záleží na včasném a přesném rozhodování založeném na datech, je sběr a analýza dat v reálném čase ve skutečnosti jádrem podnikání.

Sběr, ukládání a zpracování streamovaných dat ve velkých objemech a vysokou rychlostí však představuje architektonické výzvy. Důležitým prvním krokem při poskytování analýzy dat v reálném čase je zajištění toho, aby byly k dispozici adekvátní síťové, výpočetní, úložné a paměťové zdroje pro zachycení rychlých datových proudů. Softwarový zásobník společnosti však musí odpovídat výkonu její fyzické infrastruktury. V opačném případě budou podniky čelit obrovskému množství nevyřízených dat, v horším případě chybějícím nebo neúplným datům.

Redis se stala populární volbou pro takové rychlé scénáře příjmu dat. Odlehčená databázová platforma v paměti, Redis dosahuje propustnosti v milionech operací za sekundu s latencí pod milisekundami a přitom využívá minimální zdroje. Nabízí také jednoduché implementace, které umožňuje několik datových struktur a funkcí.

V tomto článku ukážu, jak může Redis Enterprise vyřešit běžné problémy spojené s příjmem a zpracováním velkých objemů dat s vysokou rychlostí. Projdeme si tři různé přístupy (včetně kódu) ke zpracování zdroje Twitter v reálném čase pomocí Redis Pub / Sub, Redis Lists a Redis Sorted Sets. Jak uvidíme, všechny tři metody hrají roli v rychlém příjmu dat, v závislosti na případu použití.

Výzvy při navrhování rychlých řešení pro příjem dat

Vysokorychlostní příjem dat často zahrnuje několik různých typů složitosti:

  • Velké objemy dat někdy přicházejí v nárazech. Bursty data vyžadují řešení, které je schopné zpracovávat velké objemy dat s minimální latencí. V ideálním případě by měl být schopen provádět miliony zápisů za sekundu s latencí milisekund za použití minimálních prostředků.
  • Data z více zdrojů. Řešení pro příjem dat musí být dostatečně flexibilní, aby zvládla data v mnoha různých formátech, v případě potřeby zachovala identitu zdroje a transformovala se nebo normalizovala v reálném čase.
  • Data, která je třeba filtrovat, analyzovat nebo předat dál. Většina řešení pro příjem dat má jednoho nebo více předplatitelů, kteří data konzumují. Často se jedná o různé aplikace, které fungují na stejném nebo odlišném místě s různou sadou předpokladů. V takových případech databáze nejen potřebuje transformovat data, ale také filtrovat nebo agregovat v závislosti na požadavcích náročných aplikací.
  • Data pocházející z geograficky distribuovaných zdrojů. V tomto scénáři je často vhodné distribuovat uzly pro sběr dat a umístit je blízko ke zdrojům. Samotné uzly se stávají součástí řešení rychlého přijímání dat, aby shromažďovaly, zpracovávaly, předávaly nebo přesměrovávaly přijímaná data.

Zpracování rychlého přijímání dat v Redis

Mnoho řešení podporujících rychlé přijímání dat je dnes složité, bohaté na funkce a přepracované pro jednoduché požadavky. Redis je naopak extrémně lehký, rychlý a snadno použitelný. Díky klientům dostupným ve více než 60 jazycích lze Redis snadno integrovat do oblíbených softwarových balíků.

Redis nabízí datové struktury, jako jsou seznamy, sady, seřazené sady a hash, které nabízejí jednoduché a všestranné zpracování dat. Redis poskytuje více než milion operací čtení / zápisu za sekundu s latencí milisekund na instanci komoditního cloudu se skromnou velikostí, díky čemuž je extrémně efektivní z hlediska zdrojů pro velké objemy dat. Redis také podporuje služby zasílání zpráv a klientské knihovny ve všech populárních programovacích jazycích, takže je vhodný pro kombinaci vysokorychlostního příjmu dat a analýzy v reálném čase. Příkazy Redis Pub / Sub umožňují hrát roli zprostředkovatele zpráv mezi vydavateli a předplatiteli, což je funkce často používaná k odesílání oznámení nebo zpráv mezi uzly příjmu distribuovaných dat.

Redis Enterprise vylepšuje Redis plynulým škálováním, stálou dostupností, automatickým nasazením a schopností využívat nákladově efektivní flash paměť jako rozšiřovač RAM, aby bylo možné nákladově efektivně provádět zpracování velkých datových sad.

V následujících částech načrtnu, jak používat Redis Enterprise k řešení běžných výzev při přijímání dat.

Redis rychlostí Twitteru

Abychom ilustrovali jednoduchost Redisu, prozkoumáme ukázkové rychlé řešení pro příjem dat, které shromažďuje zprávy z kanálu Twitter. Cílem tohoto řešení je zpracovávat tweety v reálném čase a tlačit je dolu, jak jsou zpracovávány.

Data Twitter přijatá řešením jsou poté spotřebována více procesory v řadě. Jak je znázorněno na obrázku 1, tento příklad se týká dvou procesorů - anglického Tweet Processoru a Influencer Processoru. Každý procesor tweety filtruje a předává je svými příslušnými kanály dalším spotřebitelům. Tento řetězec může jít tak daleko, jak to řešení vyžaduje. V našem příkladu se však zastavíme na třetí úrovni, kde agregujeme populární diskuse mezi mluvčími angličtiny a nejlepšími ovlivňujícími.

Redis Labs

Všimněte si, že používáme příklad zpracování twitterových kanálů z důvodu rychlosti příjmu dat a jednoduchosti. Upozorňujeme také, že data z Twitteru se dostávají k našemu rychlému příjmu dat prostřednictvím jediného kanálu. V mnoha případech, jako je IoT, může být více zdrojů dat odesílajících data do hlavního přijímače.

Existují tři možné způsoby, jak implementovat toto řešení pomocí Redis: ingest s Redis Pub / Sub, ingest s datovou strukturou List nebo ingest s datovou strukturou Seted Set. Podívejme se na každou z těchto možností.

Požití s ​​Redis Pub / Sub

Toto je nejjednodušší implementace rychlého příjmu dat. Toto řešení využívá funkci Redis Pub / Sub, která aplikacím umožňuje publikovat a odebírat zprávy. Jak je znázorněno na obrázku 2, každá fáze zpracovává data a publikuje je na kanál. Následující fáze se přihlásí k odběru kanálu a obdrží zprávy pro další zpracování nebo filtrování.

Redis Labs

Profesionálové

  • Snadná implementace.
  • Funguje dobře, když jsou zdroje dat a procesory geograficky distribuovány.

Nevýhody

  • Řešení vyžaduje, aby vydavatelé a předplatitelé byli neustále vzhůru. Předplatitelé ztratí data při zastavení nebo při ztrátě připojení.
  • Vyžaduje více připojení. Program nemůže publikovat a přihlásit se k odběru stejného připojení, takže každý zprostředkující datový procesor vyžaduje dvě připojení - jedno k přihlášení a jedno k publikování. Pokud používáte Redis na platformě DBaaS, je důležité ověřit, zda váš balíček nebo úroveň služby má nějaká omezení počtu připojení.

Poznámka k připojení

Pokud se více než jeden klient přihlásí k odběru kanálu, Redis posílá data každému klientovi lineárně, jeden po druhém. Velké datové zatížení dat a mnoho připojení může zavést latenci mezi vydavatelem a jeho předplatiteli. I když je výchozí pevný limit pro maximální počet připojení 10 000, musíte otestovat a porovnat, kolik připojení je vhodné pro vaše užitečné zatížení.

Redis udržuje výstupní vyrovnávací paměť klienta pro každého klienta. Výchozí limity pro výstupní vyrovnávací paměť klienta pro Pub / Sub jsou nastaveny jako:

klient-output-buffer-limit pubsub 32mb 8mb 60

S tímto nastavením Redis vynutí klienty, aby se odpojili za dvou podmínek: pokud výstupní vyrovnávací paměť roste nad 32 MB nebo pokud výstupní vyrovnávací paměť uchovává trvale 8 MB dat po dobu 60 sekund.

To jsou náznaky, že klienti data spotřebovávají pomaleji, než jsou publikována. Pokud by taková situace nastala, zkuste nejprve optimalizovat spotřebitele tak, aby při konzumaci dat nepřidávali latenci. Pokud si všimnete, že se vaši klienti stále odpojují, můžete zvýšit limity pro pubsub client-output-buffer-limit pubsub vlastnost v redis.conf. Pamatujte, že jakékoli změny nastavení mohou zvýšit latenci mezi vydavatelem a odběratelem. Jakékoli změny musí být důkladně otestovány a ověřeny.

Návrh kódu pro řešení Redis Pub / Sub

Redis Labs

Toto je nejjednodušší ze tří řešení popsaných v tomto článku. Zde jsou důležité třídy Java implementované pro toto řešení. Stáhněte si zdrojový kód s plnou implementací zde: //github.com/redislabsdemo/IngestPubSub.

The Odběratel třída je základní třídou tohoto designu. Každý Odběratel objekt udržuje nové spojení s Redis.

třída Subscriber rozšiřuje JedisPubSub implementuje Runnable {

soukromé jméno řetězce;

private RedisConnection conn = null;

soukromý Jedis jedis = null;

private String subscriberChannel;

public Subscriber (String subscriberName, String channelName) throws Exception {

name = subscriberName;

subscriberChannel = channelName;

Vlákno t = nové vlákno (toto);

t.start ();

       }

@ Přepis

public void run () {

Snaž se{

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

while (true) {

jedis.subscribe (this, this.subscriberChannel);

                      }

} úlovek (výjimka e) {

e.printStackTrace ();

              }

       }

@ Přepis

public void onMessage (řetězec kanál, řetězec zpráva) {

super.onMessage (kanál, zpráva);

       }

}

The Vydavatel třída udržuje samostatné připojení k Redis pro publikování zpráv na kanál.

public class Publisher {

RedisConnection conn = null;

Jedis jedis = null;

soukromý řetězec kanál;

public Publisher (String channelName) vyvolá výjimku {

channel = channelName;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

public void publish (String msg) throws Exception {

jedis.publish (kanál, zpráva);

       }

}

The EnglishTweetFilter, InfluencerTweetFilter, HashTagCollector, a Sběratel vlivů filtry se prodlužují Odběratel, což jim umožňuje poslouchat příchozí kanály. Protože pro odběr a publikování potřebujete samostatná připojení Redis, každá třída filtru má vlastní RedisConnection objekt. Filtry poslouchají nové zprávy na jejich kanálech ve smyčce. Zde je ukázkový kód EnglishTweetFilter třída:

public class EnglishTweetFilter rozšiřuje Subscriber

{

private RedisConnection conn = null;

soukromý Jedis jedis = null;

private String publisherChannel = null;

public EnglishTweetFilter (název řetězce, řetězec subscriberChannel, řetězec publisherChannel) vyvolá výjimku {

super (name, subscriberChannel);

this.publisherChannel = publisherChannel;

conn = RedisConnection.getRedisConnection ();

jedis = conn.getJedis ();

       }

@ Přepis

public void onMessage (String subscriberChannel, String message) {

JsonParser jsonParser = nový JsonParser ();

JsonElement jsonElement = jsonParser.parse (zpráva);

JsonObject jsonObject = jsonElement.getAsJsonObject ();

// filtrovat zprávy: publikovat pouze anglické tweety

if (jsonObject.get (“lang”)! = null &&

jsonObject.get („lang“). getAsString (). equals („en“)) {

jedis.publish (publisherChannel, zpráva);

              }

       }

}

The Vydavatel třída má metodu publikování, která publikuje zprávy na požadovaný kanál.

public class Publisher {

.

.     

public void publish (String msg) throws Exception {

jedis.publish (kanál, zpráva);

       }

.

}

Hlavní třída čte data z ingestového proudu a odesílá je do Všechny data kanál. Hlavní metoda této třídy spouští všechny objekty filtru.

veřejná třída IngestPubSub

{

.

public void start () vyvolá výjimku {

       .

       .

vydavatel = nový vydavatel („AllData“);

englishFilter = nový EnglishTweetFilter („Anglický filtr“, „AllData“,

„EnglishTweets“);

influencerFilter = nový InfluencerTweetFilter („Filtr vlivu“,

„AllData“, „InfluencerTweets“);

hashtagCollector = nový HashTagCollector („sběratel Hashtag“,

„EnglishTweets“);

influencerCollector = nový InfluencerCollector („Influencer Collector“,

„InfluencerTweets“);

       .

       .

}

Prohlédnout se seznamy Redis

Datová struktura Seznam v Redisu usnadňuje a zjednodušuje implementaci řešení pro řazení do fronty. V tomto řešení producent posílá každou zprávu do zadní části fronty a odběratel dotazuje frontu a stahuje nové zprávy z druhého konce.

Redis Labs

Profesionálové

  • Tato metoda je spolehlivá v případě ztráty připojení. Jakmile jsou data vložena do seznamů, jsou tam uchována, dokud si je předplatitelé nepřečtou. To platí, i když jsou předplatitelé zastaveni nebo ztratí spojení se serverem Redis.
  • Výrobci a spotřebitelé mezi nimi nevyžadují žádné spojení.

Nevýhody

  • Jakmile jsou data stažena ze seznamu, jsou odstraněna a nelze je znovu načíst. Pokud spotřebitelé data nepřetrvávají, jsou ztracena, jakmile jsou spotřebována.
  • Každý spotřebitel vyžaduje samostatnou frontu, která vyžaduje uložení více kopií dat.

Návrh kódu pro řešení Redis Lists

Redis Labs

Zdrojový kód pro řešení Redis Lists si můžete stáhnout zde: //github.com/redislabsdemo/IngestList. Níže jsou vysvětleny hlavní třídy tohoto řešení.

Seznam zpráv vloží datovou strukturu seznamu Redis. The tam() metoda posune novou zprávu nalevo od fronty a pop () čeká na novou zprávu zprava, pokud je fronta prázdná.

veřejná třída MessageList {

protected String name = “MyList”; // Název

.

.     

public void push (String msg) throws Exception {

jedis.lpush (name, msg); // Levý stisk

       }

public String pop () vyvolá výjimku {

vrátit jedis.brpop (0, name) .toString ();

       }

.

.

}

MessageListener je abstraktní třída, která implementuje logiku posluchače a vydavatele. A MessageListener objekt poslouchá pouze jeden seznam, ale může publikovat na více kanálů (MessageFilter předměty). Toto řešení vyžaduje samostatný MessageFilter objekt pro každého předplatitele v potrubí.

třída MessageListener implementuje Runnable {

private String name = null;

soukromý MessageList inboundList = null;

Map outBoundMsgFilters = new HashMap ();

.

.     

public void registerOutBoundMessageList (MessageFilter msgFilter) {

if (msgFilter! = null) {

if (outBoundMsgFilters.get (msgFilter.name) == null) {

outBoundMsgFilters.put (msgFilter.name, msgFilter);

                      }

              }

       }

.

.

@ Přepis

public void run () {

.

while (true) {

Řetězcová zpráva = inboundList.pop ();

processMessage (msg);

                      }                                  

.

       }

.

Protected Void pushMessage (String msg) throws Exception {

Set outBoundMsgNames = outBoundMsgFilters.keySet ();

pro (název řetězce: outBoundMsgNames) {

MessageFilter msgList = outBoundMsgFilters.get (name);

msgList.filterAndPush (msg);

              }

       }

}

MessageFilter je mateřská třída usnadňující filterAndPush () metoda. Protože data procházejí systémem ingest, jsou před odesláním do další fáze často filtrována nebo transformována. Třídy, které rozšiřují MessageFilter třída přepsat filterAndPush () metodu a implementovat vlastní logiku, která posune filtrovanou zprávu na další seznam.

veřejná třída MessageFilter {

MessageList messageList = null;

.

.

public void filterAndPush (String msg) throws Exception {

messageList.push (msg);

       }

.

.     

}

AllTweetsListener je ukázková implementace a MessageListener třída. Tím se poslouchají všechny tweety na webu Všechny data kanálu a data publikuje na EnglishTweetsFilter a Filtr vlivu.

veřejná třída AllTweetsListener rozšiřuje MessageListener {

.

.     

public static void main (String [] args) vyvolá výjimku {

MessageListener allTweetsProcessor = AllTweetsListener.getInstance ();

allTweetsProcessor.registerOutBoundMessageList (nový

EnglishTweetsFilter („EnglishTweetsFilter“, „EnglishTweets“));

allTweetsProcessor.registerOutBoundMessageList (nový

InfluencerFilter („InfluencerFilter“, „Influencer“));

allTweetsProcessor.start ();

       }

.

.

}

EnglishTweetsFilter rozšiřuje MessageFilter. Tato třída implementuje logiku pro výběr pouze těch tweetů, které jsou označeny jako anglické tweety. Filtr zahodí neanglické tweety a posune anglické tweety na další seznam.

public class EnglishTweetsFilter rozšiřuje MessageFilter {

public EnglishTweetsFilter (String name, String listName) vyvolá výjimku {

super (jméno, název seznamu);

       }

@ Přepis

public void filterAndPush (String message) vyvolá výjimku {

JsonParser jsonParser = nový JsonParser ();

JsonElement jsonElement = jsonParser.parse (zpráva);

JsonArray jsonArray = jsonElement.getAsJsonArray ();

JsonObject jsonObject = jsonArray.get (1) .getAsJsonObject ();

if (jsonObject.get (“lang”)! = null &&

jsonObject.get („lang“). getAsString (). equals („en“)) {

Jedis jedis = super.getJedisInstance ();

if (jedis! = null) {

jedis.lpush (super.name, jsonObject.toString ());

                             }

              }

       }

}

$config[zx-auto] not found$config[zx-overlay] not found