Wpis z mikrobloga

Cześć, dostałem mały projekt w pracy w pysparku. Mała transformacja danych i wrzucenie wyników do nowej tabeli. W punktach opiszę co trzeba zrobić:
1. 4 uniony
2. Pobrać tabelę z mappingiem
3. Zrobić joina (te 4 uniony i mapping)
5. Przerobić 2 kolumny na podstawie wartości, i dodać dwie puste kolumny
4. Wykonać prostą agregacje danych -> groupby po 5 kolumnach.
6. Wrzucić wyniki do nowej tabeli

Tabela z unionami ma łącznie 7k rekordów. (7k wierszy i 7 kolumn)
Tabela z mappingiem max 300 wierszy
Jednak cały czas mam problem z pamięcią, GC czy timeoutami. Próbowałem zwiększać limity pamięci jednak nic nie pomaga. Przeważnie dostaje error: gc overhead limit exceeded. Troche dziwne bo to jest raptowanie 7k rekordów, po agregacji ma wyjść ~~3k. Ktoś wie jak to rozwiązać?

#pyspark #spark #dataengineering #hadoop
#programowanie #python #scala
  • 20
@MerytorycznieNiepoprawny: A jesteś pewien, że dobrze joina robisz? Bardzo trudno jest cokolwiek podpowiedzieć na bazie szczątkowych informacji, które podałeś ale:

1. Przede wszsytkim dobrze już koledzy napisali: ogarnij to w pandasie - ale rozumiem, ze polityka firmy. Niemniej jednak możesz ogarnąć sobie: "toPandas()" i jedź ze wszystkim na pojedynczym nodzie w tedy.
2. Sprawdz joina. Nie masz multiplikowania wierszy? Joinujesz się po unikalnych kluczach?
3. Jaką masz configuracje clustra? Masz jakiś
@inny_89: 1. Ok, postaram się. Jednak chciałbym dowiedzieć się co robię źle.
2. Zrobie left joina po jednej kolumnie:
dfjoined = resultdf.join(dfmapping, resultdf['bill'].cast('int') == df_mapping['stp'], how='left')
3. Tego nie wiem, nie ja zarządzam architekturą
4. GC wyrzuca albo po agregacji jak chciałem zrobić df.count() na tabeli zagregowanej, lub przy inserInto jak chciałem wrzucić dataframe do tabeli SQL (jeżeli nie robie counta to zatrzymuje się na insercie, jak
@PiotrokeJ a weź zrób count() na resultdf oraz na dfmapping przed tym jointem i na dfjoined od razu po joinie. I podaj nam wyniki.

Tam castujesz też do int jakaś kolumnę. Masz pewność, że nie ucinasz tym sposobem jakichś zer wiodących itp?

Df
mapping['stp'] to kolumna typu int i jest unikalna? Zrob na tej kolumnie count i distinct count i podaj wyniki.

Zrób printSchema dla tych dwóch dataFrame, które joinujesz.
@inny_89: @LollyPoop: @ostrykuc666: Przerzuciłem wszystko na sql, jednak cały czas otrzymuje błędy podczas wykonywania insertInto do tabeli. "An error occured while calling o73.insertInto. Job aborted due to stage failure: task 0 i insta 83.0 failed 4 times, most recent failure lost task 0.3 in stage 83.0 TID xxx. java.lang.IndexOutOfBoundsException: toIndex =212". Jakieś pomysły na to? Łącze 4 tabele unionem i ma wyjść 9k rekordów
@PiotrokeJ sprawdź czy przypadkiem nie masz pustego tego dfa, którego zapisujesz.

Sprawdź scheme wszystkich df które joinujesz lub robisz uniona
. Ten błąd przy zapisie dotyczy bezpośrednio danych samych w sobie.
Albo Ci brakuje jakichś kolumn albo coś podobnego.
@inny_89: No w jakis dziwny sposób kwerenta działa w Ambari a przez pysparka juz nie działa. Ostatnia część uniona nie działa tak jak by nie było rekordów w tabeli?

Jednak teraz mam problem z kwerendą agregującą. Wszystko mi działa w Ambari - cała kwerenda wypluwa fajną tabelę jednak wrzucam to do pysparka i gówno. Wywala mi błąd. "Aggregate functions are not allowed in GROUP BY, but found sum(sap.cases). Masz pomysł co
@PiotrokeJ pewnie Mirek. Znajdziemy rozwiązanie ale musiałbym zobaczyć dokładny kod chociaż tego zapytania SQL które odpalasz w pysparku.

Masz to gdzieś na repo i możesz udostępnić? A jak nie to chociaż wrzuć na jakiś pastebin i podeślij.
@PiotrokeJ tak strzelam, że na 95% jest tak, że nie wpisałeś wszystkich pól z selecta, których używasz i group by robisz tylko po jakimś wybranym a używasz właśnie funkcji agregujących.

Robisz coś na wzór?:

Select field1, field1, field3, sum(SAP.cases), field3
From table as SAP
Group by field3
@PiotrokeJ: Co do tych case when kolumn, które wypisujesz w group by to łątwiej Ci będzie ogarnąć co się dzieje w kodzie jeśli napiszesz to z sub-query albo wyrażeniem CTE.
Sprawdz to:
https://pastebin.com/HZF4BF2G

Powinno zadziałać zamiast Twojego zapytania. Wklej do swojego kodu.

Jak nie zaskoczy to znaczy, ze słabo mi się myśli po północy i pomyślimy nad tym jutro :P
@inny_89: Dzięki, jedno i drugie działa :) Teraz pytanie z innej beczki, wiesz dlaczego w Apache Ambari kiedy puszczam zapytanie dostaje wyniki, jak puszę identyczne zapytanie przez pysparka i dam np. df.count() albo chce wrzucć dataframe do nowej tabeli do dostaje error An error occured while calling o73.count. Job aborted due to stage failure: task 0 i insta 83.0 failed 4 times, most recent failure lost task 0.3 in stage 83.0
Apache Ambari


@PiotrokeJ: To już jest kwestia samego zarządzania (wybacz ale nie mam pojęcia jak to inaczej ubrać w słowa) translacją samego SQL na MapReduce na Hadoopie.

W tym przypadku Hadoop trochę inaczej ogarnia takie zapytanie niż spark. Dlatego jak odpalasz coś bezpośrednio na hadoopie to możesz dostać wyniki (błędne lub dobre), a w sparku już nie.

W bardzo prostych słowach i bardzo ogólnie (pomijając wszelkie niuanse) wytłumaczono to tutaj:
https://searchdatamanagement.techtarget.com/definition/SQL-on-Hadoop