#!/usr/bin/env python
# coding: utf-8

---
sidebar_label: Ethereum Blockchain Analysis
sidebar_position: 1
---

# # Ethereum Blockchain Analysis with Ethereum-ETL and Bacalhau
#
#
# [![stars - badge-generator](https://img.shields.io/github/stars/bacalhau-project/bacalhau?style=social)](https://github.com/bacalhau-project/bacalhau)
#
# Mature blockchains are difficult to analyze because of their size. Ethereum-ETL is a tool that makes it easy to extract information from an Ethereum node, but it's not easy to get working in a batch manner. It takes approximately 1 week for an Ethereum node to download the entire chain (even more in my experience) and importing and exporting data from the Ethereum node is slow.
#
# For this example, we ran an Ethereum node for a week and allowed it to synchronise. We then ran ethereum-etl to extract the information and pinned it on Filecoin. This means that we can now access the data without having to run another Ethereum node.
#
# But there's still a lot of data and these types of analyses typically need repeating or refining. So it makes absolute sense to use a decentralised network like Bacalhau to process the data in a scalable way.
#
# ## TL;DR
# Running the Ethereum-ETL analysis on Bacalhau to process data extracted from an Ethereum node.
#
# ### Prerequisite
#
# To get started, you need to install the Bacalhau client; see more information [here](https://docs.bacalhau.org/getting-started/installation)

# In[ ]:


get_ipython().system('command -v bacalhau >/dev/null 2>&1 || (export BACALHAU_INSTALL_DIR=.; curl -sL https://get.bacalhau.org/install.sh | bash)')
path=get_ipython().getoutput('echo $PATH')
get_ipython().run_line_magic('env', 'PATH=./:{path[-1]}')


# ## Analysing Ethereum Data Locally
#
# First let's download one of the IPFS files and inspect it locally. You can see the full list of IPFS CIDs in the appendix.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'wget -q -O file.tar.gz https://w3s.link/ipfs/bafybeifgqjvmzbtz427bne7af5tbndmvniabaex77us6l637gqtb2iwlwq\ntar -xvf file.tar.gz\n')


# In[ ]:


get_ipython().run_cell_magic('bash', '', 'pip install pandas\n')


# In[ ]:


# Use pandas to read in transaction data and clean up the columns
import pandas as pd
import glob

file = glob.glob('output_*/transactions/start_block=*/end_block=*/transactions*.csv')[0]
print("Loading file %s" % file)
df = pd.read_csv(file)
df['value'] = df['value'].astype('float')
df['from_address'] = df['from_address'].astype('string')
df['to_address'] = df['to_address'].astype('string')
df['hash'] = df['hash'].astype('string')
df['block_hash'] = df['block_hash'].astype('string')
df['block_datetime'] = pd.to_datetime(df['block_timestamp'], unit='s')
df.info()


# The following code inspects the daily trading volume of Ethereum for a single chunk (50,000 blocks) of data.
#
# This is all good, but we can do better. We can use the Bacalhau client to download the data from IPFS and then run the analysis on the data in the cloud. This means that we can analyse the entire Ethereum blockchain without having to download it locally.

# In[ ]:


# Total volume per day
df[['block_datetime', 'value']].groupby(pd.Grouper(key='block_datetime', freq='1D')).sum().plot()


# ## Analysing Ethereum Data With Bacalhau
#
# To run jobs on the Bacalhau network you need to package your code. In this example I will package the code as a Docker image.
#
# But before we do that, we need to develop the code that will perform the analysis.
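# To keep the packaged script easy to follow, here is the core transformation in isolation: the same daily grouping as above, plus an illustrative conversion from wei (the unit in which ethereum-etl exports the `value` column) to ETH. The `value_eth` column is only for readability and is not part of the script we package next.

# In[ ]:


# Daily trading volume, same grouping as above (assumes `df` from the local analysis)
daily = df[['block_datetime', 'value']].groupby(pd.Grouper(key='block_datetime', freq='1D')).sum()
daily['value_eth'] = daily['value'] / 10**18  # 1 ETH = 10^18 wei; illustrative only
print(daily.head())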
# The code below is a simple script to parse the incoming data and produce a CSV file with the daily trading volume of Ethereum.

# In[ ]:


get_ipython().run_cell_magic('writefile', 'main.py', 'import glob, os, sys, shutil, tempfile\nimport pandas as pd\n\ndef main(input_dir, output_dir):\n    search_path = os.path.join(input_dir, "output*", "transactions", "start_block*", "end_block*", "transactions_*.csv")\n    csv_files = glob.glob(search_path)\n    if len(csv_files) == 0:\n        print("No CSV files found in %s" % search_path)\n        sys.exit(1)\n    for transactions_file in csv_files:\n        print("Loading %s" % transactions_file)\n        df = pd.read_csv(transactions_file)\n        df[\'value\'] = df[\'value\'].astype(\'float\')\n        df[\'block_datetime\'] = pd.to_datetime(df[\'block_timestamp\'], unit=\'s\')\n\n        print("Processing %d blocks" % (df.shape[0]))\n        results = df[[\'block_datetime\', \'value\']].groupby(pd.Grouper(key=\'block_datetime\', freq=\'1D\')).sum()\n        print("Finished processing %d days worth of records" % (results.shape[0]))\n\n        save_path = os.path.join(output_dir, os.path.basename(transactions_file))\n        os.makedirs(os.path.dirname(save_path), exist_ok=True)\n        print("Saving to %s" % (save_path))\n        results.to_csv(save_path)\n\ndef extractData(input_dir, output_dir):\n    search_path = os.path.join(input_dir, "*.tar.gz")\n    gz_files = glob.glob(search_path)\n    if len(gz_files) == 0:\n        print("No tar.gz files found in %s" % search_path)\n        sys.exit(1)\n    for f in gz_files:\n        shutil.unpack_archive(filename=f, extract_dir=output_dir)\n\nif __name__ == "__main__":\n    if len(sys.argv) != 3:\n        print(\'Must pass arguments. Format: [command] input_dir output_dir\')\n        sys.exit()\n    with tempfile.TemporaryDirectory() as tmp_dir:\n        extractData(sys.argv[1], tmp_dir)\n        main(tmp_dir, sys.argv[2])\n')


# Next, let's make sure the file works as expected...

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'python main.py . outputs/\n')


# And finally, package the code inside a Docker image to make the process reproducible. Here I'm passing the Bacalhau default `/inputs` and `/outputs` directories. The `/inputs` directory is where the data will be read from and the `/outputs` directory is where the results will be saved to.

# In[ ]:


get_ipython().run_cell_magic('writefile', 'Dockerfile', 'FROM python:3.11-slim-bullseye\nWORKDIR /src\nRUN pip install pandas==1.5.1\nADD main.py .\nCMD ["python", "main.py", "/inputs", "/outputs"]\n')


# We've already pushed the container, but for posterity, the following command pushes this container to GHCR.
#
# ```bash
# docker buildx build --platform linux/amd64 --push -t ghcr.io/bacalhau-project/examples/blockchain-etl:0.0.1 .
# ```

# ## Running a Bacalhau Job
#
# To run our analysis on the Ethereum blockchain, we will use the `bacalhau docker run` command.

# In[ ]:


get_ipython().run_cell_magic('bash', '--out job_id', 'bacalhau docker run \\\n    --id-only \\\n    --input ipfs://bafybeifgqjvmzbtz427bne7af5tbndmvniabaex77us6l637gqtb2iwlwq:/inputs/data.tar.gz \\\n    ghcr.io/bacalhau-project/examples/blockchain-etl:0.0.6\n')


# The job has been submitted and Bacalhau has printed out the related job ID. We store that in an environment variable so that we can reuse it later on.

# In[ ]:


get_ipython().run_line_magic('env', 'JOB_ID={job_id}')
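# While the job runs, you can also poll its state from Python. This is an optional convenience sketch: it simply shells out to the same `bacalhau list --id-filter` command covered in the next section and waits until the output mentions `Completed` or `Published`.

# In[ ]:


# Optional: poll the submitted job until it reaches a finished state
import os, subprocess, time

job_id = os.environ["JOB_ID"].strip()
for _ in range(60):  # give up after roughly five minutes
    out = subprocess.run(
        ["bacalhau", "list", "--id-filter", job_id],
        capture_output=True, text=True,
    ).stdout
    if "Completed" in out or "Published" in out:
        print("Job has finished")
        break
    time.sleep(5)
else:
    print("Job still running; check it manually with `bacalhau describe`")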
# The `bacalhau docker run` command allows you to pass an input data volume with a `-i ipfs://CID:path` argument, just like Docker, except the left-hand side of the argument is a [content identifier (CID)](https://github.com/multiformats/cid). This results in Bacalhau mounting a *data volume* inside the container. By default, Bacalhau mounts the input volume at the path `/inputs` inside the container.
#
# Bacalhau also mounts a data volume to store output data. The `bacalhau docker run` command creates an output data volume mounted at `/outputs`. This is a convenient location to store the results of your job.

# ## Checking the State of your Jobs
#
# - **Job status**: You can check the status of the job using `bacalhau list`.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'bacalhau list --id-filter ${JOB_ID}\n')


# When it says `Published` or `Completed`, that means the job is done, and we can get the results.
#
# - **Job information**: You can find out more information about your job by using `bacalhau describe`.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'bacalhau describe ${JOB_ID}\n')


# - **Job download**: You can download your job results directly by using `bacalhau get`. Alternatively, you can choose to create a directory to store your results. In the command below, we created a directory and downloaded our job output to be stored in that directory.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'rm -rf ./results && mkdir -p ./results # Temporary directory to store the results\nbacalhau get --output-dir ./results ${JOB_ID} # Download the results\n')


# After the download has finished you should see the following contents in the results directory.
#
# ## Viewing your Job Output
#
# To view the file, run the following command:

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'ls -lah results/outputs\n')


# ### Display the results
#
# To view the results, we will use **glob** to return all file paths that match a specific pattern, then load and plot the first CSV.

# In[ ]:


import glob
import pandas as pd

# Get CSV files list from a folder
csv_files = glob.glob("results/outputs/*.csv")
df = pd.read_csv(csv_files[0], index_col='block_datetime')
df.plot()


# ### Massive Scale Ethereum Analysis
#
# Ok, so that works. Let's scale this up! We can run the same analysis on the entire Ethereum blockchain (up to the point where I have uploaded the Ethereum data). To do this, we need to run the analysis on each of the chunks of data that we have stored on IPFS. We can do this by running the same job on each of the chunks.
#
# See the appendix for the `hashes.txt` file.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'printf "" > job_ids.txt\nfor h in $(cat hashes.txt); do \\\n    bacalhau docker run \\\n    --id-only \\\n    --wait=false \\\n    --input=ipfs://$h:/inputs/data.tar.gz \\\n    ghcr.io/bacalhau-project/examples/blockchain-etl:0.0.6 >> job_ids.txt \ndone\n')


# Now take a look at the job IDs. You can use these to check the status of the jobs and download the results. You might want to double-check that the jobs ran OK by doing a `bacalhau list`.

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'cat job_ids.txt\n')


# Wait until all of these jobs have completed:

# In[ ]:


get_ipython().run_cell_magic('bash', '', 'bacalhau list -n 50\n')


# And then download all the results and merge them into a single directory. This might take a while, so this is a good time to treat yourself to a nice Dark Mild. There have also been some issues in the past communicating with IPFS, so if you get an error, try again.
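# If you prefer to script those retries, the sketch below wraps the same `bacalhau get` command in Python and retries each download a couple of times. It is purely optional; the bash cell that follows does the same downloads in parallel without retries.

# In[ ]:


# Optional: sequential downloads with simple retries, wrapping `bacalhau get`
import shutil, subprocess, time
from pathlib import Path

job_ids = [line.strip() for line in open("job_ids.txt") if line.strip()]
for job_id in job_ids:
    out_dir = Path("results_%s" % job_id)
    for attempt in range(3):
        shutil.rmtree(out_dir, ignore_errors=True)  # start from a clean directory
        out_dir.mkdir(parents=True)
        result = subprocess.run(["bacalhau", "get", "--output-dir", str(out_dir), job_id])
        if result.returncode == 0:
            break
        print("Retrying %s (attempt %d)" % (job_id, attempt + 2))
        time.sleep(5)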
# In[ ]: get_ipython().run_cell_magic('bash', '', 'for id in $(cat job_ids.txt); do \\\n rm -rf results_$id && mkdir results_$id\n bacalhau get --output-dir results_$id $id &\ndone\nwait\n') # ### Display the image # # To view the images, we will use **glob** to return all file paths that match a specific pattern. # In[ ]: import os, glob import pandas as pd # Get CSV files list from a folder path = os.path.join("results_*", "outputs", "*.csv") csv_files = glob.glob(path) # Read each CSV file into a list of DataFrames df_list = (pd.read_csv(file, index_col='block_datetime') for file in csv_files) # Concatenate all DataFrames df_unsorted = pd.concat(df_list, ignore_index=False) # Some files will cross days, so group by day and sum the values df = df_unsorted.groupby(level=0).sum() # Plot df.plot(figsize=(16,9)) # That's it! There is several years of Ethereum transaction volume data. # In[ ]: get_ipython().run_cell_magic('bash', '', 'rm -rf results_* output_* outputs results temp # Remove temporary results\n') # ## Appendix 1: List Ethereum Data CIDs # # The following list is a list of IPFS CID's for the Ethereum data that we used in this tutorial. You can use these CID's to download the rest of the chain if you so desire. The CIDs are ordered by block number and they increase 50,000 blocks at a time. Here's a list of ordered CIDs: # In[ ]: get_ipython().run_cell_magic('writefile', 'hashes.txt', 'bafybeihvtzberlxrsz4lvzrzvpbanujmab3hr5okhxtbgv2zvonqos2l3i\nbafybeifb25fgxrzu45lsc47gldttomycqcsao22xa2gtk2ijbsa5muzegq\nbafybeig4wwwhs63ly6wbehwd7tydjjtnw425yvi2tlzt3aii3pfcj6hvoq\nbafybeievpb5q372q3w5fsezflij3wlpx6thdliz5xowimunoqushn3cwka\nbafybeih6te26iwf5kzzby2wqp67m7a5pmwilwzaciii3zipvhy64utikre\nbafybeicjd4545xph6rcyoc74wvzxyaz2vftapap64iqsp5ky6nz3f5yndm\nbafybeicgo3iofo3sw73wenc3nkdhi263yytjnds5cxjwvypwekbz4sk7ra\nbafybeihvep5xsvxm44lngmmeysihsopcuvcr34an4idz45ixl5slsqzy3y\nbafybeigmt2zwzrbzwb4q2kt2ihlv34ntjjwujftvabrftyccwzwdypama4\nbafybeiciwui7sw3zqkvp4d55p4woq4xgjlstrp3mzxl66ab5ih5vmeozci\nbafybeicpmotdsj2ambf666b2jkzp2gvg6tadr6acxqw2tmdlmsruuggbbu\nbafybeigefo3esovbveavllgv5wiheu5w6cnfo72jxe6vmfweco5eq5sfty\nbafybeigvajsumnfwuv7lp7yhr2sr5vrk3bmmuhhnaz53waa2jqv3kgkvsu\nbafybeih2xg2n7ytlunvqxwqlqo5l3daykuykyvhgehoa2arot6dmorstmq\nbafybeihnmq2ltuolnlthb757teihwvvw7wophoag2ihnva43afbeqdtgi4\nbafybeibb34hzu6z2xgo6nhrplt3xntpnucthqlawe3pmzgxccppbxrpudy\nbafybeigny33b4g6gf2hrqzzkfbroprqrimjl5gmb3mnsqu655pbbny6tou\nbafybeifgqjvmzbtz427bne7af5tbndmvniabaex77us6l637gqtb2iwlwq\nbafybeibryqj62l45pxjhdyvgdc44p3suhvt4xdqc5jpx474gpykxwgnw2e\nbafybeidme3fkigdjaifkjfbwn76jk3fcqdogpzebtotce6ygphlujaecla\nbafybeig7myc3eg3h2g5mk2co7ybte4qsuremflrjneer6xk3pghjwmcwbi\nbafybeic3x2r5rrd3fdpdqeqax4bszcciwepvbpjl7xdv6mkwubyqizw5te\nbafybeihxutvxg3bw7fbwohq4gvncrk3hngkisrtkp52cu7qu7tfcuvktnq\nbafybeicumr67jkyarg5lspqi2w4zqopvgii5dgdbe5vtbbq53mbyftduxy\nbafybeiecn2cdvefvdlczhz6i4afbkabf5pe5yqrcsgdvlw5smme2tw7em4\nbafybeiaxh7dhg4krgkil5wqrv5kdsc3oewwy6ym4n3545ipmzqmxaxrqf4\nbafybeiclcqfzinrmo3adr4lg7sf255faioxjfsolcdko3i4x7opx7xrqii\nbafybeicjmeul7c2dxhmaudawum4ziwfgfkvbgthgtliggfut5tsc77dx7q\nbafybeialziupik7csmhfxnhuss5vrw37kmte7rmboqovp4cpq5hj4insda\nbafybeid7ecwdrw7pb3fnkokq5adybum6s5ok3yi2lw4m3edjpuy65zm4ji\nbafybeibuxwnl5ogs4pwa32xriqhch24zbrw44rp22hrly4t6roh6rz7j4m\nbafybeicxvy47jpvv3fi5umjatem5pxabfrbkzxiho7efu6mpidjpatte54\nbafybeifynb4mpqrbsjbeqtxpbuf6y4frrtjrc4tm7cnmmui7gbjkckszrq\nbafybeidcgnbhguyfaahkoqbyy2z525d3qfzdtbjuk4e75wkdbnkcafvjei\nbafybeiefc67s6hpydnsqdgypbunroqwkij5j26sfmc7are7yxvg45uuh7i\nba
fybeiefwjy3o42ovkssnm7iihbog46k5grk3gobvvkzrqvof7p6xbgowi\nbafybeihpydd3ivtza2ql5clatm5fy7ocych7t4czu46sbc6c2ykrbwk5uu\nbafybeiet7222lqfmzogur3zlxqavlnd3lt3qryw5yi5rhuiqeqg4w7c3qu\nbafybeihwomd4ygoydvj5kh24wfwk5kszmst5vz44zkl6yibjargttv7sly\nbafybeidbjt2ckr4oooio3jsfk76r3bsaza5trjvt7u36slhha5ksoc5gv4\nbafybeifyjrmopgtfmswq7b4pfscni46doy3g3z6vi5rrgpozc6duebpmuy\nbafybeidsrowz46yt62zs64q2mhirlc3rsmctmi3tluorsts53vppdqjj7e\nbafybeiggntql57bw24bw6hkp2yqd3qlyp5oxowo6q26wsshxopfdnzsxhq\nbafybeidguz36u6wakx4e5ewuhslsfsjmk5eff5q7un2vpkrcu7cg5aaqf4\nbafybeiaypwu2b45iunbqnfk2g7bku3nfqveuqp4vlmmwj7o7liyys42uai\nbafybeicaahv7xvia7xojgiecljo2ddrvryzh2af7rb3qqbg5a257da5p2y\nbafybeibgeiijr74rcliwal3e7tujybigzqr6jmtchqrcjdo75trm2ptb4e\nbafybeiba3nrd43ylnedipuq2uoowd4blghpw2z7r4agondfinladcsxlku\nbafybeif3semzitjbxg5lzwmnjmlsrvc7y5htekwqtnhmfi4wxywtj5lgoe\nbafybeiedmsig5uj7rgarsjans2ad5kcb4w4g5iurbryqn62jy5qap4qq2a\nbafybeidyz34bcd3k6nxl7jbjjgceg5eu3szbrbgusnyn7vfl7facpecsce\nbafybeigmq5gch72q3qpk4nipssh7g7msk6jpzns2d6xmpusahkt2lu5m4y\nbafybeicjzoypdmmdt6k54wzotr5xhpzwbgd3c4oqg6mj4qukgvxvdrvzye\nbafybeien55egngdpfvrsxr2jmkewdyha72ju7qaaeiydz2f5rny7drgzta\n') # ## Appendix 2: Setting up an Ethereum Node # # In the course of writing this example I had to setup an Ethereum node. It was a slow and painful process so I thought I would share the steps I took to make it easier for others. # ### Geth setup and sync # # Geth supports Ubuntu by default, so use that when creating a VM. Use Ubuntu 22.04 LTS. # # ```bash # gcloud compute instances create phil-ethereum-node \ # --project=bacalhau-development --zone=europe-west2-c \ # --machine-type=c2-standard-4 --tags=geth \ # --create-disk=auto-delete=yes,boot=yes,device-name=phil-ethereum-node,image=projects/ubuntu-os-cloud/global/images/ubuntu-2204-jammy-v20221101a,mode=rw,size=50,type=projects/bacalhau-development/zones/europe-west2-c/diskTypes/pd-balanced \ # --create-disk=auto-delete=yes,device-name=phil-ethereum-disk,mode=rw,name=phil-ethereum-disk,size=3000,type=projects/bacalhau-development/zones/europe-west2-c/diskTypes/pd-standard # ``` # # Mount the disk: # # ```bash # sudo mkfs.ext4 -m 0 -E lazy_itable_init=0,lazy_journal_init=0,discard /dev/sdb # sudo mkdir -p /mnt/disks/ethereum # sudo mount -o discard,defaults /dev/sdb /mnt/disks/ethereum # sudo chmod a+w /mnt/disks/ethereum # ``` # # ```bash # sudo add-apt-repository -y ppa:ethereum/ethereum # sudo apt-get update # sudo apt-get install -y ethereum # sudo mkdir /prysm && cd /prysm # sudo curl https://raw.githubusercontent.com/prysmaticlabs/prysm/master/prysm.sh --output prysm.sh && sudo chmod +x prysm.sh # ``` # # Run as a new user: # # ```bash # sudo useradd -d /home/ethuser -m --uid 10000 ethuser # sudo chown -R ethuser /prysm # ``` # # ``` # sudo tee "/etc/systemd/system/geth.service" > /dev/null <<'EOF' # [Unit] # Description=Geth # # [Service] # Type=simple # User=ethuser # Restart=always # RestartSec=12 # ExecStart=/bin/geth --syncmode "full" --datadir /mnt/disks/ethereum # # [Install] # WantedBy=default.target # EOF # # sudo tee "/etc/systemd/system/prysm.service" > /dev/null <<'EOF' # [Unit] # Description=Prysm # # [Service] # Type=simple # User=ethuser # Restart=always # RestartSec=12 # ExecStart=/prysm/prysm.sh beacon-chain --execution-endpoint=/mnt/disks/ethereum/geth.ipc --suggested-fee-recipient=0x7f68cb1cdE000AF82291A0D0c21E0f88FD7dB440 --checkpoint-sync-url=https://beaconstate.info # --genesis-beacon-api-url=https://beaconstate.info --accept-terms-of-use --datadir 
/mnt/disks/ethereum/prysm
#
# [Install]
# WantedBy=default.target
# EOF
#
# sudo systemctl daemon-reload
# sudo systemctl enable prysm.service
# sudo systemctl enable geth.service
# sudo systemctl daemon-reload
# sudo service prysm start
# sudo service geth start
# ```
#
# Check they are running:
#
# ```bash
# service prysm status
# service geth status
# ```
#
# Watch the logs:
#
# ```bash
# journalctl -u prysm -f
# ```
#
# Prysm will need to finish synchronising before geth will start synchronising.
#
# In Prysm you will see lots of log messages saying: `Synced new block`, and in Geth you will see: `Syncing beacon headers downloaded=11,920,384 left=4,054,753 eta=2m25.903s`. This tells you how long it will take to sync the beacon headers. Once that's done, geth will start synchronising the blocks.
#
# Bring up the Ethereum JavaScript console with:
#
# ```
# sudo geth --datadir /mnt/disks/ethereum/ attach
# ```
#
# Once the block sync has started, `eth.syncing` will return values. Before it starts, this value will be `false`.
#
# Note that by default, geth will perform a fast sync, without downloading the full blocks. The `--syncmode full` flag forces geth to do a full sync. If we didn't do this, then we wouldn't be able to back up the data properly.
#
# ### Extracting the Data
#
# ```bash
# # Install pip and ethereum-etl
# sudo apt-get install -y python3-pip
# sudo pip3 install ethereum-etl
# cd
# mkdir ethereum-etl
# cd ethereum-etl
#
# # Export data with one 50,000-block batch per directory. Up to this point we've processed about 3m blocks.
# # The full chain is about 16m blocks
# for i in $(seq 0 50000 16000000); do sudo ethereumetl export_all --partition-batch-size 50000 --start $i --end $(expr $i + 50000 - 1) --provider-uri file:///mnt/disks/ethereum/geth.ipc -o output_$i; done
# ```
#
# ### Upload the data
#
# Tar and compress the directories to make them easier to upload:
#
# ```bash
# sudo apt-get install -y jq # Install jq to parse the cid
# cd
# cd ethereum-etl
# for i in $(seq 0 50000 16000000); do tar cfz output_$i.tar.gz output_$i; done
# ```
#
# Export your Web3.storage JWT API key as an environment variable called `TOKEN`, then upload each archive:
#
# ```bash
# printf "" > hashes.txt
# for i in $(seq 0 50000 16000000); do curl -X POST https://api.web3.storage/upload -H "Authorization: Bearer ${TOKEN}" -H 'accept: application/json' -H 'Content-Type: text/plain' -H "X-NAME: ethereum-etl-block-$i" --data-binary "@output_$i.tar.gz" >> raw.json; done
# ```
#
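# The loop above appends one JSON response per chunk to `raw.json`, but `hashes.txt` still needs the CIDs pulled out of those responses. The Python sketch below is one way to do that; it assumes each response is a JSON object with a top-level `cid` field, which is what the web3.storage upload endpoint returned when this example was written (a `jq` one-liner over `raw.json` works just as well).
#
# ```python
# import json
#
# decoder = json.JSONDecoder()
# with open("raw.json") as f:
#     text = f.read()
#
# cids, idx = [], 0
# while idx < len(text):
#     # skip whitespace between the concatenated JSON responses
#     while idx < len(text) and text[idx].isspace():
#         idx += 1
#     if idx >= len(text):
#         break
#     obj, idx = decoder.raw_decode(text, idx)
#     cids.append(obj["cid"])  # assumption: the response has a top-level "cid" field
#
# with open("hashes.txt", "w") as f:
#     f.write("\n".join(cids) + "\n")
# print("Wrote %d CIDs to hashes.txt" % len(cids))
# ```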