Како имплементирати стримовање података у реалном времену у Питхон-у

Kako Implementirati Strimovane Podataka U Realnom Vremenu U Pithon U



Овладавање имплементацијом стримовања података у реалном времену у Питхон-у делује као суштинска вештина у данашњем свету који укључује податке. Овај водич истражује основне кораке и основне алате за коришћење стримовања података у реалном времену са аутентичношћу у Питхон-у. Од одабира одговарајућег оквира као што је Апацхе Кафка или Апацхе Пулсар до писања Питхон кода за лаку потрошњу података, обраду и ефикасну визуализацију, стећи ћемо потребне вештине за изградњу агилних и ефикасних канала података у реалном времену.

Пример 1: Имплементација стримовања података у реалном времену у Питхон-у

Имплементација стримовања података у реалном времену у Питхон-у је кључна у данашњем добу и свету заснованом на подацима. У овом детаљном примеру ћемо проћи кроз процес изградње система за стриминг података у реалном времену користећи Апацхе Кафка и Питхон у Гоогле Цолаб-у.







Да бисмо иницијализовали пример пре него што почнемо да кодирамо, неопходна је изградња специфичног окружења у Гоогле Цолаб-у. Прво што треба да урадимо је да инсталирамо потребне библиотеке. Користимо библиотеку „кафка-питхон“ за Кафка интеграцију.



! пип инсталирај кафка-питхон


Ова команда инсталира библиотеку „кафка-питхон“ која обезбеђује Питхон функције и везе за Апацхе Кафку. Затим увозимо потребне библиотеке за наш пројекат. Увоз потребних библиотека укључујући „КафкаПродуцер“ и „КафкаЦонсумер“ су класе из библиотеке „кафка-питхон“ које нам омогућавају интеракцију са Кафка брокерима. ЈСОН је Питхон библиотека за рад са ЈСОН подацима које користимо за серијализацију и десеријализацију порука.



од кафка импорт КафкаПродуцер, КафкаЦонсумер
импорт јсон


Стварање Кафкиног продуцента





Ово је важно јер Кафка произвођач шаље податке Кафкиној теми. У нашем примеру, креирамо произвођача да пошаље симулиране податке у реалном времену на тему која се зове „тема у реалном времену“.

Креирамо инстанцу „КафкаПродуцер“ која наводи адресу Кафка брокера као „лоцалхост:9092“. Затим користимо „валуе_сериализатор“, функцију која серијализује податке пре него што их пошаље Кафки. У нашем случају, ламбда функција кодира податке као ЈСОН кодиран УТФ-8. Сада, хајде да симулирамо неке податке у реалном времену и пошаљемо их на тему Кафке.



произвођач = КафкаПродуцер ( боотстрап_серверс = 'лоцалхост:9092' ,
валуе_сериализатор =ламбда в: јсон.думпс ( ин ) .енцоде ( 'утф-8' ) )
# Симулирани подаци у реалном времену
подаци = { 'сенсор_ид' : 1 , 'температура' : 25.5 , 'влажност' : 60.2 }
# Слање података у тему
произвођач.пошаљи ( 'тема у реалном времену' , подаци )


У овим редовима дефинишемо речник „података“ који представља симулиране податке сензора. Затим користимо методу „пошаљи“ да објавимо ове податке у „теми у реалном времену“.

Затим желимо да креирамо Кафка потрошача, а Кафка потрошач чита податке из Кафкине теме. Креирамо потрошача да конзумира и обрађује поруке у „теми у реалном времену“. Креирамо инстанцу „КафкаЦонсумер“, наводећи тему коју желимо да користимо, нпр. (тема у реалном времену) и адресу Кафка брокера. Затим, „валуе_десериализер“ је функција која десеријализује податке који се примају од Кафке. У нашем случају, ламбда функција декодира податке као ЈСОН кодиран УТФ-8.

потрошач = КафкаЦонсумер ( 'тема у реалном времену' ,
боотстрап_серверс = 'лоцалхост:9092' ,
валуе_десериализер =ламбда к: јсон.лоадс ( к.децоде ( 'утф-8' ) ) )


Користимо итеративну петљу да континуирано конзумирамо и обрађујемо поруке из теме.

# Читање и обрада података у реалном времену
за порука ин потрошач:
подаци = порука.вредност
принт ( ф „Примљени подаци: {дата}“ )


Ми преузимамо вредност сваке поруке и наше симулиране податке сензора унутар петље и штампамо их на конзоли. Покретање Кафка произвођача и потрошача укључује покретање овог кода у Гоогле Цолаб-у и појединачно извршавање ћелија кода. Произвођач шаље симулиране податке Кафкиној теми, а потрошач чита и штампа примљене податке.


Анализа излаза током рада кода

Посматраћемо податке у реалном времену који се производе и троше. Формат података може да варира у зависности од наше симулације или стварног извора података. У овом детаљном примеру покривамо цео процес постављања система за стриминг података у реалном времену користећи Апацхе Кафка и Питхон у Гоогле Цолаб-у. Објаснићемо сваку линију кода и њен значај у изградњи овог система. Стреам података у реалном времену је моћна могућност, а овај пример служи као основа за сложеније апликације у стварном свету.

Пример 2: Имплементација стримовања података у реалном времену у Питхон-у помоћу података са берзе

