Aktywne Wpisy
urarthone +111
News dnia to...
- Zginął rekordzista w maratonie 16.1% (211)
- Piłkarza trafił piorun 17.2% (225)
- Rozegrano finał futbolu amerykańskiego 1.6% (21)
- Pierwsza ofiara ospy alaskańskiej 2.9% (38)
- Koniec malowania twarzy w telewizyjnym show 25.3% (331)
- Rolnicy wysypali ukraińskie zboże 6.0% (79)
- Malcolm będzie jeździł a NASCAR 4.6% (60)
- Będą pierwsze powietrzne taksówki 9.9% (129)
- Kary za fotografowanie zabytków 4.6% (60)
- Wypadki w pracy, dane GUS 11.8% (155)
agareas +2
1. 4 uniony
2. Pobrać tabelę z mappingiem
3. Zrobić joina (te 4 uniony i mapping)
5. Przerobić 2 kolumny na podstawie wartości, i dodać dwie puste kolumny
4. Wykonać prostą agregacje danych -> groupby po 5 kolumnach.
6. Wrzucić wyniki do nowej tabeli
Tabela z unionami ma łącznie 7k rekordów. (7k wierszy i 7 kolumn)
Tabela z mappingiem max 300 wierszy
Jednak cały czas mam problem z pamięcią, GC czy timeoutami. Próbowałem zwiększać limity pamięci jednak nic nie pomaga. Przeważnie dostaje error: gc overhead limit exceeded. Troche dziwne bo to jest raptowanie 7k rekordów, po agregacji ma wyjść ~~3k. Ktoś wie jak to rozwiązać?
#pyspark #spark #dataengineering #hadoop
#programowanie #python #scala
Komentarz usunięty przez autora
1. Przede wszsytkim dobrze już koledzy napisali: ogarnij to w pandasie - ale rozumiem, ze polityka firmy. Niemniej jednak możesz ogarnąć sobie: "toPandas()" i jedź ze wszystkim na pojedynczym nodzie w tedy.
2. Sprawdz joina. Nie masz multiplikowania wierszy? Joinujesz się po unikalnych kluczach?
3. Jaką masz configuracje clustra? Masz jakiś
2. Zrobie left joina po jednej kolumnie:
dfjoined = resultdf.join(dfmapping, resultdf['bill'].cast('int') == df_mapping['stp'], how='left')
3. Tego nie wiem, nie ja zarządzam architekturą
4. GC wyrzuca albo po agregacji jak chciałem zrobić df.count() na tabeli zagregowanej, lub przy inserInto jak chciałem wrzucić dataframe do tabeli SQL (jeżeli nie robie counta to zatrzymuje się na insercie, jak
Tam castujesz też do int jakaś kolumnę. Masz pewność, że nie ucinasz tym sposobem jakichś zer wiodących itp?
Dfmapping['stp'] to kolumna typu int i jest unikalna? Zrob na tej kolumnie count i distinct count i podaj wyniki.
Zrób printSchema dla tych dwóch dataFrame, które joinujesz.
Sprawdź scheme wszystkich df które joinujesz lub robisz uniona
. Ten błąd przy zapisie dotyczy bezpośrednio danych samych w sobie.
Albo Ci brakuje jakichś kolumn albo coś podobnego.
Jednak teraz mam problem z kwerendą agregującą. Wszystko mi działa w Ambari - cała kwerenda wypluwa fajną tabelę jednak wrzucam to do pysparka i gówno. Wywala mi błąd. "Aggregate functions are not allowed in GROUP BY, but found sum(sap.cases). Masz pomysł co
Masz to gdzieś na repo i możesz udostępnić? A jak nie to chociaż wrzuć na jakiś pastebin i podeślij.
Robisz coś na wzór?:
Select field1, field1, field3, sum(SAP.cases), field3
From table as SAP
Group by field3
co do kodu to: df = spark.sql(tutajkwerendaz_linku).cache().
Masz jak to teraz sprawdzić?
Sprawdz to:
https://pastebin.com/HZF4BF2G
Powinno zadziałać zamiast Twojego zapytania. Wklej do swojego kodu.
Jak nie zaskoczy to znaczy, ze słabo mi się myśli po północy i pomyślimy nad tym jutro :P
@PiotrokeJ: To już jest kwestia samego zarządzania (wybacz ale nie mam pojęcia jak to inaczej ubrać w słowa) translacją samego SQL na MapReduce na Hadoopie.
W tym przypadku Hadoop trochę inaczej ogarnia takie zapytanie niż spark. Dlatego jak odpalasz coś bezpośrednio na hadoopie to możesz dostać wyniki (błędne lub dobre), a w sparku już nie.
W bardzo prostych słowach i bardzo ogólnie (pomijając wszelkie niuanse) wytłumaczono to tutaj:
https://searchdatamanagement.techtarget.com/definition/SQL-on-Hadoop