본문 바로가기
기술스택을 쌓아보자/Python

pandas 번역: Scaling to large datasets, pandas 최적화(pandas User guide 번역/pandas 기초 입문/Pandas 간단 요약/데이터 최적화/ pandas 최적화)

by 소리331 2021. 1. 9.
반응형

Scaling to large datasets

 

pandas 는 메모리 데이터셋보다 더욱 크고 다루기 까다로운 인메모리 분석을 위한 데이터 구조들을 제공합니다(series, dataframe 등).  가끔 pandas의 함수들 중에서는 데이터 셋의 복사본을 만들어서 연산을 진행하기 때문에 다룰 수 있는 규모의 데이터들도 다루기 어려워 질 때도 있습니다.

이 문서에서는 큰 규모의 데이터셋을 다루는 몇몇 방법을 추천할 예정입니다. 해당 문서는 분석 속도를 높이고, 메모리사이즈에 맞는 데이터셋에 집중한 Enhancing performance의 내용에서 추가적으로 보완하는 문서입니다.

그렇지만 먼저, (빅데이터를 다루는데에) pandas 를 사용하지 않는 것 또한 하나의 방법이 될 수 있다는 점을 기억해주세요, pandas 는 모든 상황에 적합한 툴은 아닙니다. 여러분이 PostgreSQL 처럼 엄청 큰 데이터 셋을 다루는 것이 아니라면 pandas가 적합하겠지만, 과도하게 큰 데이터셋에 pandas가 만능은 아니라는 것입니다. 그저여기에서는 여러분이 pandas 로 빅데이터를 다루는 방법을 알고 싶어한다는 것을 전제로 알려드리는 겁니다. 가시죵!

In [1]: import pandas as pd

In [2]: import numpy as np

 

 

Load less data

 

디스크 내의 raw dataset에 많은 columns을 가지고 있다고 가정해봅시다.

                     id_0    name_0       x_0       y_0  id_1   name_1       x_1  ...  name_8       x_8       y_8  id_9   name_9       x_9       y_9
timestamp                                                                         ...
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427   994    Frank -0.176842  ...     Dan -0.315310  0.713892  1025   Victor -0.135779  0.346801
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275  1003    Laura  0.459153  ...  Ursula  0.913244 -0.630308  1047    Wendy -0.886285  0.035852
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710  1046  Michael  0.524994  ...     Ray -0.656593  0.692568  1064   Yvonne  0.070426  0.432047
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008   996   Ingrid -0.414523  ...   Jerry -0.958994  0.608210   978    Wendy  0.855949 -0.648988
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504  1048    Jerry -0.569235  ...   Frank -0.577022 -0.409088   994      Bob -0.270132  0.335176
...                   ...       ...       ...       ...   ...      ...       ...  ...     ...       ...       ...   ...      ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817   973    Kevin -0.403352  ...     Tim -0.380415  0.008097  1041  Charlie  0.191477 -0.599519
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289   958   Oliver -0.966577  ...   Zelda  0.971274  0.402032  1038   Ursula  0.574016 -0.930992
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540   971      Tim  0.158484  ...   Alice -0.222079 -0.919274  1022      Dan  0.031345 -0.657755
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974   981   Hannah  0.607517  ...   Sarah -0.424440 -0.117274   990   George -0.375530  0.563312
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178  1018    Alice -0.564513  ...   Jerry  0.236837  0.807650   985   Oliver  0.777642  0.783392

[525601 rows x 40 columns]

 

우리가 원하는 열만 가져오기 위해서는 2가지 옵션이 있습니다. 옵션1은 모든 데이터를 로드하고, 그 이후 필요한 것만 필터링하는 방법입니다.

In [3]: columns = ["id_0", "name_0", "x_0", "y_0"]

In [4]: pd.read_parquet("timeseries_wide.parquet")[columns]
Out[4]: 
                     id_0    name_0       x_0       y_0
timestamp                                              
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504
...                   ...       ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178

[525601 rows x 4 columns]

 

옵션2는 우리가 필요한 columns만 요청하는 겁니다.

In [5]: pd.read_parquet("timeseries_wide.parquet", columns=columns)
Out[5]: 
                     id_0    name_0       x_0       y_0