Хајде да урадимо још један јединствен пример имплементације стримовања података у реалном времену у Питхон-у користећи другачији сценарио; овог пута ћемо се фокусирати на податке о берзи. Креирамо систем за стриминг података у реалном времену који бележи промене цена акција и обрађује их користећи Апацхе Кафка и Питхон у Гоогле Цолаб-у. Као што је показано у претходном примеру, почињемо са конфигурисањем нашег окружења у Гоогле Цолаб-у. Прво, инсталирамо потребне библиотеке:

! пип инсталирај кафка-питхон ифинанце


Овде додајемо библиотеку „ифинанце“ која нам омогућава да добијемо податке о берзи у реалном времену. Затим увозимо потребне библиотеке. Настављамо да користимо класе „КафкаПродуцер“ и „КафкаЦонсумер“ из библиотеке „кафка-питхон“ за Кафка интеракцију. Увозимо ЈСОН да бисмо радили са ЈСОН подацима. Такође користимо „ифинанце“ за добијање података о берзи у реалном времену. Такође увозимо библиотеку „време“ да бисмо додали временско одлагање за симулацију ажурирања у реалном времену.

од кафка импорт КафкаПродуцер, КафкаЦонсумер
импорт јсон
импорт ифинанце као иф
увоз време


Сада креирамо Кафка произвођача за податке о залихама. Наш Кафка произвођач добија податке о залихама у реалном времену и шаље их Кафкиној теми под називом „цена акција“.

произвођач = КафкаПродуцер ( боотстрап_серверс = 'лоцалхост:9092' ,
валуе_сериализатор =ламбда в: јсон.думпс ( ин ) .енцоде ( 'утф-8' ) )

док Истина:
залиха = иф.Тицкер ( 'ААПЛ' ) # Пример: акције компаније Аппле Инц
стоцк_дата = стоцк.хистори ( раздобље = '1д' )
последња_цена = подаци о залихама [ 'Близу' ] .илоц [ - 1 ]
подаци = { 'симбол' : 'ААПЛ' , 'Цена' : Последња цена }
произвођач.пошаљи ( 'Цена акција' , подаци )
време.спавање ( 10 ) # Симулирајте ажурирања у реалном времену сваких 10 секунди


Креирамо инстанцу „КафкаПродуцер“ са адресом Кафка брокера у овом коду. Унутар петље користимо „ифинанце“ да бисмо добили најновију цену акција за Аппле Инц. („ААПЛ“). Затим издвајамо последњу цену на затварању и шаљемо је на тему „цена акција“. На крају, уводимо временско одлагање да симулирамо ажурирања у реалном времену сваких 10 секунди.

Хајде да креирамо Кафка потрошача да чита и обрађује податке о ценама акција из теме „цена акција“.

потрошач = КафкаЦонсумер ( 'Цена акција' ,
боотстрап_серверс = 'лоцалхост:9092' ,
валуе_десериализер =ламбда к: јсон.лоадс ( к.децоде ( 'утф-8' ) ) )

за порука ин потрошач:
стоцк_дата = порука.вредност
принт ( ф „Примљени подаци о залихама: {стоцк_дата['симбол']} - Цена: {стоцк_дата['прице']}' )


Овај код је сличан корисничком подешавању претходног примера. Континуирано чита и обрађује поруке из теме „цена акција“ и штампа симбол акције и цену на конзоли. Ми извршавамо ћелије кода секвенцијално, на пример, једну по једну у Гоогле Цолаб-у да бисмо покренули произвођача и потрошача. Произвођач добија и шаље ажуриране цене акција у реалном времену док потрошач чита и приказује ове податке.

! пип инсталирај кафка-питхон ифинанце
од кафка импорт КафкаПродуцер, КафкаЦонсумер
импорт јсон
импорт ифинанце као иф
увоз време
произвођач = КафкаПродуцер ( боотстрап_серверс = 'лоцалхост:9092' ,
валуе_сериализатор =ламбда в: јсон.думпс ( ин ) .енцоде ( 'утф-8' ) )

док Истина:
залиха = иф.Тицкер ( 'ААПЛ' ) # Акције Аппле Инц
стоцк_дата = стоцк.хистори ( раздобље = '1д' )
последња_цена = подаци о залихама [ 'Близу' ] .илоц [ - 1 ]

подаци = { 'симбол' : 'ААПЛ' , 'Цена' : Последња цена }

произвођач.пошаљи ( 'Цена акција' , подаци )

време.спавање ( 10 ) # Симулирајте ажурирања у реалном времену сваких 10 секунди
потрошач = КафкаЦонсумер ( 'Цена акција' ,
боотстрап_серверс = 'лоцалхост:9092' ,
валуе_десериализер =ламбда к: јсон.лоадс ( к.децоде ( 'утф-8' ) ) )

за порука ин потрошач:
стоцк_дата = порука.вредност
принт ( ф „Примљени подаци о залихама: {стоцк_дата['симбол']} - Цена: {стоцк_дата['прице']}' )


У анализи излаза након покретања кода, посматраћемо ажурирања цена акција за Аппле Инц. у реалном времену која се производе и конзумирају.

Закључак

У овом јединственом примеру, демонстрирали смо имплементацију стримовања података у реалном времену у Питхон-у користећи Апацхе Кафка и библиотеку „ифинанце“ за прикупљање и обраду података са берзе. Детаљно смо објаснили сваки ред кода. Стримовање података у реалном времену може се применити на различита поља за изградњу апликација из стварног света у финансијама, Интернету ствари и још много тога.