# @title
from IPython.core.display import HTML
HTML("""
Note:
While the MiniCluster is useful for many development and testing scenarios, it's important to note that it does not fully replicate the complexities and challenges of a true multi-node Hadoop cluster. For production-scale testing or performance evaluations, a larger, more representative cluster setup is recommended.
""")
import urllib.request
import os
import shutil
import tarfile
import logging
import subprocess
import time
import sys
#############
# CONSTANTS #
#############
# URL for downloading Hadoop (archive site https://archive.apache.org/dist/)
HADOOP_URL = "https://archive.apache.org/dist/hadoop/core/hadoop-3.4.0/hadoop-3.4.0.tar.gz"
# logging level (should be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL)
LOGGING_LEVEL = "INFO" #@param ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
# setup logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
logging_level = getattr(logging, LOGGING_LEVEL.upper(), 10)
logging.basicConfig(level=logging_level,
                    format='%(asctime)s - %(levelname)s: %(message)s',
                    datefmt='%d-%b-%y %I:%M:%S %p')
logger = logging.getLogger('my_logger')
JAVA_PATH = '/usr/lib/jvm/java-11-openjdk-amd64'
# true if running on Google Colab
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    from google.colab import output
# set variable JAVA_HOME (install Java if necessary)
def is_java_installed():
    java_path = shutil.which("java")  # None if java is not on PATH
    if java_path is None:
        return False
    os.environ['JAVA_HOME'] = os.path.realpath(java_path).split('/bin')[0]
    return os.environ['JAVA_HOME']
def install_java():
    # Uncomment and modify the desired version
    # java_version = 'openjdk-11-jre-headless'
    # java_version = 'default-jre'
    # java_version = 'openjdk-17-jre-headless'
    # java_version = 'openjdk-18-jre-headless'
    java_version = 'openjdk-19-jre-headless'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        logger.info("Done installing Java {}".format(java_version))
        os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0]
        logger.info("JAVA_HOME is {}".format(os.environ['JAVA_HOME']))
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        logger.warning("Command failed with return code {}".format(e.returncode))
        logger.warning("stdout: {}".format(e.stdout))
# Install Java if not available
if is_java_installed():
    logger.info("Java is already installed: {}".format(os.environ['JAVA_HOME']))
else:
    logger.info("Installing Java")
    install_java()
# download Hadoop
file_name = os.path.basename(HADOOP_URL)
if os.path.isfile(file_name):
    logger.info("{} already exists, not downloading".format(file_name))
else:
    logger.info("Downloading {}".format(file_name))
    urllib.request.urlretrieve(HADOOP_URL, file_name)
# uncompress archive
dir_name = file_name[:-7]
if os.path.exists(dir_name):
    logger.info("{} is already uncompressed".format(file_name))
else:
    logger.info("Uncompressing {}".format(file_name))
    tar = tarfile.open(file_name)
    tar.extractall()
    tar.close()
# environment variables
os.environ['HADOOP_HOME'] = os.path.join(os.getcwd(), dir_name)
logger.info("HADOOP_HOME is {}".format(os.environ['HADOOP_HOME']))
os.environ['PATH'] = ':'.join([os.path.join(os.environ['HADOOP_HOME'], 'bin'), os.environ['PATH']])
logger.info("PATH is {}".format(os.environ['PATH']))
!mapred -h
!mapred envvars
os.environ['HADOOP_TOOLS_LIB_JARS_DIR'] = os.path.join(os.environ['HADOOP_HOME'], 'share/hadoop/tools/lib/') #IMPORTANT
!find hadoop-3.4.0 -name "mockito*"
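# The binary tarball does not ship Mockito (the `find` above comes up empty), but the
# minicluster classes need it on the classpath at runtime, so fetch it from Maven Central
# and drop it into a directory that is already on the mapred classpath.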
!wget --no-clobber https://repo1.maven.org/maven2/org/mockito/mockito-core/2.28.2/mockito-core-2.28.2.jar
shutil.copy('mockito-core-2.28.2.jar', os.path.join(os.environ['HADOOP_HOME'],'share/hadoop/mapreduce/'))
os.listdir(os.path.join(os.environ['HADOOP_HOME'],'share/hadoop/mapreduce/'))
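# Pre-create the local directories used by the minicluster's NameNode under
# ./target/test/data/dfs (this is where the minicluster keeps its DFS state).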
!mkdir -p ./target/test/data/dfs/{name-0-1,name-0-2}
!ls ./target/test/data/dfs/
!mapred minicluster -help
# check if the file is there
!find $HADOOP_HOME -name "core-site.xml"
# view the contents of the file
!cat $(find $HADOOP_HOME -name "core-site.xml")
with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file:
    file.write("<configuration>\n</configuration>\n")
!cat $(find $HADOOP_HOME -name "core-site.xml")
#!mapred minicluster -format
!lsof -n -i -P +c0 -sTCP:LISTEN
import subprocess
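# `mapred minicluster` stays in the foreground and would block the notebook, so start it
# as a background process with stdout/stderr redirected to files we can inspect later.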
with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file:
    process = subprocess.Popen(
        ["mapred", "minicluster", "-format"],
        stdout=stdout_file,
        stderr=stderr_file
    )
if not IN_COLAB:
    time.sleep(30)
else:
    time.sleep(10)
!lsof -n -i -P +c0 -sTCP:LISTEN
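# 8088 is the default ResourceManager web UI port, 19888 the default MapReduce JobHistory Server web UI port.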
!wget http://localhost:8088
if IN_COLAB:
    # serve the Web UI on Colab
    print("Click on the link below to open the Resource Manager Web UI 🚀")
    output.serve_kernel_port_as_window(8088, path='/node')
!wget http://localhost:19888
if IN_COLAB:
    # serve the Web UI on Colab
    print("Click on the link below to open the MapReduce JobHistory Server Web UI 🚀")
    output.serve_kernel_port_as_window(19888, path='/node')
# you should set this to True
NGROK = False #@param {type:"boolean"}
if NGROK:
    !pip install pyngrok
    from pyngrok import ngrok, conf
    import getpass
    print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken")
    conf.get_default().auth_token = getpass.getpass()
if NGROK:
    # Open a ngrok tunnel to the HTTP server
    public_url = ngrok.connect(19888).public_url
if NGROK:
    print(f'Click on {public_url} to open the MapReduce JobHistory Server Web UI')
process.kill()
!lsof -n -i -P +c0 -sTCP:LISTEN
!pkill -f java
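# Restart the minicluster on fixed, known ports (JobHistory 8900, NameNode HTTP 8901,
# NameNode RPC 8902, ResourceManager 8903) so the hdfs/yarn clients below can target them explicitly.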
import subprocess
with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file:
    process = subprocess.Popen(
        ["mapred", "minicluster", "-format", "-jhsport", "8900", "-nnhttpport", "8901", "-nnport", "8902", "-rmport", "8903"],
        stdout=stdout_file,
        stderr=stderr_file
    )
if not IN_COLAB:
    time.sleep(30)
else:
    time.sleep(10)
!lsof -n -i -P +c0 -sTCP:LISTEN | grep "^COMMAND\|java"
!tail err.txt
!wget http://localhost:8901
if IN_COLAB and not NGROK:
    # serve the Web UI on Colab
    print("Click on the link below to open the NameNode Web UI 🚀")
    output.serve_kernel_port_as_window(8901, path='/index.html')
else:
    if NGROK:
        # disconnect previous tunnels (note: you can have max 3 tunnels open!)
        # see: https://pyngrok.readthedocs.io/en/latest/index.html#get-active-tunnels
        tunnels = ngrok.get_tunnels()
        for t in tunnels:
            ngrok.disconnect(t.public_url)
        # Open a ngrok tunnel to the HTTP server on port 8901
        public_url = ngrok.connect(8901).public_url
        print(f'Click on {public_url} to open the NameNode Web UI 🚀')
%%bash
# create a folder my_dir
hdfs dfs -mkdir hdfs://localhost:8902/my_dir
!hdfs dfs -ls hdfs://localhost:8902/my_dir
!ls -lh sample_data
!du -h sample_data
!hdfs dfs -put sample_data hdfs://localhost:8902/my_dir/
!hdfs dfs -ls hdfs://localhost:8902/my_dir
!hdfs dfs -du -h hdfs://localhost:8902/my_dir
!hdfs dfs -ls -R -h hdfs://localhost:8902/my_dir
!hdfs dfs -rm -r hdfs://localhost:8902/my_dir
!hdfs dfs -fs hdfs://localhost:8902/ -ls /
!hdfs dfs -ls hdfs://localhost:8902/
!hdfs dfs -D fs.defaultFS=hdfs://localhost:8902/ -ls /
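# Three equivalent ways to point the client at the minicluster's NameNode: a full
# hdfs:// URI per path, the -fs generic option, or overriding fs.defaultFS with -D.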
with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file:
    file.write("""<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:8902/</value>
  </property>
</configuration>""")
!cat $HADOOP_HOME/etc/hadoop/core-site.xml
!hdfs dfs -ls /
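# With fs.defaultFS pointing at the minicluster, bare paths such as / now resolve to HDFS.
# Next, switch the default back to the local filesystem.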
with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file:
    file.write("""<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
</configuration>""")
!hdfs dfs -ls /
!hdfs dfsadmin -h
!hdfs dfsadmin -fs hdfs://localhost:8902/ -report
!find . -name "*examples*.jar"
!lsof -n -i -P +c0 -sTCP:LISTEN -ac java
# @title
from IPython.core.display import HTML
HTML("""
In the realm of networks, where processes twine,
A command unfolds, a symphony of lines.
"Lsof," it whispers, with a mystic hum,
A dance of flags, each one has its own drum.
"-n -i -P," the conductor commands,
Navigate swiftly, across distant lands.
"+c0" echoes softly, a chorus of glee,
Embrace all processes, as far as eyes can see.
"-sTCP:LISTEN," a stanza profound,
Seeking the echoes of ports, a network's sound.
Processes in repose, in a state so keen,
A tapestry of LISTEN, a poetic scene.
""")
!yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar
!yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi
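# The pi example takes two arguments: the number of map tasks and the number of samples per map.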
!yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi \
5 1000
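# yarn-site.xml is not configured yet, so the client has to be told explicitly where the
# ResourceManager listens in order to list applications.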
!yarn application -D yarn.resourcemanager.address=localhost:8903 -list -appStates ALL
file_mapred_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/mapred-site.xml')
file_core_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/core-site.xml')
file_yarn_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/yarn-site.xml')
%%bash
cat > $HADOOP_HOME/'etc/hadoop/mapred-site.xml' << 🐸
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>mapreduce.application.classpath</name>
    <value>${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/mapreduce/lib/*</value>
  </property>
</configuration>
🐸
!cat $HADOOP_HOME/'etc/hadoop/mapred-site.xml'
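# mapreduce.framework.name=yarn submits jobs to the ResourceManager instead of the local
# job runner; mapreduce.application.classpath tells the YARN containers where to find the
# MapReduce jars.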
with open(file_core_site, 'w') as file:
    file.write("""<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:8902/</value>
  </property>
</configuration>""")
with open(file_yarn_site, 'w') as file:
    file.write("""<configuration>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>localhost:8903</value>
  </property>
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
</configuration>""")
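# With fs.defaultFS, yarn.resourcemanager.address and log aggregation set in the site files,
# the hdfs/yarn clients no longer need per-command -fs/-D overrides, and `yarn logs` can
# retrieve container logs once applications finish.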
process.kill()
!pkill -f java # kill java processes
with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file:
    process = subprocess.Popen(
        ["mapred", "minicluster", "-format", "-jhsport", "8900", "-nnhttpport", "8901", "-nnport", "8902", "-rmport", "8903"],
        stdout=stdout_file,
        stderr=stderr_file
    )
for att in range(10):
    with open('err.txt') as myfile:
        if 'Started MiniMRCluster' in myfile.read():
            print('MiniCluster is up and running')
            break
        else:
            time.sleep(2)
!lsof -n -i -P +c0 -sTCP:LISTEN -ac java
!yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi \
5 1000
!yarn application -list -appStates ALL
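# Launch a longer pi job (50 maps, 1,000,000 samples each) in the background so its progress
# can be watched with `yarn application -list` while it runs.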
with open('job_out.txt', "w") as stdout_file, open('job_err.txt', "w") as stderr_file:
    process = subprocess.Popen(
        ["yarn", "jar", "./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar", "pi",
         "-D", "yarn.resourcemanager.address=localhost:8903",
         "50", "1000000"],
        stdout=stdout_file,
        stderr=stderr_file
    )
time.sleep(10)
!yarn application -list -appStates ALL
!yarn application -list -appStates ALL
!yarn application -list -appStates FINISHED 2>/dev/null|grep SUCCEEDED|tail -1| cut -f1
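# The -list output is tab-separated with the application id in the first column; the same
# expression is reused below to fetch that application's aggregated logs.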
%%bash
app_id=$(yarn application -list -appStates FINISHED 2>/dev/null|grep SUCCEEDED|tail -1| cut -f1)
yarn logs -applicationId $app_id