#Install the docker service - this will be preinstalled in final VM
!sudo apt-get -y install docker.io

#Install the python docker wrapper - this will be preinstalled in final VM
!pip3 install docker-py

#We're going to use the recipe described by https://sebastianvoss.com/docker-mongodb-sharded-cluster.html

#You should only run this cell once - once you have run it, change the cell from a Code cell to a Raw NBConvert cell
docker_config_mongod='''FROM ubuntu:latest

# Add 10gen official apt source to the sources list
RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
RUN echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | tee /etc/apt/sources.list.d/10gen.list

# Install MongoDB
RUN apt-get update
RUN apt-get -y install mongodb-10gen

# Create the MongoDB data directory
RUN mkdir -p /data/db

EXPOSE 27017
ENTRYPOINT ["usr/bin/mongod"]'''

docker_config_mongos='''FROM dev24/mongodb:latest

EXPOSE 27017
ENTRYPOINT ["usr/bin/mongos"]'''

!mkdir docker_mongodb_cluster
!mkdir docker_mongodb_cluster/mongod
with open('docker_mongodb_cluster/mongod/Dockerfile','w') as f:
    f.write(docker_config_mongod)

!mkdir docker_mongodb_cluster/mongos
with open('docker_mongodb_cluster/mongos/Dockerfile','w') as f:
    f.write(docker_config_mongos)

#Create the docker images
%cd docker_mongodb_cluster
!sudo docker.io build -t dev24/mongodb mongod
!sudo docker.io build -t dev24/mongos mongos
%cd /vagrant/notebooks

import docker

#Connect to docker
c = docker.Client(base_url='unix://var/run/docker.sock', version='1.10', timeout=10)
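#As a quick sanity check of the connection (a minimal sketch - it assumes the docker-py 0.x/1.x
#client API used above, where Client.version() and Client.images() return plain dicts/lists),
#we can ask the daemon for its version and confirm the images we just built are visible
print(c.version())
[(i['Id'], i['RepoTags']) for i in c.images(name='dev24/mongodb')]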
#We can run a mongo database server in a container using the following command:
#
##sudo docker run \
##    -P -name rs1_srv1 \
##    -d dev24/mongodb \
##    --replSet rs1 \
##    --noprealloc --smallfiles
#
#The -P flag is a docker flag; the rest are passed through to the mongodb server.
#That is, the following command is run to start each mongo server, using a configuration that keeps it small for now:
#    usr/bin/mongod --replSet REPLICA_SET_NAME --noprealloc --smallfiles
#
#Add a -v flag to the command to specify verbose stdio logging (increase the number of v's for more, eg -vv or -vvvv)

def createReplicaSetNode(c,stub,num=0):
    ''' Create and run a single mongo database server as a member of a replica set '''
    name='{stub}_srv{num}'.format(stub=stub,num=num)
    command='--replSet {stub} --noprealloc --smallfiles'.format(stub=stub)
    c.create_container('dev24/mongodb',name=name,command=command)
    c.start(name,publish_all_ports=True)
    return name

createReplicaSetNode(c,'test')

#Some helper functions via https://github.com/docker/docker-py

#Equivalent of docker ps
def docker_ps(c):
    return c.containers(quiet=False, all=False, trunc=True, latest=False, since=None, before=None, limit=-1)

docker_ps(c)

!docker.io ps | head -n 3

#Find the local port bound to 27017/tcp for each server in the replica set
def get27017tcp_port(c,container):
    cConfig = c.inspect_container(container)
    return int(cConfig['NetworkSettings']['Ports']['27017/tcp'][0]['HostPort'])

def get27017tcp_ports(c,containers):
    ports={}
    for container in containers:
        ports[container]= get27017tcp_port(c,container)
    return ports

get27017tcp_ports(c,['test_srv0'])

def getContainIPaddress(c,container):
    cConfig = c.inspect_container(container)
    return cConfig['NetworkSettings']['IPAddress']

def getContainIPaddresses(c,containers):
    ipaddresses={}
    for container in containers:
        ipaddresses[container]= getContainIPaddress(c,container)
    return ipaddresses

getContainIPaddress(c,'test_srv0')

def showContainers(c):
    for xc in c.containers(quiet=False, all=False, trunc=True, latest=False, since=None, before=None, limit=-1):
        print(xc['Names'],xc['Status'])

showContainers(c)

#Helper routines for shutting down and removing containers
def tidyAwayContainer(c,container):
    container=container.strip('/')
    c.stop(container)
    c.remove_container(container)

def tidyAwayContainers(c,containers):
    for container in containers:
        tidyAwayContainer(c,container)

tidyAwayContainer(c,'test_srv0')

#Let's create a function that will create several nodes
def createReplicaSetNodes(c,stub,numNodes):
    ''' Create and run a specified number of mongo database servers as a replica set '''
    names=[]
    for i in range(0,numNodes):
        name=createReplicaSetNode(c,stub,i)
        names.append(name)
    return names

def rs_config(c,rsid,num=3):
    ''' Create a replica set of nodes and then define a configuration document for that replica set '''
    createReplicaSetNodes(c,rsid,num)
    _rs_config={"_id" : rsid, 'members':[] }
    #This is scrappy - should really return something better from the creation
    for i in range(0,num):
        name='{stub}_srv{num}'.format(stub=rsid,num=i)
        #c.inspect_container(name) - get IP and port
        _rs_config['members'].append({"_id":i,"host":'{0}:{1}'.format(getContainIPaddress(c,name),27017)})
    return _rs_config

tidyAwayContainers(c,['rs4_srv0','rs4_srv1','rs4_srv2'])

rsc=rs_config(c,'rs4')
rsc

#Initialise the replica set
from pymongo import MongoClient

#We'll use the 0th server in the set as the node we connect to
mc = MongoClient('localhost', get27017tcp_port(c,'rs4_srv0'))

#In the mongo console, we would typically use the command rs.initiate() to initialise the replica set
#Here, we use the replSetInitiate admin command, applying it with the desired configuration
mc.admin.command("replSetInitiate",rsc);

#We may need to wait a minute or two for the configuration to come up
#If you get an error message that suggests the configuration isn't up yet, wait a few seconds then rerun the cell
mc.admin.command('replSetGetStatus')
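#Rather than manually rerunning the status cell, we could poll until a primary has been elected.
#This is a minimal sketch (wait_for_primary is a hypothetical helper, not part of the original
#recipe); it just repeats the replSetGetStatus command until some member reports state 1 (PRIMARY)
import time

def wait_for_primary(mc, timeout=120, poll=5):
    ''' Poll replSetGetStatus until a member reports itself as PRIMARY, or give up after timeout seconds '''
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            status = mc.admin.command('replSetGetStatus')
            #member state 1 is PRIMARY, 2 is SECONDARY
            if any(m.get('state') == 1 for m in status.get('members', [])):
                return status
        except Exception:
            #the command raises an error until the initial configuration has been applied
            pass
        time.sleep(poll)
    raise RuntimeError('Replica set did not elect a primary within {0}s'.format(timeout))

#wait_for_primary(mc)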
from pymongo import MongoReplicaSetClient
testclient= MongoReplicaSetClient('{0}:{1}'.format(getContainIPaddress(c,'rs4_srv0'),27017), replicaSet='rs4')
testdb=testclient.testdb
testcollection=testdb.testcollection

testcollection.insert({'name':'test1'})

for x in range(0,10):
    testcollection.insert({'name':'test'+str(x)})

for ff in testcollection.find():
    print(ff)
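#Before we start breaking the network, it's worth noting which member is currently the primary.
#A minimal sketch, assuming the pymongo 2.x MongoReplicaSetClient used above (its primary and
#secondaries attributes report (host, port) pairs); this tells us which container IP to pick on
#when we create a partition below
print('primary:', testclient.primary)
print('secondaries:', testclient.secondaries)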
##https://github.com/dcm-oss/blockade/
#
# Copyright (C) 2014 Dell, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import random
import string
import subprocess
import collections

#---errors.py
class BlockadeError(Exception):
    """Expected error within Blockade
    """

class BlockadeConfigError(BlockadeError):
    """Error in configuration
    """

class AlreadyInitializedError(BlockadeError):
    """Blockade already created in this context
    """

class NotInitializedError(BlockadeError):
    """Blockade not created in this context
    """

class InconsistentStateError(BlockadeError):
    """Blockade state is inconsistent (partially created or destroyed)
    """

#---
def parse_partition_index(blockade_id, chain):
    prefix = "%s-p" % (blockade_id,)
    if chain and chain.startswith(prefix):
        try:
            return int(chain[len(prefix):])
        except ValueError:
            pass
    raise ValueError("chain %s is not a blockade partition" % (chain,))

def partition_chain_name(blockade_id, partition_index):
    return "%s-p%s" % (blockade_id, partition_index)

def iptables_call_output(*args):
    cmd = ["iptables", "-n"] + list(args)
    try:
        output = subprocess.check_output(cmd)
        return output.decode().split("\n")
    except subprocess.CalledProcessError:
        raise BlockadeError("Problem calling '%s'" % " ".join(cmd))

def iptables_call(*args):
    cmd = ["iptables"] + list(args)
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        raise BlockadeError("Problem calling '%s'" % " ".join(cmd))

def iptables_get_chain_rules(chain):
    if not chain:
        raise ValueError("invalid chain")
    lines = iptables_call_output("-L", chain)
    if len(lines) < 2:
        raise BlockadeError("Can't understand iptables output: \n%s" % "\n".join(lines))

    chain_line, header_line = lines[:2]
    if not (chain_line.startswith("Chain " + chain) and
            header_line.startswith("target")):
        raise BlockadeError("Can't understand iptables output: \n%s" % "\n".join(lines))
    return lines[2:]

def iptables_get_source_chains(blockade_id):
    """Get a map of blockade chain IDs -> list of IPs targeted at them

    For figuring out which container is in which partition
    """
    result = {}
    if not blockade_id:
        raise ValueError("invalid blockade_id")
    lines = iptables_get_chain_rules("FORWARD")

    for line in lines:
        parts = line.split()
        if len(parts) < 4:
            continue
        try:
            partition_index = parse_partition_index(blockade_id, parts[0])
        except ValueError:
            continue  # not a rule targeting a blockade chain

        source = parts[3]
        if source:
            result[source] = partition_index
    return result

def iptables_delete_rules(chain, predicate):
    if not chain:
        raise ValueError("invalid chain")
    if not isinstance(predicate, collections.Callable):
        raise ValueError("invalid predicate")

    lines = iptables_get_chain_rules(chain)

    # TODO this is susceptible to check-then-act races.
    # better to ultimately switch to python-iptables if it becomes less buggy
    for index, line in reversed(list(enumerate(lines, 1))):
        line = line.strip()
        if line and predicate(line):
            iptables_call("-D", chain, str(index))

def iptables_delete_blockade_rules(blockade_id):
    def predicate(rule):
        target = rule.split()[0]
        try:
            parse_partition_index(blockade_id, target)
        except ValueError:
            return False
        return True
    iptables_delete_rules("FORWARD", predicate)

def iptables_delete_blockade_chains(blockade_id):
    if not blockade_id:
        raise ValueError("invalid blockade_id")

    lines = iptables_call_output("-L")
    for line in lines:
        parts = line.split()
        if len(parts) >= 2 and parts[0] == "Chain":
            chain = parts[1]
            try:
                parse_partition_index(blockade_id, chain)
            except ValueError:
                continue
            # if we are a valid blockade chain, flush and delete
            iptables_call("-F", chain)
            iptables_call("-X", chain)

def iptables_insert_rule(chain, src=None, dest=None, target=None):
    """Insert a new rule in the chain
    """
    if not chain:
        raise ValueError("Invalid chain")
    if not target:
        raise ValueError("Invalid target")
    if not (src or dest):
        raise ValueError("Need src, dest, or both")

    args = ["-I", chain]
    if src:
        args += ["-s", src]
    if dest:
        args += ["-d", dest]
    args += ["-j", target]
    iptables_call(*args)

def iptables_create_chain(chain):
    """Create a new chain
    """
    if not chain:
        raise ValueError("Invalid chain")
    iptables_call("-N", chain)

def clear_iptables(blockade_id):
    """Remove all iptables rules and chains related to this blockade
    """
    # first remove references to our custom chains
    iptables_delete_blockade_rules(blockade_id)
    # then remove the chains themselves
    iptables_delete_blockade_chains(blockade_id)

def partition_containers(blockade_id, partitions):
    if not partitions or len(partitions) == 1:
        return
    for index, partition in enumerate(partitions, 1):
        chain_name = partition_chain_name(blockade_id, index)

        # create chain for partition and block traffic TO any other partition
        iptables_create_chain(chain_name)
        for other in partitions:
            if partition is other:
                continue
            for container in other:
                if container.ip_address:
                    iptables_insert_rule(chain_name, dest=container.ip_address, target="DROP")

        # direct traffic FROM any container in the partition to the new chain
        for container in partition:
            iptables_insert_rule("FORWARD", src=container.ip_address, target=chain_name)

class netobj():
    def __init__(self, ip_address):
        self.ip_address = ip_address
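#The partition_containers function expects objects with an ip_address attribute, which is what the
#minimal netobj class above provides. Rather than hard-coding container IP addresses, we can look
#them up with the helper defined earlier (a small sketch using the rs4 containers created above);
#the 172.17.0.x addresses used in the partition calls below were read off from this mapping
getContainIPaddresses(c,['rs4_srv0','rs4_srv1','rs4_srv2'])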
#In this case, let's be cruel and put the primary in a partition on its own
partition_containers('test1w2s2', [ [netobj('172.17.0.2')],[netobj('172.17.0.3'),netobj('172.17.0.4')]])

#Wait a bit before generating the log...

#In an ssh shell, we can use the following sort of command to look at a real time stream of stdio log messages from the container
#!docker.io logs --follow=true rs4_srv1

#testcollection.insert({'name':'test3'})

!docker.io logs rs4_srv0 > rs4_srv0_log.txt
!docker.io logs rs4_srv1 > rs4_srv1_log.txt
!docker.io logs rs4_srv2 > rs4_srv2_log.txt

!tail -n 30 rs4_srv0_log.txt
!tail rs4_srv1_log.txt
!tail rs4_srv2_log.txt

#!sudo iptables -L

#Clear the network problems...
clear_iptables('test1w2s2')

#Wait a bit before generating the log...

#In an ssh shell, we can use the following sort of command to look at a real time stream of stdio log messages from the container
#!docker.io logs --follow=true rs4_srv1

#testcollection.insert({'name':'test3'})

!docker.io logs rs4_srv0 > rs4_srv0_log.txt
!docker.io logs rs4_srv1 > rs4_srv1_log.txt
!docker.io logs rs4_srv2 > rs4_srv2_log.txt

!tail -n 30 rs4_srv0_log.txt
!tail rs4_srv1_log.txt
!tail rs4_srv2_log.txt

#Create a new problem - this time we partition off a single secondary server
partition_containers('test1w2s2', [ [netobj('172.17.0.4')],[netobj('172.17.0.3'),netobj('172.17.0.2')]])

#Wait a bit before generating the log...

#In an ssh shell, we can use the following sort of command to look at a real time stream of stdio log messages from the container
#!docker.io logs --follow=true rs4_srv1

#testcollection.insert({'name':'test3'})

!docker.io logs rs4_srv0 > rs4_srv0_log.txt
!docker.io logs rs4_srv1 > rs4_srv1_log.txt
!docker.io logs rs4_srv2 > rs4_srv2_log.txt

!tail -n 30 rs4_srv0_log.txt
!tail -n 20 rs4_srv1_log.txt
!tail -n 20 rs4_srv2_log.txt

#---JUNK BELOW HERE

!sudo iptables -L
# !netstat -lntu
!more /etc/default/docker.io

!sudo docker.io run -P -name rs2_srv4 -d dev24/mongodb --replSet rs2 --noprealloc --smallfiles

class NetworkState(object):
    NORMAL = "NORMAL"
    SLOW = "SLOW"
    FLAKY = "FLAKY"
    UNKNOWN = "UNKNOWN"

class BlockadeNetwork(object):
    def __init__(self, config):
        self.config = config

    def new_veth_device_name(self):
        chars = string.ascii_letters + string.digits
        return "veth" + "".join(random.choice(chars) for _ in range(8))

    def network_state(self, device):
        return network_state(device)

    def flaky(self, device):
        flaky_config = self.config.network['flaky'].split()
        traffic_control_netem(device, ["loss"] + flaky_config)

    def slow(self, device):
        slow_config = self.config.network['slow'].split()
        traffic_control_netem(device, ["delay"] + slow_config)

    def fast(self, device):
        traffic_control_restore(device)

    def restore(self, blockade_id):
        clear_iptables(blockade_id)

    def partition_containers(self, blockade_id, partitions):
        clear_iptables(blockade_id)
        partition_containers(blockade_id, partitions)

    def get_ip_partitions(self, blockade_id):
        return iptables_get_source_chains(blockade_id)

def traffic_control_restore(device):
    cmd = ["tc", "qdisc", "del", "dev", device, "root"]

    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    _, stderr = p.communicate()
    stderr = stderr.decode()

    if p.returncode != 0:
        if p.returncode == 2 and stderr:
            if "No such file or directory" in stderr:
                return
        # TODO log error somewhere?
        raise BlockadeError("Problem calling traffic control: " + " ".join(cmd))

def traffic_control_netem(device, params):
    try:
        cmd = ["tc", "qdisc", "replace", "dev", device, "root", "netem"] + params
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        # TODO log error somewhere?
        raise BlockadeError("Problem calling traffic control: " + " ".join(cmd))

def network_state(device):
    try:
        output = subprocess.check_output(["tc", "qdisc", "show", "dev", device]).decode()
        # sloppy but good enough for now
        if " delay " in output:
            return NetworkState.SLOW
        if " loss " in output:
            return NetworkState.FLAKY
        return NetworkState.NORMAL
    except subprocess.CalledProcessError:
        # TODO log error somewhere?
        return NetworkState.UNKNOWN
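#BlockadeNetwork wraps the functions above behind a blockade-style config object: config.network['flaky']
#and config.network['slow'] are just strings of netem parameters. A minimal sketch of how it might be
#driven (the config values and the veth device name here are purely illustrative, not taken from the
#notebook above):
class FakeBlockadeConfig(object):
    network = {'flaky': '30%', 'slow': '75ms 100ms distribution normal'}

#net = BlockadeNetwork(FakeBlockadeConfig())
#net.flaky('vethXXXX')   # runs: tc qdisc replace dev vethXXXX root netem loss 30%
#net.slow('vethXXXX')    # runs: tc qdisc replace dev vethXXXX root netem delay 75ms 100ms distribution normal
#net.fast('vethXXXX')    # runs: tc qdisc del dev vethXXXX root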
#!netstat -lntu

!sudo docker.io ps --all #--no-trunc

traffic_control_netem('vethb03e', ["loss", '40%'])

device='vethb03e'
traffic_control_netem(device, ["loss", '40%'])
traffic_control_netem(device, ["delay","75ms","100ms","distribution","normal"])

traffic_control_restore('vethafb1')

#We can see the different veth names from netstat
!netstat -i
#but this doesn't give a way of associating them with a container (sudo docker.io ps)
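#When we've finished experimenting, we can clear any remaining network rules and remove the replica set
#containers. A minimal sketch built from the helpers defined above (tidyAwayReplicaSet is a hypothetical
#convenience function, not part of the original notebook):
def tidyAwayReplicaSet(c, blockade_id, stub):
    ''' Clear blockade iptables rules and remove any containers named {stub}_srvN '''
    clear_iptables(blockade_id)
    for xc in c.containers(all=True):
        for name in xc['Names']:
            if name.strip('/').startswith(stub+'_srv'):
                tidyAwayContainer(c, name)

#tidyAwayReplicaSet(c, 'test1w2s2', 'rs4')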