timestamp                                              
2000-01-01 00:00:00  1015   Michael -0.399453  0.095427
2000-01-01 00:01:00   969  Patricia  0.650773 -0.874275
2000-01-01 00:02:00  1016    Victor -0.721465 -0.584710
2000-01-01 00:03:00   939     Alice -0.746004 -0.908008
2000-01-01 00:04:00  1017       Dan  0.919451 -0.803504
...                   ...       ...       ...       ...
2000-12-30 23:56:00   999       Tim  0.162578  0.512817
2000-12-30 23:57:00   970     Laura -0.433586 -0.600289
2000-12-30 23:58:00  1065     Edith  0.232211 -0.454540
2000-12-30 23:59:00  1019    Ingrid  0.322208 -0.615974
2000-12-31 00:00:00   937    Ursula -0.906523  0.943178

[525601 rows x 4 columns]

만일 우리가 두가지 옵션에서 메모리 사용량을 측정한다면 같은 함수에서 columns 옵션을 사용한 옵션 2가 1/10의 메모리를 사용합니다.

 pandas.read_csv() 함수에서는 usecols 옵션을 사용해서 우리가 읽고 싶은 columns만 메모리를 사용해서 읽을 수 있습니다. 하지만 모든 파일포팻에서 특정 columns만 추출 할 수 있는 것은 아닙니다.

 

 

Use efficient datatypes

 

pandas에서 제공하는 기본 데이터 타입은 메모리에 최적화 된 타입은 아닙니다. 특히 이러한 특징은 상대적으로 적은 unique 값을 가진 text data columns에서 더욱 돋보입니다(보통 low-cardinality data라고 부릅니다). 하지만 효율적인 데이터 타입을 이용하므로서 데이터 셋을 우리의 메모리 자원 안에서 다룰 수 있습니다. 

In [6]: ts = pd.read_parquet("timeseries.parquet")

In [7]: ts
Out[7]: 
                       id      name         x         y
timestamp                                              
2000-01-01 00:00:00  1029   Michael  0.278837  0.247932
2000-01-01 00:00:30  1010  Patricia  0.077144  0.490260
2000-01-01 00:01:00  1001    Victor  0.214525  0.258635
2000-01-01 00:01:30  1018     Alice -0.646866  0.822104
2000-01-01 00:02:00   991       Dan  0.902389  0.466665
...                   ...       ...       ...       ...
2000-12-30 23:58:00   992     Sarah  0.721155  0.944118
2000-12-30 23:58:30  1007    Ursula  0.409277  0.133227
2000-12-30 23:59:00  1009    Hannah -0.452802  0.184318
2000-12-30 23:59:30   978     Kevin -0.904728 -0.179146
2000-12-31 00:00:00   973    Ingrid -0.370763 -0.794667

[1051201 rows x 4 columns]

 

자 , 지금부터 어떤 데이터타입을 사용해야 하고, 메모리를 얼마나 사용하는지 알아봅시다.

In [8]: ts.dtypes
Out[8]: 
id        int64
name     object
x       float64
y       float64
dtype: object
In [9]: ts.memory_usage(deep=True)  # memory usage in bytes
Out[9]: 
Index     8409608
id        8409608
name     65537768
x         8409608
y         8409608
dtype: int64

 

한눈에 name column이 상당한 양의 메로리를 먹는 다는 것을 알 수 있습니다. unique 값이 적기때문에 이러한 text 데이터를 Categorical(명목형) 데이터로 바꿔주기 좋다는 것을 알 수 있습니다. 명목형 데이터에, 우리는 각각의 text 값을 메모리에 최적화 될 수 있도록 각 열에 있는 이름들을 정수형 데이터로 바꿔줄 수 있습니다.

In [10]: ts2 = ts.copy()

In [11]: ts2["name"] = ts2["name"].astype("category")

In [12]: ts2.memory_usage(deep=True)
Out[12]: 
Index    8409608
id       8409608
name     1053894
x        8409608
y        8409608
dtype: int64

 

더 나아가서 pandas.to_numeric()를 이용해서 숫자로 이루어진 열을 더 가장 작은 타입으로 변환할 수 있습니다.

In [13]: ts2["id"] = pd.to_numeric(ts2["id"], downcast="unsigned")

In [14]: ts2[["x", "y"]] = ts2[["x", "y"]].apply(pd.to_numeric, downcast="float")

In [15]: ts2.dtypes
Out[15]: 
id        uint16
name    category
x        float32
y        float32
dtype: object
In [16]: ts2.memory_usage(deep=True)
Out[16]: 
Index    8409608
id       2102402
name     1053894
x        4204804
y        4204804
dtype: int64
In [17]: reduction = ts2.memory_usage(deep=True).sum() / ts.memory_usage(deep=True).sum()

In [18]: print(f"{reduction:0.2f}")
0.20

