Scaling to large datasets
pandas 는 메모리 데이터셋보다 더욱 크고 다루기 까다로운 인메모리 분석을 위한 데이터 구조들을 제공합니다(series, dataframe 등). 가끔 pandas의 함수들 중에서는 데이터 셋의 복사본을 만들어서 연산을 진행하기 때문에 다룰 수 있는 규모의 데이터들도 다루기 어려워 질 때도 있습니다.
이 문서에서는 큰 규모의 데이터셋을 다루는 몇몇 방법을 추천할 예정입니다. 해당 문서는 분석 속도를 높이고, 메모리사이즈에 맞는 데이터셋에 집중한 Enhancing performance의 내용에서 추가적으로 보완하는 문서입니다.
그렇지만 먼저, (빅데이터를 다루는데에) pandas 를 사용하지 않는 것 또한 하나의 방법이 될 수 있다는 점을 기억해주세요, pandas 는 모든 상황에 적합한 툴은 아닙니다. 여러분이 PostgreSQL 처럼 엄청 큰 데이터 셋을 다루는 것이 아니라면 pandas가 적합하겠지만, 과도하게 큰 데이터셋에 pandas가 만능은 아니라는 것입니다. 그저여기에서는 여러분이 pandas 로 빅데이터를 다루는 방법을 알고 싶어한다는 것을 전제로 알려드리는 겁니다. 가시죵!
In [1]: import pandas as pd
In [2]: import numpy as np
Load less data
디스크 내의 raw dataset에 많은 columns을 가지고 있다고 가정해봅시다.
id_0 name_0 x_0 y_0 id_1 name_1 x_1 ... name_8 x_8 y_8 id_9 name_9 x_9 y_9
timestamp ...
2000-01-01 00:00:00 1015 Michael -0.399453 0.095427 994 Frank -0.176842 ... Dan -0.315310 0.713892 1025 Victor -0.135779 0.346801
2000-01-01 00:01:00 969 Patricia 0.650773 -0.874275 1003 Laura 0.459153 ... Ursula 0.913244 -0.630308 1047 Wendy -0.886285 0.035852
2000-01-01 00:02:00 1016 Victor -0.721465 -0.584710 1046 Michael 0.524994 ... Ray -0.656593 0.692568 1064 Yvonne 0.070426 0.432047
2000-01-01 00:03:00 939 Alice -0.746004 -0.908008 996 Ingrid -0.414523 ... Jerry -0.958994 0.608210 978 Wendy 0.855949 -0.648988
2000-01-01 00:04:00 1017 Dan 0.919451 -0.803504 1048 Jerry -0.569235 ... Frank -0.577022 -0.409088 994 Bob -0.270132 0.335176
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
2000-12-30 23:56:00 999 Tim 0.162578 0.512817 973 Kevin -0.403352 ... Tim -0.380415 0.008097 1041 Charlie 0.191477 -0.599519
2000-12-30 23:57:00 970 Laura -0.433586 -0.600289 958 Oliver -0.966577 ... Zelda 0.971274 0.402032 1038 Ursula 0.574016 -0.930992
2000-12-30 23:58:00 1065 Edith 0.232211 -0.454540 971 Tim 0.158484 ... Alice -0.222079 -0.919274 1022 Dan 0.031345 -0.657755
2000-12-30 23:59:00 1019 Ingrid 0.322208 -0.615974 981 Hannah 0.607517 ... Sarah -0.424440 -0.117274 990 George -0.375530 0.563312
2000-12-31 00:00:00 937 Ursula -0.906523 0.943178 1018 Alice -0.564513 ... Jerry 0.236837 0.807650 985 Oliver 0.777642 0.783392
[525601 rows x 40 columns]
우리가 원하는 열만 가져오기 위해서는 2가지 옵션이 있습니다. 옵션1은 모든 데이터를 로드하고, 그 이후 필요한 것만 필터링하는 방법입니다.
In [3]: columns = ["id_0", "name_0", "x_0", "y_0"]
In [4]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[4]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 1015 Michael -0.399453 0.095427
2000-01-01 00:01:00 969 Patricia 0.650773 -0.874275
2000-01-01 00:02:00 1016 Victor -0.721465 -0.584710
2000-01-01 00:03:00 939 Alice -0.746004 -0.908008
2000-01-01 00:04:00 1017 Dan 0.919451 -0.803504
... ... ... ... ...
2000-12-30 23:56:00 999 Tim 0.162578 0.512817
2000-12-30 23:57:00 970 Laura -0.433586 -0.600289
2000-12-30 23:58:00 1065 Edith 0.232211 -0.454540
2000-12-30 23:59:00 1019 Ingrid 0.322208 -0.615974
2000-12-31 00:00:00 937 Ursula -0.906523 0.943178
[525601 rows x 4 columns]
옵션2는 우리가 필요한 columns만 요청하는 겁니다.
In [5]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[5]:
id_0 name_0 x_0 y_0
timestamp
2000-01-01 00:00:00 1015 Michael -0.399453 0.095427
2000-01-01 00:01:00 969 Patricia 0.650773 -0.874275
2000-01-01 00:02:00 1016 Victor -0.721465 -0.584710
2000-01-01 00:03:00 939 Alice -0.746004 -0.908008
2000-01-01 00:04:00 1017 Dan 0.919451 -0.803504
... ... ... ... ...
2000-12-30 23:56:00 999 Tim 0.162578 0.512817
2000-12-30 23:57:00 970 Laura -0.433586 -0.600289
2000-12-30 23:58:00 1065 Edith 0.232211 -0.454540
2000-12-30 23:59:00 1019 Ingrid 0.322208 -0.615974
2000-12-31 00:00:00 937 Ursula -0.906523 0.943178
[525601 rows x 4 columns]
만일 우리가 두가지 옵션에서 메모리 사용량을 측정한다면 같은 함수에서 columns 옵션을 사용한 옵션 2가 1/10의 메모리를 사용합니다.
pandas.read_csv() 함수에서는 usecols 옵션을 사용해서 우리가 읽고 싶은 columns만 메모리를 사용해서 읽을 수 있습니다. 하지만 모든 파일포팻에서 특정 columns만 추출 할 수 있는 것은 아닙니다.
Use efficient datatypes
pandas에서 제공하는 기본 데이터 타입은 메모리에 최적화 된 타입은 아닙니다. 특히 이러한 특징은 상대적으로 적은 unique 값을 가진 text data columns에서 더욱 돋보입니다(보통 low-cardinality data라고 부릅니다). 하지만 효율적인 데이터 타입을 이용하므로서 데이터 셋을 우리의 메모리 자원 안에서 다룰 수 있습니다.
In [6]: ts = pd.read_parquet("timeseries.parquet")
In [7]: ts
Out[7]:
id name x y
timestamp
2000-01-01 00:00:00 1029 Michael 0.278837 0.247932
2000-01-01 00:00:30 1010 Patricia 0.077144 0.490260
2000-01-01 00:01:00 1001 Victor 0.214525 0.258635
2000-01-01 00:01:30 1018 Alice -0.646866 0.822104
2000-01-01 00:02:00 991 Dan 0.902389 0.466665
... ... ... ... ...
2000-12-30 23:58:00 992 Sarah 0.721155 0.944118
2000-12-30 23:58:30 1007 Ursula 0.409277 0.133227
2000-12-30 23:59:00 1009 Hannah -0.452802 0.184318
2000-12-30 23:59:30 978 Kevin -0.904728 -0.179146
2000-12-31 00:00:00 973 Ingrid -0.370763 -0.794667
[1051201 rows x 4 columns]
자 , 지금부터 어떤 데이터타입을 사용해야 하고, 메모리를 얼마나 사용하는지 알아봅시다.
In [8]: ts.dtypes
Out[8]:
id int64
name object
x float64
y float64
dtype: object
In [9]: ts.memory_usage(deep=True) # memory usage in bytes
Out[9]:
Index 8409608
id 8409608
name 65537768
x 8409608
y 8409608
dtype: int64
한눈에 name column이 상당한 양의 메로리를 먹는 다는 것을 알 수 있습니다. unique 값이 적기때문에 이러한 text 데이터를 Categorical(명목형) 데이터로 바꿔주기 좋다는 것을 알 수 있습니다. 명목형 데이터에, 우리는 각각의 text 값을 메모리에 최적화 될 수 있도록 각 열에 있는 이름들을 정수형 데이터로 바꿔줄 수 있습니다.
In [10]: ts2 = ts.copy()
In [11]: ts2["name"] = ts2["name"].astype("category")
In [12]: ts2.memory_usage(deep=True)
Out[12]:
Index 8409608
id 8409608
name 1053894
x 8409608
y 8409608
dtype: int64
더 나아가서 pandas.to_numeric()를 이용해서 숫자로 이루어진 열을 더 가장 작은 타입으로 변환할 수 있습니다.
In [13]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")
In [14]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")
In [15]: ts2.dtypes
Out[15]:
id uint16
name category
x float32
y float32
dtype: object
In [16]: ts2.memory_usage(deep=True)
Out[16]:
Index 8409608
id 2102402
name 1053894
x 4204804
y 4204804
dtype: int64
In [17]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()
In [18]: print(f"{reduction:0.2f}")
0.20
이를 통해 우리는 데이터 규모를 원래 대비 1/5로 줄였습니다!
명목형 데이터와 데이터 타입에 관해서는 Categorical data와 dtypes 문서를 확인해보세요!
Use chunking
몇몇 작업들은 chunking(덩어리로 쪼개기)을 통해 이룰 수 있습니다: csv 경로들을 parquet 으로 쪼개기 같은 큰 문제부터 하나의 csv를 parquet으로 쪼개기같은 작은 문제까지 해결 할 수 있습니다. 각각의 파일 조각이 메모리가 다루기에 적합한 사이즈가 될 때 까지 진행한다면 큰 규모의 데이터들을 다룰 수 있습니다
📎 Note!
chunking은 chunk 간의 최소한의 조작이 필요한 경우에 더욱 빛을 발합니다. 좀 더 복잡한 워크플로우가 요구되는 경우에 는 다른 라이브러리를 사용하는 것을 권유드립니다.
Chunking works well when the operation you’re performing requires zero or minimal coordination between chunks. |
우리가 각각 경로마다 logical 한 데이터셋을 가지고 있다고 가정해봅시다. 게다가 엄청 큰 규모로요!!! locigal 한 데이터셋은 아래 예시와 같다고 보시면 될 것 같습니다. 아래 코드블럭에서 각각의 경로는 각기 다른 년도의 전체 데이터를 위미합니다.
data
└── timeseries
├── ts-00.parquet
├── ts-01.parquet
├── ts-02.parquet
├── ts-03.parquet
├── ts-04.parquet
├── ts-05.parquet
├── ts-06.parquet
├── ts-07.parquet
├── ts-08.parquet
├── ts-09.parquet
├── ts-10.parquet
└── ts-11.parquet
이제, out-of-core value_counts를 적용할 겁니다. 이 워크플로우에서 가장 메모리를 많이 사용하는 것은 가장 큰 규모의 chunk와, 각 행마다 unique value를 가지고 있는 작은 series들입니다. 각각의 파일들은 memory에 맞는 사이즈이기 대문에, arbitrary-sized datasets에서효과가있을것입니다.
In [19]: %%time
....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
....: counts = pd.Series(dtype=int)
....: for path in files:
....: df = pd.read_parquet(path)
....: counts = counts.add(df["name"].value_counts(), fill_value=0)
....: counts.astype(int)
....:
CPU times: user 850 ms, sys: 82.8 ms, total: 932 ms
Wall time: 687 ms
Out[19]:
Alice 229802
Bob 229211
Charlie 229303
Dan 230621
Edith 230349
...
Victor 230502
Wendy 230038
Xavier 229553
Yvonne 228766
Zelda 229909
Length: 26, dtype: int64
pandas.read_csv()와 같은 reader들은 하나의 파일을 읽을 때 chunksize 인자를 제공합니다.
메뉴얼에 따르면, chunking은 정교한 작업이 필요하지않을 경우에 대해서는 좋은 옵션입니다. groupby와 같은 함수들도 chunking을 하는 방법이 되기도 합니다. 이 경우에 여러분은 out-of-core algorithms을 적용하기 쉬운 다른 라이브러리를 사용하는 것을 권유합니다.
Use other libraries
pandas 는 DataFrame API를 제공하는 유일한 라이브러리입니다. 많은 개발자분들이 사용하시는 만큼 유명하기 때문에 , pandas' API는 다른 라이브러리의 하나의 기준이 되어 사용되기도 합니다. pandas 문서는 이러한 pandas 생태계(our ecosystem page.) 안에 있는 dataframe api를 사용하는 라이브러리들을 포함하기도 합니다.
예시로 병렬 컴퓨팅 라이브러리(parallel computing library,)인 Dask 는 dask.dataframe을 제공합니다. dask.dataframe는 pandas와 유사한api로,메모리보다 더 큰 규모의 dataset을 병렬로 작업할 수 있도록 도와줍니다.Dask는여러개의thread나process를하나의머신에서사용할수있도록도와주고,여러개의 머신들을 병렬로 작업할 수 있도록 도와줍니다.
이제 우리는 dask.dataframe을 import 하고, api가 pandas와 유사하다는 것으 보여줄 겁니다. 우리는 Dask의 read_parquet 함수를 이용할 수 있습니다. 하지만 읽고자 하는 파일의 파일의 globstring을 제공합니다.
In [20]: import dask.dataframe as dd
In [21]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")
In [22]: ddf
Out[22]:
Dask DataFrame Structure:
id name x y
npartitions=12
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: read-parquet, 12 tasks
위 예시의 ddf 객체를 조사해보면 우리는 아래와 같은 점들을 발견할 수 있습니다:(pandas의 객체들과 유사한 점)
-
There are familiar attributes like .columns and .dtypes
-
There are familiar methods like .groupby, .sum, etc.
-
There are new attributes like .npartitions and .divisions
각각의 parition, division 들은 Dask가 어떻게 병렬로 computing을 진행하는지 보여줍니다. Dask의 데이어프레임은 수많은 pansdas의 dataframe으로 이루어져있습니다. Dask DataFrame에서 호출한 하나의 작은 함수는 pandas에서 여러개의 함수를 호출한 효과를 낼 수 있습니다. 그리고 Dask는 어떻게 각각의 결과들이 호응하는지 알고 있습니다.
In [23]: ddf.columns
Out[23]: Index(['id', 'name', 'x', 'y'], dtype='object')
In [24]: ddf.dtypes
Out[24]:
id int64
name object
x float64
y float64
dtype: object
In [25]: ddf.npartitions
Out[25]: 12
한가지 주된 차이점은: "dask.dataframe API is lazy" 라는 겁니다! 위의 레퍼런스를 보면, 여러분은 실재로 dff의 value가 실제로 출력되지 않았다는 것을 알 수있습니다. 우리는 위의 예시에서 그저 dtype과 columns들을 출력한 것에 불과합니다. 왜냐하면 dask는 아직 실제로 data를 읽지 않았습니다. (그러니까 우리는 pandas 처럼 df를 전체 다 읽어온 후에 columns와 dtype을 읽은 것이 아니라, 파일에서 필요한 정보만 가져왔다는 뜻입니다. ) task를 즉시 실행하는 것보다 task graph를 그린 것과 동일한 것입니다.
In [26]: ddf
Out[26]:
Dask DataFrame Structure:
id name x y
npartitions=12
int64 object float64 float64
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: read-parquet, 12 tasks
In [27]: ddf["name"]
Out[27]:
Dask Series Structure:
npartitions=12
object
...
...
...
...
Name: name, dtype: object
Dask Name: getitem, 24 tasks
In [28]: ddf["name"].value_counts()
Out[28]:
Dask Series Structure:
npartitions=1
int64
...
Name: name, dtype: int64
Dask Name: value-counts-agg, 39 tasks
아직 계산되지 않았기 때문에 각각의 call은 즉시결과를 보여주는 것을 확인할 수 있습니다. 우리는 단지 누군가가 결과를 필요로 할 때 할 수 있는 연산 목록을 만들고 있을 뿐입니다. Dask는 pandas.Series.value_counts의 결과물의 type이 name과 특정 dtype으로 구성된 pandas의 series형태라는 것을 알고 있습니다. 때문에 Dask에서의 value_counts 또한 동일한 결과물을 출력합니다.
실제 결과물을 보기위해서는 , .compute()를 사용합니다.
In [29]: %time ddf["name"].value_counts().compute()
CPU times: user 982 ms, sys: 84.6 ms, total: 1.07 s
Wall time: 744 ms
Out[29]:
Laura 230906
Ingrid 230838
Kevin 230698
Dan 230621
Frank 230595
...
Ray 229603
Xavier 229553
Charlie 229303
Bob 229211
Yvonne 228766
Name: name, Length: 26, dtype: int64
이때, 결과물은 Dask에서의 결과물과 pandas에서 value_counts를 사용했을 때의 결과물과 동일합니다. 위의 경우에서는 pandas에서의 name(index)와 count 값이 동일하게 출력되는 것입니다.
.compute 함수를 사용하는 것은 호출하고자 하는 full task graph를 실행한 것과 동일합니다. 이 과정에는 데이터를 읽어들이는 것, 열을 선택하는 것 그리고 value_count 까지 포함됩니다. .compute를 통해 비로소 모든 과정이 시작되는 것입니다. 이 실행은 병렬로 이루어지며, Dask는 전반적으로 메모리가 차지하는 공간을 작게하고자 하는 것입니다. 만일 여러분이 사용하고자 하는 데이터셋이 다룰 수 있는 크기의 파티션으로 이루어져 있다면, 충분히 다룰 수 있습니다.
기본적으로 dask.dataframe은 병렬로 작업을 진행하기 위해 thread pool을 이용합니다. 또한 여러 기계들에게 work 를 분배할 수도 있습니다. 이경우에는 우리는 각각 기기에 process를 만들 수 있도록 로컬 "cluster"에 연결해야합니다!
>>> from dask.distributed import Client, LocalCluster
>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>
한번 client가 만들어지고 나면, 모든 Dask의 연산들은 cluster에 배치됩니다(이 경우에서는 process)
Dask는 pandas 의 대부분의 API를 사용할 수 있습니다. 아래 예시는 우리가 흔히 사용하는 groupby aggregation입니다.
In [30]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
CPU times: user 1.63 s, sys: 289 ms, total: 1.92 s
Wall time: 1.02 s
Out[30]:
x y
name
Alice 0.000086 -0.001170
Bob -0.000843 -0.000799
Charlie 0.000564 -0.000038
Dan 0.000584 0.000818
Edith -0.000116 -0.000044
groupping과 aggreation이 out-of-core이자 병렬적으로 실행되었습니다.
Dask가 데이터셋의 division을 다룰 때, 몇몇 최적화가 가능합니다. 읽고자 하는 parquet이 Dask로 쓰였을 때, 자동적으로 division이 구분됩니다. 이 경우 우리가 parquet 파일을 manually하게 만들었기 때문에 우리는 division또한 manually 하게 다룰 필요가 있습니다.
In [31]: N = 12
In [32]: starts = [f"20{i:>02d}-01-01" for i in range(N)]
In [33]: ends = [f"20{i:>02d}-12-13" for i in range(N)]
In [34]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)
In [35]: ddf.divisions = divisions
In [36]: ddf
Out[36]:
Dask DataFrame Structure:
id name x y
npartitions=12
2000-01-01 int64 object float64 float64
2001-01-01 ... ... ... ...
... ... ... ... ...
2011-01-01 ... ... ... ...
2011-12-13 ... ... ... ...
Dask Name: read-parquet, 12 tasks
이제 .loc을 사용해보겠습니다.
In [37]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
Out[37]:
id name x y
timestamp
2002-01-01 12:01:00 983 Laura 0.243985 -0.079392
2002-01-01 12:02:00 1001 Laura -0.523119 -0.226026
2002-01-01 12:03:00 1059 Oliver 0.612886 0.405680
2002-01-01 12:04:00 993 Kevin 0.451977 0.332947
2002-01-01 12:05:00 1014 Yvonne -0.948681 0.361748
Dask 에서는 2002년의 데이터를 찾기 위해 모든 파일을 훑는 것이 아니라 3번째 파티션만 보면 됩니다!(위의 디렉토리목록을 확인해보세요!) 많은 워크플로우가 대량의 데이터를 처리하고 메모리에 맞는 크기로 크기를 줄이는 방식으로 데이터를 처리합니다. 이번 케이스에서 우리는 하루동안의 빈도를 측정하고 평균을 구할 것입니다. 한번 평균을 구하고 나면, 우리는 해당 결과물이 메모리에 충분하다는 것을 알 수 있습니다. 때문에 우리는 안전하게 compute를 memory를 이용해 구할 수 있습니다. 연산이후에는 평범한 pandas 의 객체와 동일합니다.
In [38]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
Out[38]: <AxesSubplot:xlabel='timestamp'>
이러한 Dask 예시들은 한개의 기계에서 여러가지 작업들을 실행할 수 있게 해줍니다. Dask는 큰 데이터셋을 다루기 위해 한번에 여러 기계들에서도 작업이 배치될 수잇습니다.
더 많은 dask 예시들은 https://examples.dask.org.에서!
오늘도 어째저째 하나 끝!
출처
'기술스택을 쌓아보자 > Python' 카테고리의 다른 글
[찾는중]Backend TkAgg is interactive backend. Turning interactive mode on. (0) | 2021.06.14 |
---|---|
[pandas] ffill과 bfill (0) | 2021.01.12 |
pandas 번역: Computational tools, pandas의 연산툴 (pandas User guide 번역/pandas 기초 입문/Pandas 간단 요약) (0) | 2021.01.06 |
pandas user guide 번역: 목차 (0) | 2021.01.06 |
[파이썬기초] Effective Python item4: 파이썬스러운 생각- 복잡한 표현보다 Helper function을 이용하기 / 파이썬 코딩 요령? (0) | 2020.12.29 |
댓글