# @title from IPython.core.display import HTML HTML("""
Note: While the MiniCluster is useful for many development and testing scenarios, it's important to note that it does not fully replicate the complexities and challenges of a true multi-node Hadoop cluster. For production-scale testing or performance evaluations, a larger, more representative cluster setup is recommended.
""") import urllib.request import os import shutil import tarfile import logging import subprocess import time import sys ############ # COSTANTS # ############ # URL for downloading Hadoop (archive site https://archive.apache.org/dist/) HADOOP_URL = "https://archive.apache.org/dist/hadoop/core/hadoop-3.4.0/hadoop-3.4.0.tar.gz" # logging level (should be one of: DEBUG, INFO, WARNING, ERROR, CRITICAL) LOGGING_LEVEL = "INFO" #@param ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] # setup logging for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) logging_level = getattr(logging, LOGGING_LEVEL.upper(), 10) logging.basicConfig(level=logging_level, \ format='%(asctime)s - %(levelname)s: %(message)s', \ datefmt='%d-%b-%y %I:%M:%S %p') logger = logging.getLogger('my_logger') JAVA_PATH = '/usr/lib/jvm/java-11-openjdk-amd64' # true if running on Google Colab IN_COLAB = 'google.colab' in sys.modules if IN_COLAB: from google.colab import output # setup logging for handler in logging.root.handlers[:]: logging.root.removeHandler(handler) logging_level = getattr(logging, LOGGING_LEVEL.upper(), 10) logging.basicConfig(level=logging_level, \ format='%(asctime)s - %(levelname)s: %(message)s', \ datefmt='%d-%b-%y %I:%M:%S %p') logger = logging.getLogger('my_logger') # set variable JAVA_HOME (install Java if necessary) def is_java_installed(): os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0] return os.environ['JAVA_HOME'] def install_java(): # Uncomment and modify the desired version # java_version= 'openjdk-11-jre-headless' # java_version= 'default-jre' # java_version= 'openjdk-17-jre-headless' # java_version= 'openjdk-18-jre-headless' java_version= 'openjdk-19-jre-headless' print(f"Java not found. Installing {java_version} ... (this might take a while)") try: cmd = f"apt install -y {java_version}" subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) stdout_result = subprocess_output.stdout # Process the results as needed logger.info("Done installing Java {}".format(java_version)) os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0] logger.info("JAVA_HOME is {}".format(os.environ['JAVA_HOME'])) except subprocess.CalledProcessError as e: # Handle the error if the command returns a non-zero exit code logger.warn("Command failed with return code {}".format(e.returncode)) logger.warn("stdout: {}".format(e.stdout)) # Install Java if not available if is_java_installed(): logger.info("Java is already installed: {}".format(os.environ['JAVA_HOME'])) else: logger.info("Installing Java") install_java() # download Hadoop file_name = os.path.basename(HADOOP_URL) if os.path.isfile(file_name): logger.info("{} already exists, not downloading".format(file_name)) else: logger.info("Downloading {}".format(file_name)) urllib.request.urlretrieve(HADOOP_URL, file_name) # uncompress archive dir_name = file_name[:-7] if os.path.exists(dir_name): logger.info("{} is already uncompressed".format(file_name)) else: logger.info("Uncompressing {}".format(file_name)) tar = tarfile.open(file_name) tar.extractall() tar.close() # environment variables os.environ['HADOOP_HOME'] = os.path.join(os.getcwd(), dir_name) logger.info("HADOOP_HOME is {}".format(os.environ['HADOOP_HOME'])) os.environ['PATH'] = ':'.join([os.path.join(os.environ['HADOOP_HOME'], 'bin'), os.environ['PATH']]) logger.info("PATH is {}".format(os.environ['PATH'])) !mapred -h !mapred envvars os.environ['HADOOP_TOOLS_LIB_JARS_DIR'] = os.path.join(os.environ['HADOOP_HOME'], 'share/hadoop/tools/lib/') #IMPORTANT !find hadoop-3.4.0 -name "mockito*" !wget --no-clobber https://repo1.maven.org/maven2/org/mockito/mockito-core/2.28.2/mockito-core-2.28.2.jar shutil.copy('mockito-core-2.28.2.jar', os.path.join(os.environ['HADOOP_HOME'],'share/hadoop/mapreduce/')) os.listdir(os.path.join(os.environ['HADOOP_HOME'],'share/hadoop/mapreduce/')) !mkdir -p ./target/test/data/dfs/{name-0-1,name-0-2} !ls ./target/test/data/dfs/ !mapred minicluster -help # check if the file is there !find $HADOOP_HOME -name "core-site.xml" # view the contents of the file !cat $(find $HADOOP_HOME -name "core-site.xml") with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file: file.write("\n") !cat $(find $HADOOP_HOME -name "core-site.xml") #!mapred minicluster -format !lsof -n -i -P +c0 -sTCP:LISTEN import subprocess with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file: process = subprocess.Popen( ["mapred", "minicluster", "-format"], stdout=stdout_file, stderr=stderr_file ) if not IN_COLAB: time.sleep(30) else: time.sleep(10) !lsof -n -i -P +c0 -sTCP:LISTEN !wget http://localhost:8088 if IN_COLAB: # serve the Web UI on Colab print("Click on the link below to open the Resource Manager Web UI 🚀") output.serve_kernel_port_as_window(8088, path='/node') !wget http://localhost:19888 if IN_COLAB: # serve the Web UI on Colab print("Click on the link below to open the MapReduce JobHistory Server Web UI 🚀") output.serve_kernel_port_as_window(19888, path='/node') # you should set this to True NGROK = False #@param {type:"boolean"} if NGROK: !pip install pyngrok from pyngrok import ngrok, conf import getpass print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken") conf.get_default().auth_token = getpass.getpass() if NGROK: # Open a ngrok tunnel to the HTTP server public_url = ngrok.connect(19888).public_url if NGROK: print(f'Click on {public_url} to open the MapReduce JobHistory Server Web UI') process.kill() !lsof -n -i -P +c0 -sTCP:LISTEN !pkill -f java import subprocess with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file: process = subprocess.Popen( ["mapred", "minicluster", "-format", "-jhsport", "8900", "-nnhttpport", "8901", "-nnport", "8902", "-rmport", "8903"], stdout=stdout_file, stderr=stderr_file ) if not IN_COLAB: time.sleep(30) else: time.sleep(10) !lsof -n -i -P +c0 -sTCP:LISTEN | grep "^COMMAND\|java" !tail err.txt !wget http://localhost:8901 if IN_COLAB and not NGROK: # serve the Web UI on Colab print("Click on the link below to open the NameNode Web UI 🚀") output.serve_kernel_port_as_window(8901, path='/index.html') else: if NGROK: # disconnect previous tunnels (note: you can have max 3 tunnels open!) # see: https://pyngrok.readthedocs.io/en/latest/index.html#get-active-tunnels tunnels = ngrok.get_tunnels() for t in tunnels: ngrok.disconnect(t.public_url) # Open a ngrok tunnel to the HTTP server on port 8901 public_url = ngrok.connect(8901).public_url print(f'Click on {public_url} to open the NameNode Web UI 🚀') %%bash # create a folder my_dir hdfs dfs -mkdir hdfs://localhost:8902/my_dir !hdfs dfs -ls hdfs://localhost:8902/my_dir !ls -lh sample_data !du -h sample_data !hdfs dfs -put sample_data hdfs://localhost:8902/my_dir/ !hdfs dfs -ls hdfs://localhost:8902/my_dir !hdfs dfs -du -h hdfs://localhost:8902/my_dir !hdfs dfs -ls -R -h hdfs://localhost:8902/my_dir !hdfs dfs -rm -r hdfs://localhost:8902/my_dir !hdfs dfs -fs hdfs://localhost:8902/ -ls / !hdfs dfs -ls hdfs://localhost:8902/ !hdfs dfs -D fs.defaultFS=hdfs://localhost:8902/ -ls / with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file: file.write(""" fs.defaultFS hdfs://localhost:8902/ """) !cat $HADOOP_HOME/etc/hadoop/core-site.xml !hdfs dfs -ls / with open(os.environ['HADOOP_HOME']+'/etc/hadoop/core-site.xml', 'w') as file: file.write(""" fs.defaultFS file:/// """) !hdfs dfs -ls / !hdfs dfsadmin -h !hdfs dfsadmin -fs hdfs://localhost:8902/ -report !find . -name "*examples*.jar" !lsof -n -i -P +c0 -sTCP:LISTEN -ac java # @title from IPython.core.display import HTML HTML("""

