Dask Bag 擅长处理可以表示为“由任意输入组成的序列”的数据。我们将这类数据称为“杂乱”数据,因为它可能包含复杂的嵌套结构、缺失的字段、混合的数据类型等。
默认情况下,dask.bag 使用 dask.multiprocessing 调度器进行计算。这样做的优点是 Dask 可以绕过 GIL,在纯 Python 对象上利用多个 CPU 核心;缺点是 Dask Bag 在需要大量 worker 之间通信的计算中表现不佳。
from dask.distributed import Client, progress
# Create a Dask distributed client configured with 4 workers, one thread each.
client = Client(n_workers=4, threads_per_worker=1)
# Bare expression: in a notebook cell this displays the client's summary.
client
创建一组随机的记录数据,并将其以JSON形式存储到磁盘。
import dask
import json
import os
# Make sure the output directory exists (no error if it is already there).
os.makedirs('data', exist_ok=True)
# Build the demo dataset of randomly generated people records as a Dask bag.
b = dask.datasets.make_people()
# Serialize every record to a JSON string and write the bag to data/*.json;
# the call returns the list of file paths it wrote.
b.map(json.dumps).to_textfiles('data/*.json')
['/Users/kefei/demo/dask-learn/data/0.json', '/Users/kefei/demo/dask-learn/data/1.json', '/Users/kefei/demo/dask-learn/data/2.json', '/Users/kefei/demo/dask-learn/data/3.json', '/Users/kefei/demo/dask-learn/data/4.json', '/Users/kefei/demo/dask-learn/data/5.json', '/Users/kefei/demo/dask-learn/data/6.json', '/Users/kefei/demo/dask-learn/data/7.json', '/Users/kefei/demo/dask-learn/data/8.json', '/Users/kefei/demo/dask-learn/data/9.json']
!head -n 2 data/0.json
{"age": 51, "name": ["Chas", "Ratliff"], "occupation": "Seaman", "telephone": "344-219-9130", "address": {"address": "1135 Hawthorne Concession road", "city": "Springfield"}, "credit-card": {"number": "3416 044674 28287", "expiration-date": "06/20"}} {"age": 53, "name": ["Wilbur", "Cannon"], "occupation": "Geophysicist", "telephone": "(219) 049-8427", "address": {"address": "871 Bernal Heights Lane", "city": "Hallandale Beach"}, "credit-card": {"number": "4315 9929 2231 3721", "expiration-date": "04/22"}}
import dask.bag as db
import json
b = db.read_text('data/*.json').map(json.loads)
b
dask.bag<loads, npartitions=20>
# 查看前两个数据
b.take(2)
({'age': 51, 'name': ['Chas', 'Ratliff'], 'occupation': 'Seaman', 'telephone': '344-219-9130', 'address': {'address': '1135 Hawthorne Concession road', 'city': 'Springfield'}, 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}}, {'age': 53, 'name': ['Wilbur', 'Cannon'], 'occupation': 'Geophysicist', 'telephone': '(219) 049-8427', 'address': {'address': '871 Bernal Heights Lane', 'city': 'Hallandale Beach'}, 'credit-card': {'number': '4315 9929 2231 3721', 'expiration-date': '04/22'}})
# 查看年龄超过30的人
b.filter(lambda record: record['age'] > 30).take(2)
({'age': 51, 'name': ['Chas', 'Ratliff'], 'occupation': 'Seaman', 'telephone': '344-219-9130', 'address': {'address': '1135 Hawthorne Concession road', 'city': 'Springfield'}, 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}}, {'age': 53, 'name': ['Wilbur', 'Cannon'], 'occupation': 'Geophysicist', 'telephone': '(219) 049-8427', 'address': {'address': '871 Bernal Heights Lane', 'city': 'Hallandale Beach'}, 'credit-card': {'number': '4315 9929 2231 3721', 'expiration-date': '04/22'}})
# 查看每个记录中的occupation值
b.map(lambda record: record['occupation']).take(2)
('Seaman', 'Geophysicist')
# 统计数量
b.count().compute()
17025
# Ten most frequent occupations among people older than 30.
# frequencies() yields (occupation, count) pairs; topk(..., key=1) ranks
# those pairs by their count.
result = (
    b.filter(lambda person: person['age'] > 30)
    .map(lambda person: person['occupation'])
    .frequencies(sort=True)
    .topk(10, key=1)
)
# Still lazy at this point: displaying it shows the bag, not the values.
result
dask.bag<topk-aggregate, npartitions=1>
result.compute()
[('Stonemason', 25), ('Nursing Manager', 24), ('Theatrical Agent', 23), ('Horticulturalist', 23), ('Valve Technician', 23), ('Landworker', 22), ('Mortician', 22), ('Blind Fitter', 21), ('Care Assistant', 21), ('Seamstress', 21)]
# Keep only the people older than 30, re-serialize each record as JSON and
# write the result to data/processed.*.json (the '*' is filled in per file).
(
    b.filter(lambda person: person['age'] > 30)
    .map(json.dumps)
    .to_textfiles('data/processed.*.json')
)
['/Users/kefei/demo/dask-learn/data/processed.00.json', '/Users/kefei/demo/dask-learn/data/processed.01.json', '/Users/kefei/demo/dask-learn/data/processed.02.json', '/Users/kefei/demo/dask-learn/data/processed.03.json', '/Users/kefei/demo/dask-learn/data/processed.04.json', '/Users/kefei/demo/dask-learn/data/processed.05.json', '/Users/kefei/demo/dask-learn/data/processed.06.json', '/Users/kefei/demo/dask-learn/data/processed.07.json', '/Users/kefei/demo/dask-learn/data/processed.08.json', '/Users/kefei/demo/dask-learn/data/processed.09.json', '/Users/kefei/demo/dask-learn/data/processed.10.json', '/Users/kefei/demo/dask-learn/data/processed.11.json', '/Users/kefei/demo/dask-learn/data/processed.12.json', '/Users/kefei/demo/dask-learn/data/processed.13.json', '/Users/kefei/demo/dask-learn/data/processed.14.json', '/Users/kefei/demo/dask-learn/data/processed.15.json', '/Users/kefei/demo/dask-learn/data/processed.16.json', '/Users/kefei/demo/dask-learn/data/processed.17.json', '/Users/kefei/demo/dask-learn/data/processed.18.json', '/Users/kefei/demo/dask-learn/data/processed.19.json']
b.take(1)
({'age': 51, 'name': ['Chas', 'Ratliff'], 'occupation': 'Seaman', 'telephone': '344-219-9130', 'address': {'address': '1135 Hawthorne Concession road', 'city': 'Springfield'}, 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},)
def flatten(record):
    """Flatten one nested person record into a single-level dict.

    The nested ``credit-card`` and ``address`` sub-dicts are lifted into
    top-level keys, and the ``name`` sequence is joined into one
    space-separated string, so that every value is a scalar and the
    records can be turned into DataFrame rows.

    Args:
        record: Mapping with the keys ``age``, ``occupation``,
            ``telephone``, ``name`` (a sequence of strings),
            ``credit-card`` (with ``number`` and ``expiration-date``) and
            ``address`` (with ``address`` and ``city``).

    Returns:
        A new dict with the keys ``age``, ``occupation``, ``telephone``,
        ``credit-card-number``, ``credit-card-expiration``, ``name``,
        ``street-address`` and ``city``.

    Raises:
        KeyError: If any of the expected keys is missing from ``record``.
    """
    return {
        'age': record['age'],
        'occupation': record['occupation'],
        'telephone': record['telephone'],
        'credit-card-number': record['credit-card']['number'],
        'credit-card-expiration': record['credit-card']['expiration-date'],
        # name is stored as a list of parts, e.g. ['Chas', 'Ratliff'].
        'name': ' '.join(record['name']),
        'street-address': record['address']['address'],
        'city': record['address']['city'],
    }