이를 통해 우리는 데이터 규모를 원래 대비 1/5로 줄였습니다!

 명목형 데이터와 데이터 타입에 관해서는 Categorical data dtypes 문서를 확인해보세요!

 

 

Use chunking

 

몇몇 작업들은 chunking(덩어리로 쪼개기)을 통해 이룰 수 있습니다: csv 경로들을 parquet 으로 쪼개기 같은 큰 문제부터 하나의 csv를 parquet으로 쪼개기같은 작은 문제까지 해결 할 수 있습니다. 각각의 파일 조각이 메모리가 다루기에 적합한 사이즈가 될 때 까지 진행한다면 큰 규모의 데이터들을 다룰 수 있습니다

  📎  Note!

  

  chunking은 chunk 간의 최소한의 조작이 필요한 경우에 더욱 빛을 발합니다. 좀 더 복잡한 워크플로우가 요구되는 경우에 

  는 다른 라이브러리를 사용하는 것을 권유드립니다.

  

  Chunking works well when the operation you’re performing requires zero or minimal coordination between chunks.
  For more complicated workflows, you’re better off 
using another library.

 

우리가 각각 경로마다 logical 한 데이터셋을 가지고 있다고 가정해봅시다. 게다가 엄청 큰 규모로요!!! locigal 한 데이터셋은 아래 예시와 같다고 보시면 될 것 같습니다. 아래 코드블럭에서 각각의 경로는 각기 다른 년도의 전체 데이터를 위미합니다.

data
└── timeseries
    ├── ts-00.parquet
    ├── ts-01.parquet
    ├── ts-02.parquet
    ├── ts-03.parquet
    ├── ts-04.parquet
    ├── ts-05.parquet
    ├── ts-06.parquet
    ├── ts-07.parquet
    ├── ts-08.parquet
    ├── ts-09.parquet
    ├── ts-10.parquet
    └── ts-11.parquet

 

이제, out-of-core value_counts를 적용할 겁니다. 이 워크플로우에서 가장 메모리를 많이 사용하는 것은 가장 큰 규모의 chunk와, 각 행마다 unique value를 가지고 있는 작은 series들입니다. 각각의 파일들은 memory에 맞는 사이즈이기 대문에, arbitrary-sized datasets에서효과가있을것입니다.

In [19]: %%time
   ....: files = pathlib.Path("data/timeseries/").glob("ts*.parquet")
   ....: counts = pd.Series(dtype=int)
   ....: for path in files:
   ....:     df = pd.read_parquet(path)
   ....:     counts = counts.add(df["name"].value_counts(), fill_value=0)
   ....: counts.astype(int)
   ....: 
CPU times: user 850 ms, sys: 82.8 ms, total: 932 ms
Wall time: 687 ms
Out[19]: 
Alice      229802
Bob        229211
Charlie    229303
Dan        230621
Edith      230349
            ...  
Victor     230502
Wendy      230038
Xavier     229553
Yvonne     228766
Zelda      229909
Length: 26, dtype: int64

 

pandas.read_csv()와 같은 reader들은 하나의 파일을 읽을 때 chunksize 인자를 제공합니다.

메뉴얼에 따르면, chunking은 정교한 작업이 필요하지않을 경우에 대해서는 좋은 옵션입니다. groupby와 같은 함수들도 chunking을 하는 방법이 되기도 합니다. 이 경우에 여러분은 out-of-core algorithms을 적용하기 쉬운 다른 라이브러리를 사용하는 것을 권유합니다.

 

 

Use other libraries

 

pandas 는 DataFrame API를 제공하는 유일한 라이브러리입니다. 많은 개발자분들이 사용하시는 만큼 유명하기 때문에 , pandas' API는 다른 라이브러리의 하나의 기준이 되어 사용되기도 합니다. pandas 문서는 이러한 pandas 생태계(our ecosystem page.) 안에 있는 dataframe api를 사용하는 라이브러리들을 포함하기도 합니다.

예시로 병렬 컴퓨팅 라이브러리(parallel computing library,)인 Dask dask.dataframe을 제공합니다.  dask.dataframe는 pandas와 유사한api로,메모리보다 더 큰 규모의 dataset을 병렬로 작업할 수 있도록 도와줍니다.Dask는여러개의thread나process를하나의머신에서사용할수있도록도와주고,여러개의 머신들을 병렬로 작업할 수 있도록 도와줍니다.

이제 우리는 dask.dataframe을 import 하고, api가 pandas와 유사하다는 것으 보여줄 겁니다. 우리는 Dask의 read_parquet 함수를 이용할 수 있습니다. 하지만 읽고자 하는 파일의 파일의 globstring을 제공합니다.