In the realm of networks, where processes twine,
A command unfolds, a symphony of lines.
"Lsof," it whispers, with a mystic hum,
A dance of flags, each one has its own drum.

"-n -i -P," the conductor commands,
Navigate swiftly, across distant lands.
"+c0" echoes softly, a chorus of glee,
Embrace all processes, as far as eyes can see.

"-sTCP:LISTEN," a stanza profound,
Seeking the echoes of ports, a network's sound.
Processes in repose, in a state so keen,
A tapestry of LISTEN, a poetic scene.

""") !yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar !yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi !yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi \ 5 1000 !yarn application -D yarn.resourcemanager.address=localhost:8903 -list -appStates ALL file_mapred_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/mapred-site.xml') file_core_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/core-site.xml') file_yarn_site = os.path.join(os.environ['HADOOP_HOME'],'etc/hadoop/yarn-site.xml') %%bash cat > $HADOOP_HOME/'etc/hadoop/mapred-site.xml' << 🐸 mapreduce.framework.name yarn mapreduce.application.classpath ${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/mapreduce/lib/* 🐸 !cat $HADOOP_HOME/'etc/hadoop/mapred-site.xml' with open(file_core_site, 'w') as file: file.write(""" fs.defaultFS hdfs://localhost:8902/ """) with open(file_yarn_site, 'w') as file: file.write(""" yarn.resourcemanager.address localhost:8903 yarn.log-aggregation-enable true """) process.kill() !pkill -f java # kill java processes with open('out.txt', "w") as stdout_file, open('err.txt', "w") as stderr_file: process = subprocess.Popen( ["mapred", "minicluster", "-format", "-jhsport", "8900", "-nnhttpport", "8901", "-nnport", "8902", "-rmport", "8903"], stdout=stdout_file, stderr=stderr_file ) for att in range(10): with open('err.txt') as myfile: if 'Started MiniMRCluster' in myfile.read(): print('MiniCluster is up and running') break else: time.sleep(2) !lsof -n -i -P +c0 -sTCP:LISTEN -ac java !yarn jar ./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar pi \ 5 1000 !yarn application -list -appStates ALL with open('job_out.txt', "w") as stdout_file, open('job_err.txt', "w") as stderr_file: process = subprocess.Popen( ["yarn", "jar", "./hadoop-3.4.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.4.0.jar", "pi", "-D", "localhost:8903", "50", "1000000"], stdout=stdout_file, stderr=stderr_file ) time.sleep(10) !yarn application -list -appStates ALL !yarn application -list -appStates ALL !yarn application -list -appStates FINISHED 2>/dev/null|grep SUCCEEDED|tail -1| cut -f1 %%bash app_id=$(yarn application -list -appStates FINISHED 2>/dev/null|grep SUCCEEDED|tail -1| cut -f1) yarn logs -applicationId $app_id