b.map(flatten).take(1)
({'age': 51, 'occupation': 'Seaman', 'telephone': '344-219-9130', 'credit-card-number': '3416 044674 28287', 'credit-card-expiration': '06/20', 'name': 'Chas Ratliff', 'street-address': '1135 Hawthorne Concession road', 'city': 'Springfield'},)
df = b.map(flatten).to_dataframe()
df.head()
age | occupation | telephone | credit-card-number | credit-card-expiration | name | street-address | city | |
---|---|---|---|---|---|---|---|---|
0 | 51 | Seaman | 344-219-9130 | 3416 044674 28287 | 06/20 | Chas Ratliff | 1135 Hawthorne Concession road | Springfield |
1 | 53 | Geophysicist | (219) 049-8427 | 4315 9929 2231 3721 | 04/22 | Wilbur Cannon | 871 Bernal Heights Lane | Hallandale Beach |
2 | 24 | Hospital Worker | 1-899-405-6481 | 4494 7877 3692 6723 | 03/22 | Spring Case | 820 St. Paul Street-Calvert Hill | Chesapeake |
3 | 60 | Osteopath | 1-677-679-4179 | 4848 0637 1092 6516 | 12/25 | Lou Galloway | 1076 Ellis Viaduct | Kennesaw |
4 | 37 | Art Historian | 514-516-9013 | 3455 426227 43867 | 06/17 | Thaddeus Kirk | 576 Candyland Nene | Peabody |
# DataFrame version of the earlier bag query: the ten most common
# occupations among people older than 30.
df[df.age > 30].occupation.value_counts().nlargest(10).compute()
Stonemason 25 Nursing Manager 24 Valve Technician 23 Horticulturalist 23 Theatrical Agent 23 Landworker 22 Mortician 22 Blind Fitter 21 Care Assistant 21 Almoner 21 Name: occupation, dtype: int64
# Done with the example: shut down the scheduler and workers this client is connected to.
client.shutdown()