In [20]: import dask.dataframe as dd

In [21]: ddf = dd.read_parquet("data/timeseries/ts*.parquet", engine="pyarrow")

In [22]: ddf
Out[22]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  object  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 12 tasks

 

위 예시의 ddf 객체를 조사해보면 우리는 아래와 같은 점들을 발견할 수 있습니다:(pandas의 객체들과 유사한 점)

  • There are familiar attributes like .columns and .dtypes

  • There are familiar methods like .groupby, .sum, etc.

  • There are new attributes like .npartitions and .divisions

 

각각의 parition, division 들은 Dask가 어떻게 병렬로 computing을 진행하는지 보여줍니다. Dask의 데이어프레임은 수많은 pansdas의 dataframe으로 이루어져있습니다. Dask DataFrame에서 호출한 하나의 작은 함수는 pandas에서 여러개의 함수를 호출한 효과를 낼 수 있습니다. 그리고 Dask는 어떻게 각각의 결과들이 호응하는지 알고 있습니다.  

In [23]: ddf.columns
Out[23]: Index(['id', 'name', 'x', 'y'], dtype='object')

In [24]: ddf.dtypes
Out[24]: 
id        int64
name     object
x       float64
y       float64
dtype: object

In [25]: ddf.npartitions
Out[25]: 12

 

한가지 주된 차이점은: "dask.dataframe API is lazy" 라는 겁니다! 위의 레퍼런스를 보면, 여러분은 실재로 dff의 value가 실제로 출력되지 않았다는 것을 알 수있습니다. 우리는 위의 예시에서 그저 dtype과 columns들을 출력한 것에 불과합니다. 왜냐하면 dask는 아직 실제로 data를 읽지 않았습니다. (그러니까 우리는 pandas 처럼 df를 전체 다 읽어온 후에 columns와 dtype을 읽은 것이 아니라, 파일에서 필요한 정보만 가져왔다는 뜻입니다. ) task를 즉시 실행하는 것보다 task graph를 그린 것과 동일한 것입니다. 

In [26]: ddf
Out[26]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
                int64  object  float64  float64
                  ...     ...      ...      ...
...               ...     ...      ...      ...
                  ...     ...      ...      ...
                  ...     ...      ...      ...
Dask Name: read-parquet, 12 tasks

In [27]: ddf["name"]
Out[27]: 
Dask Series Structure:
npartitions=12
    object
       ...
     ...  
       ...
       ...
Name: name, dtype: object
Dask Name: getitem, 24 tasks

In [28]: ddf["name"].value_counts()
Out[28]: 
Dask Series Structure:
npartitions=1
    int64
      ...
Name: name, dtype: int64
Dask Name: value-counts-agg, 39 tasks

아직 계산되지 않았기 때문에 각각의 call은 즉시결과를 보여주는 것을 확인할 수 있습니다. 우리는 단지 누군가가 결과를 필요로 할 때 할 수 있는 연산 목록을 만들고 있을 뿐입니다. Dask는 pandas.Series.value_counts의 결과물의 type이 name과 특정 dtype으로 구성된 pandas의 series형태라는 것을 알고 있습니다. 때문에 Dask에서의 value_counts 또한 동일한 결과물을 출력합니다.

 

실제 결과물을 보기위해서는 , .compute()를 사용합니다.

In [29]: %time ddf["name"].value_counts().compute()
CPU times: user 982 ms, sys: 84.6 ms, total: 1.07 s
Wall time: 744 ms
Out[29]: 
Laura      230906
Ingrid     230838
Kevin      230698
Dan        230621
Frank      230595
            ...  
Ray        229603
Xavier     229553
Charlie    229303
Bob        229211
Yvonne     228766
Name: name, Length: 26, dtype: int64

 

이때, 결과물은 Dask에서의 결과물과 pandas에서 value_counts를 사용했을 때의 결과물과 동일합니다. 위의 경우에서는 pandas에서의 name(index)와 count 값이 동일하게 출력되는 것입니다.

.compute 함수를 사용하는 것은 호출하고자 하는 full task  graph를 실행한 것과 동일합니다. 이 과정에는 데이터를 읽어들이는 것, 열을 선택하는 것 그리고 value_count 까지 포함됩니다. .compute를 통해 비로소 모든 과정이 시작되는 것입니다. 이 실행은 병렬로 이루어지며, Dask는 전반적으로 메모리가 차지하는 공간을 작게하고자 하는 것입니다. 만일 여러분이 사용하고자 하는 데이터셋이 다룰 수 있는 크기의 파티션으로 이루어져 있다면, 충분히 다룰 수 있습니다.

기본적으로 dask.dataframe은 병렬로 작업을 진행하기 위해 thread pool을 이용합니다. 또한 여러 기계들에게 work 를 분배할 수도 있습니다. 이경우에는 우리는 각각 기기에 process를 만들 수 있도록 로컬 "cluster"에 연결해야합니다!

>>> from dask.distributed import Client, LocalCluster

>>> cluster = LocalCluster()
>>> client = Client(cluster)
>>> client
<Client: 'tcp://127.0.0.1:53349' processes=4 threads=8, memory=17.18 GB>

 

한번 client가 만들어지고 나면, 모든 Dask의 연산들은 cluster에 배치됩니다(이 경우에서는 process)

Dask는 pandas 의 대부분의 API를 사용할 수 있습니다. 아래 예시는 우리가 흔히 사용하는 groupby aggregation입니다.

In [30]: %time ddf.groupby("name")[["x", "y"]].mean().compute().head()
CPU times: user 1.63 s, sys: 289 ms, total: 1.92 s
Wall time: 1.02 s
Out[30]: 
                x         y
name                       
Alice    0.000086 -0.001170
Bob     -0.000843 -0.000799
Charlie  0.000564 -0.000038
Dan      0.000584  0.000818
Edith   -0.000116 -0.000044

  

groupping과 aggreation이 out-of-core이자 병렬적으로 실행되었습니다.

Dask가 데이터셋의 division을 다룰 때, 몇몇 최적화가 가능합니다. 읽고자 하는 parquet이 Dask로 쓰였을 때, 자동적으로 division이 구분됩니다. 이 경우 우리가 parquet 파일을 manually하게 만들었기 때문에 우리는 division또한 manually 하게 다룰 필요가 있습니다.

In [31]: N = 12

In [32]: starts = [f"20{i:>02d}-01-01" for i in range(N)]

In [33]: ends = [f"20{i:>02d}-12-13" for i in range(N)]

In [34]: divisions = tuple(pd.to_datetime(starts)) + (pd.Timestamp(ends[-1]),)

In [35]: ddf.divisions = divisions

In [36]: ddf
Out[36]: 
Dask DataFrame Structure:
                   id    name        x        y
npartitions=12                                 
2000-01-01      int64  object  float64  float64
2001-01-01        ...     ...      ...      ...
...               ...     ...      ...      ...
2011-01-01        ...     ...      ...      ...
2011-12-13        ...     ...      ...      ...
Dask Name: read-parquet, 12 tasks

 

이제 .loc을 사용해보겠습니다.

In [37]: ddf.loc["2002-01-01 12:01":"2002-01-01 12:05"].compute()
Out[37]: 
                       id    name         x         y
timestamp                                            
2002-01-01 12:01:00   983   Laura  0.243985 -0.079392
2002-01-01 12:02:00  1001   Laura -0.523119 -0.226026
2002-01-01 12:03:00  1059  Oliver  0.612886  0.405680
2002-01-01 12:04:00   993   Kevin  0.451977  0.332947
2002-01-01 12:05:00  1014  Yvonne -0.948681  0.361748

 

Dask 에서는 2002년의 데이터를 찾기 위해 모든 파일을 훑는 것이 아니라 3번째 파티션만 보면 됩니다!(위의 디렉토리목록을 확인해보세요!) 많은 워크플로우가 대량의 데이터를 처리하고 메모리에 맞는 크기로 크기를 줄이는 방식으로 데이터를 처리합니다. 이번 케이스에서 우리는 하루동안의 빈도를 측정하고 평균을 구할 것입니다. 한번 평균을 구하고 나면, 우리는 해당 결과물이 메모리에 충분하다는 것을 알 수 있습니다. 때문에 우리는 안전하게 compute를 memory를 이용해 구할 수 있습니다. 연산이후에는 평범한 pandas 의 객체와 동일합니다.

In [38]: ddf[["x", "y"]].resample("1D").mean().cumsum().compute().plot()
Out[38]: <AxesSubplot:xlabel='timestamp'>

 

이러한 Dask 예시들은 한개의 기계에서 여러가지 작업들을 실행할 수 있게 해줍니다. Dask는 큰 데이터셋을 다루기 위해 한번에 여러 기계들에서도 작업이 배치될 수잇습니다.

더 많은 dask 예시들은  https://examples.dask.org.에서!

 


 

오늘도 어째저째 하나 끝!

 

출처

pandas.pydata.org/docs/user_guide/scale.html

반응형

댓글