Chapter 3 of Real World Algorithms.
Panos Louridas
Athens University of Economics and Business
To implement Huffman encoding we need a priority queue.
We will implement the priority queue as a min-heap.
A min-heap is a complete binary tree in which the value of each node is less than or equal to the values of its children.
The heap will be implemented as an array with the root of the heap at position 0. Then for each node, $i$, its left child is at position $2i + 1$ and its right child is at position $2i + 2$.
        6
      /   \
    11     8
   /  \   /  \
  17  19 13  12

[6, 11, 8, 17, 19, 13, 12]
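For example, using the array above we can check the child positions directly; this is only a small illustration, not part of the priority queue code:

heap = [6, 11, 8, 17, 19, 13, 12]
i = 1                                # the node holding 11
print(heap[2*i + 1], heap[2*i + 2])  # its children: 17 19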
We create a priority queue by creating an empty list:
def create_pq():
    return []
To insert an element into the priority queue, we first put it at the end of the corresponding list (so it will be at the bottom of the queue, that is, of the equivalent min-heap).
Then the element floats up until it is no longer smaller than its parent node, or until it becomes the root.
Here is the function that simply adds an element at the end of the priority queue.
def add_last(pq, c):
    pq.append(c)
It will also be useful to have some helper functions for the priority queue.
We need a function that returns the position of the root, i.e., the first element in the queue, for a non-empty queue. That is simple: it will always be position 0.
Then, we need a function that assigns an element to the root of a non-empty queue. If the queue is empty, nothing will be done; otherwise, the item will be assigned to position 0.
def root(pq):
    return 0

def set_root(pq, c):
    if len(pq) != 0:
        pq[0] = c
We also need a function that will return the data stored at a specified position in the queue.
In essence, that is just the element at that position in the list.
def get_data(pq, p):
    return pq[p]
As we have said, the children of node $c$ are at positions $2c + 1$ and $2c + 2$.
So, here is the function that returns the children of a node in the queue.
Note that we must be careful to return the right number of children.
A node that has children always has two of them, unless its only child is the last element of the queue; check the two following examples:
        6                                   6
      /   \                               /   \
    11     8                            11     8
   /  \   /  \                         /  \   /
  17  19 13  12                       17  19 13

[6, 11, 8, 17, 19, 13, 12]            [6, 11, 8, 17, 19, 13]
def children(pq, p):
    if 2*p + 2 < len(pq):
        return [2*p + 1, 2*p + 2]
    else:
        return [2*p + 1]
Conversely, the parent of a node at position $c$ is at position $\lfloor (c - 1)/2 \rfloor$.
def parent(p):
    return (p - 1) // 2
To swap two elements of the queue we will use the following function:
def exchange(pq, p1, p2):
    pq[p1], pq[p2] = pq[p2], pq[p1]
With all this in place, here is how we can insert an element in the priority queue:
def insert_in_pq(pq, c):
    add_last(pq, c)
    i = len(pq) - 1
    while i != root(pq) and get_data(pq, i) < get_data(pq, parent(i)):
        p = parent(i)
        exchange(pq, i, p)
        i = p
We can follow the queue construction process step-by-step.
min_queue = create_pq()
insert_in_pq(min_queue, 11)
print(min_queue)
[11]
11
insert_in_pq(min_queue, 17)
print(min_queue)
[11, 17]
    11
   /
  17
insert_in_pq(min_queue, 12)
print(min_queue)
[11, 17, 12]
    11
   /  \
  17   12
insert_in_pq(min_queue, 8)
print(min_queue)
[8, 11, 12, 17]
        8
      /   \
    11     12
   /
  17
insert_in_pq(min_queue, 19)
print(min_queue)
[8, 11, 12, 17, 19]
        8
      /   \
    11     12
   /  \
  17  19
insert_in_pq(min_queue, 13)
print(min_queue)
[8, 11, 12, 17, 19, 13]
        8
      /   \
    11     12
   /  \   /
  17  19 13
insert_in_pq(min_queue, 6)
print(min_queue)
[6, 11, 8, 17, 19, 13, 12]
        6
      /   \
    11     8
   /  \   /  \
  17  19 13  12
Now we must implement the extraction of elements from the queue.
To do that, we must use a helper function that will extract the last element of the queue.
That is equivalent to removing the last element of the corresponding list.
def extract_last_from_pq(pq):
    return pq.pop()
We will also need a function that will determine whether a given node has children.
Remember that a node $p$ may have children at positions $2p + 1$ and $2p + 2$.
Therefore we only need to check whether position $2p + 1$ is a valid position in the list.
def has_children(pq, p):
    return 2*p + 1 < len(pq)
With this, we can write the following element extraction function.
Note that inside the while loop we want to get the minimum of the children. To do that we use Python's min() function, telling it to use the data of each child for the comparison.
def extract_min_from_pq(pq):
    c = pq[root(pq)]
    set_root(pq, extract_last_from_pq(pq))
    i = root(pq)
    while has_children(pq, i):
        # Use the data stored at each child as the comparison key
        # for finding the minimum.
        j = min(children(pq, i), key=lambda x: get_data(pq, x))
        if get_data(pq, i) < get_data(pq, j):
            return c
        exchange(pq, i, j)
        i = j
    return c
Let's see how it works in practice.
m = extract_min_from_pq(min_queue)
print(m)
print(min_queue)
6
[8, 11, 12, 17, 19, 13]

        8
      /   \
    11     12
   /  \   /
  17  19 13
m = extract_min_from_pq(min_queue)
print(m)
print(min_queue)
8
[11, 13, 12, 17, 19]

       11
      /   \
    13     12
   /  \
  17  19
With the priority queue in our hands we can implement Huffman encoding.
We will adapt the example given in Rosetta Code.
We start by reading the text that we want to compress and counting the frequencies of its characters.
To do that we can use a dictionary.
text = "This is the phrase that we want to compress."

symb2freq = {}
for ch in text:
    # If ch is not in the frequency table
    # we have to create an entry for it
    # initialized to zero.
    if ch not in symb2freq:
        symb2freq[ch] = 0
    # Add one to the number of times we have
    # seen ch.
    symb2freq[ch] += 1
import pprint
pprint.pprint(symb2freq)
{' ': 8, '.': 1, 'T': 1, 'a': 3, 'c': 1, 'e': 4, 'h': 4, 'i': 2, 'm': 1, 'n': 1, 'o': 2, 'p': 2, 'r': 2, 's': 5, 't': 5, 'w': 2}
Instead of using a bare-bones dictionary, it is more practical to use Python's defaultdict.
This will add an entry with the value zero whenever we try to access a key that does not already exist in the dictionary.
from collections import defaultdict
symb2freq = defaultdict(int)
for ch in text:
    symb2freq[ch] += 1
pprint.pprint(symb2freq)
defaultdict(<class 'int'>, {' ': 8, '.': 1, 'T': 1, 'a': 3, 'c': 1, 'e': 4, 'h': 4, 'i': 2, 'm': 1, 'n': 1, 'o': 2, 'p': 2, 'r': 2, 's': 5, 't': 5, 'w': 2})
We can simplify things even further if we use Python's Counter.
This will automatically count the occurrences of the characters in our text.
from collections import Counter
symb2freq = Counter(text)
pprint.pprint(symb2freq)
Counter({' ': 8, 's': 5, 't': 5, 'h': 4, 'e': 4, 'a': 3, 'i': 2, 'p': 2, 'r': 2, 'w': 2, 'o': 2, 'T': 1, 'n': 1, 'c': 1, 'm': 1, '.': 1})
Going now to the priority queue, it will contain elements of the following form:
[value, [character, encoding], ... ]
The value is the frequency corresponding to the element of the queue.
The ... above refers to repeated [character, encoding] instances; these are the characters represented by the specific element of the queue and the Huffman encodings of these characters.
We will see that this will allow us to create the Huffman encoding while we are building the encoding tree, without needing to traverse it afterwards.
We start by putting the characters and their frequencies in the priority queue.
pq = create_pq()
# The priority queue will be initialized with elements of the form:
# [value, [character, encoding]]
for key, value in symb2freq.items():
    insert_in_pq(pq, [value, [key, '']])
pprint.pprint(pq)
[[1, ['.', '']], [1, ['T', '']], [1, ['c', '']], [2, ['p', '']], [2, ['w', '']], [2, ['i', '']], [1, ['m', '']], [2, ['r', '']], [4, ['h', '']], [8, [' ', '']], [3, ['a', '']], [5, ['t', '']], [2, ['o', '']], [4, ['e', '']], [1, ['n', '']], [5, ['s', '']]]
As you can see, right now we have inserted into the queue the individual characters of the text with their occurrences. None of them has any Huffman encoding, yet.
To create the Huffman encodings, we work as follows.
Note that we do not really need to create a Huffman tree explicitly at all.
When we take two nodes out of the queue and create a new node with the two nodes as children, we only need to prepend a 0 to the encodings of the left child and a 1 to the encodings of the right child.
So, when we take out the two nodes:
[1, ['.', '']]
[1, ['T', '']]
We will get the node:
[2, ['.', '0'], ['T', '1']]
Then, when we take out the two nodes:
[2, ['.', '0'], ['T', '1']]
[1, ['n', '']]
We will get the node:
[3, ['n', '0'], ['.', '10'], ['T', '11']]
This explains why we gave the elements of the priority queue the form:
[value, [character, encoding], [character, encoding], ... ]
def create_huffman_code(pq):
    while len(pq) > 1:
        # Extract the two minimum items from the priority queue.
        x = extract_min_from_pq(pq)
        y = extract_min_from_pq(pq)
        # Get all the [character, encoding] items associated with x;
        # as x is the left child of the new node, prepend '0'
        # to their encodings.
        for pair in x[1:]:
            pair[1] = '0' + pair[1]
        # Do the same for y; as y is the right child of the
        # new node, prepend '1' to their encodings.
        for pair in y[1:]:
            pair[1] = '1' + pair[1]
        # Insert a new node with the sum of the occurrences
        # of the two extracted nodes and the updated
        # [character, encoding] sequences.
        insert_in_pq(pq, [x[0] + y[0]] + x[1:] + y[1:])
    return extract_min_from_pq(pq)
hc = create_huffman_code(pq)
print("Huffman Code:")
pprint.pprint(hc)
Huffman Code:
[44, ['h', '000'], ['o', '0010'], ['p', '0011'], ['r', '0100'], ['w', '0101'], ['s', '011'], ['t', '100'], ['a', '1010'], ['n', '10110'], ['.', '101110'], ['T', '101111'], [' ', '110'], ['c', '111000'], ['m', '111001'], ['i', '11101'], ['e', '1111']]
As we can see, the Huffman encoding was returned in a list.
It is more practical to enter it into a dictionary, where the keys are the characters and the values are their encodings.
We will call this the Huffman coding table.
hc_table = { character: encoding for [character, encoding] in hc[1:]}
pprint.pprint(hc_table)
{' ': '110', '.': '101110', 'T': '101111', 'a': '1010', 'c': '111000', 'e': '1111', 'h': '000', 'i': '11101', 'm': '111001', 'n': '10110', 'o': '0010', 'p': '0011', 'r': '0100', 's': '011', 't': '100', 'w': '0101'}
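As a quick sanity check, and assuming that symb2freq from above still holds the character counts, we can compute how many bits the compressed text will need, weighting the length of each character's code by the character's frequency:

total_bits = sum(symb2freq[ch] * len(code) for ch, code in hc_table.items())
print(total_bits)

For our phrase this should print 165, a number we will meet again shortly.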
Now we can encode our original text.
To see what is happening, we take the original text and print the binary representation of each character.
The Python ord() function returns the numeric code of a character (its Unicode code point), which we convert to binary format.
Then we print out the compressed text, so you can see the savings:
huffman_encoding = [ hc_table[c] for c in text ]
print(text)
print("Original contents:")
bit_representation = [ f'{ord(c):b}' for c in text ]
print(bit_representation)
print("Compressed contents:")
print(huffman_encoding)
This is the phrase that we want to compress.
Original contents:
['1010100', '1101000', '1101001', '1110011', '100000', '1101001', '1110011', '100000', '1110100', '1101000', '1100101', '100000', '1110000', '1101000', '1110010', '1100001', '1110011', '1100101', '100000', '1110100', '1101000', '1100001', '1110100', '100000', '1110111', '1100101', '100000', '1110111', '1100001', '1101110', '1110100', '100000', '1110100', '1101111', '100000', '1100011', '1101111', '1101101', '1110000', '1110010', '1100101', '1110011', '1110011', '101110']
Compressed contents:
['101111', '000', '11101', '011', '110', '11101', '011', '110', '100', '000', '1111', '110', '0011', '000', '0100', '1010', '011', '1111', '110', '100', '000', '1010', '100', '110', '0101', '1111', '110', '0101', '1010', '10110', '100', '110', '100', '0010', '110', '111000', '0010', '111001', '0011', '0100', '1111', '011', '011', '101110']
Careful, now we go into somewhat more advanced material: how we would actually store the compressed data and how we would then decompress it.
Notice that the compressed contents we have been displaying are not really compressed.
Let us check how long our original text was:
print(len(text))
44
It is 44 characters long, whereas the compressed text is:
huffman_string = ''.join(huffman_encoding)
print(huffman_string)
print(len(huffman_string))
101111000111010111101110101111010000011111100011000010010100111111110100000101010011001011111110010110101011010011010000101101110000010111001001101001111011011101110
165
This happens because we store 0 and 1 as characters, not as individual bits.
In reality, we want to store them using single bits. Then we will achieve the promised savings.
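A back-of-the-envelope calculation shows the savings we expect: the 165 bits fit in 21 bytes, rounding up to whole bytes, against the 44 bytes of the original text.

print((len(huffman_string) + 7) // 8)   # 21 bytes for the compressed text
print(len(text))                        # 44 bytes for the original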
To convert them to bits, we'll break the Huffman string into chunks of eight characters and convert each chunk to a single byte, that is, exactly eight bits.
If the length of the Huffman string is not a multiple of eight, we assume that the last chunk is padded to the left with zeros.
To convert a bit string to bytes, we use the function int(chunk, 2), which treats chunk as a string representing a binary (base 2) number.
We convert the result to bytes using to_bytes(1, byteorder='big'): we need one byte to represent the integer we got. The to_bytes() function returns an array of bytes; here it returns an array of one, but in general it could be longer.
The second argument specifies how the bytes will be put into the array. By giving byteorder='big' we specify that the bytes follow the big-endian representation, i.e., if we put two bytes into the array, the most significant of the two is put first in the array, followed by the second.
For example, for the two bytes 0xABCD in hexadecimal we will get an array of two bytes, whose first element will contain 0xAB and whose second will contain 0xCD. If we use to_bytes(2, byteorder='little') instead, we get the little-endian representation, meaning that the hexadecimal number 0xABCD would be stored as an array with the first element being 0xCD and the second 0xAB.
Again, here it does not matter, as we get only one byte, but we do have to specify it.
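To make the two byte orders concrete, here is a small sketch, separate from our compression code:

n = 0xABCD
print(n.to_bytes(2, byteorder='big'))     # b'\xab\xcd': most significant byte first
print(n.to_bytes(2, byteorder='little'))  # b'\xcd\xab': least significant byte first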
for i in range(0, len(huffman_string), 8):
    chunk = huffman_string[i:i+8]
    byte_chunk = int(chunk, 2).to_bytes(1, byteorder='big')
    print(f'{chunk} {byte_chunk}')
10111100 b'\xbc'
01110101 b'u'
11101110 b'\xee'
10111101 b'\xbd'
00000111 b'\x07'
11100011 b'\xe3'
00001001 b'\t'
01001111 b'O'
11110100 b'\xf4'
00010101 b'\x15'
00110010 b'2'
11111110 b'\xfe'
01011010 b'Z'
10110100 b'\xb4'
11010000 b'\xd0'
10110111 b'\xb7'
00000101 b'\x05'
11001001 b'\xc9'
10100111 b'\xa7'
10110111 b'\xb7'
01110 b'\x0e'
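Going in the opposite direction, which is what we will do when decompressing, we can recover the bit representation of a byte with int.from_bytes() and format(); a small sketch:

byte_chunk = b'\xbc'
print(format(int.from_bytes(byte_chunk, byteorder='big'), '08b'))   # 10111100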
Instead of printing it on screen, we want to save the compressed data.
We also need to save the Huffman coding table; otherwise there is no way we will be able to decompress the data.
So we will create a file, in which we will save the Huffman coding table first, followed by the total number of uncompressed characters and the bytes of the Huffman encoding of our data.
We'll need the total number of uncompressed characters to ensure correct decompression.
We'll save these two (the table and the number of uncompressed characters) with Python's pickle facility.
For each character in the file, we'll get its Huffman encoding.
We need to pack the encodings into bytes in order to save them; to do that, we will add the Huffman encodings to a buffer variable, from which we will take eight characters at a time to create and output a byte.
Care must be taken with the last byte of the file, which may need to be padded to the right with zeros, if the sum of the lengths of all the Huffman encodings of the characters is not a multiple of 8.
import pickle

def huffman_compress(original_file, output_file):
    pq = create_pq()
    # First pass: count character occurrences.
    symb2freq = Counter()
    with open(original_file) as uncompressed_file:
        for line in uncompressed_file:
            symb2freq += Counter(line)
    # Put the occurrences in a priority queue.
    pq = create_pq()
    for key, value in symb2freq.items():
        insert_in_pq(pq, [value, [key, '']])
    # Create the Huffman code.
    hc = create_huffman_code(pq)
    # Turn the code to a dictionary for easier lookup.
    hc_table = { character: encoding for [character, encoding] in hc[1:] }
    # Second pass: we'll read again the uncompressed file,
    # we'll compress the contents and save them to the
    # compressed file as we go.
    with open(original_file) as uncompressed_file, \
         open(output_file, 'wb') as compressed_file:
        # First save the Huffman encoding...
        pickle.dump(hc_table, compressed_file)
        # ...then save the total number of characters in the original file.
        pickle.dump(sum(symb2freq.values()), compressed_file)
        # Use a buffer in which we will be adding the encoded characters;
        # when the buffer has 8 bits or more we will output a byte and
        # keep the remaining bits.
        buffer = ''
        for line in uncompressed_file:
            for c in line:
                # For each character, add the encoding to the buffer.
                buffer += hc_table[c]
                # Have we got enough stuff in the buffer to output a byte?
                while len(buffer) >= 8:
                    # Yes, output a byte.
                    byte = int(buffer[:8], base=2).to_bytes(1, byteorder='big')
                    compressed_file.write(byte)
                    # Keep any remaining stuff in the buffer; that will go out
                    # with the next byte.
                    buffer = buffer[8:]
        if len(buffer) > 0:
            # If we still have remaining stuff, it means that part of the last
            # character encoding was put in the previous byte, and part of it
            # will go in the last byte; we'll pad zeroes to the end of it.
            buffer = buffer.ljust(8, '0')
            byte = int(buffer[:8], base=2).to_bytes(1, byteorder='big')
            compressed_file.write(byte)
Decompressing a file follows the reverse logic.
We read the Huffman coding table and reverse it, in order to use it for decoding.
Then we read the number of expected decompressed characters.
We proceed with reading the file byte-by-byte, converting the bytes to bits, and decoding them as soon as we find a valid Huffman code for a sequence of bits.
def huffman_decompress(input_file, output_file):
    with open(input_file, 'rb') as compressed_file, \
         open(output_file, 'w') as decompressed_file:
        # Read the Huffman table.
        hc_table = pickle.load(compressed_file)
        # Read the total number of uncompressed characters.
        num_chars = pickle.load(compressed_file)
        # Construct an inverse, Huffman decoding table.
        hc_decoding_table = { v: k for (k, v) in hc_table.items() }
        # Set a counter for the decompressed characters.
        num_decompressed = 0
        # Keep the Huffman code that we want to decode.
        encoding = ''
        # Read the file byte-by-byte.
        byte = compressed_file.read(1)
        while byte:
            # For each byte, get its bit representation.
            bit_repr = format(int.from_bytes(byte, byteorder='big'), '08b')
            # Then read it bit-by-bit, extending the current encoding
            # that we are trying to decode.
            for bit in bit_repr:
                encoding += bit
                # Is this a valid Huffman encoding?
                if encoding in hc_decoding_table:
                    # Yes, decompress it.
                    decompressed_file.write(hc_decoding_table[encoding])
                    num_decompressed += 1
                    # If we have decompressed the expected number of
                    # characters, we are done; any leftover is just the
                    # padding of the last byte of the file.
                    if num_decompressed == num_chars:
                        break
                    encoding = ''
            byte = compressed_file.read(1)
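As a quick end-to-end check, we can compress a file and decompress it back again; the file names below are only placeholders:

huffman_compress('original.txt', 'original.huff')
huffman_decompress('original.huff', 'restored.txt')
# The round trip should reproduce the original contents.
assert open('original.txt').read() == open('restored.txt').read()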
For convenience, here is a full program that compresses and decompresses files using the Huffman scheme.
The usage information is as follows:
usage: huffman.py [-h] [-d] input_file output_file

Huffman compression/decompression

positional arguments:
  input_file        Input file
  output_file       Output file

optional arguments:
  -h, --help        show this help message and exit
  -d, --decompress  Decompress
# %load huffman.py
from collections import Counter
import pickle
import argparse

def create_pq():
    return []

def add_last(pq, c):
    pq.append(c)

def root(pq):
    return 0

def set_root(pq, c):
    if len(pq) != 0:
        pq[0] = c

def get_data(pq, p):
    return pq[p]

def children(pq, p):
    if 2*p + 2 < len(pq):
        return [2*p + 1, 2*p + 2]
    else:
        return [2*p + 1]

def parent(p):
    return (p - 1) // 2

def exchange(pq, p1, p2):
    pq[p1], pq[p2] = pq[p2], pq[p1]

def insert_in_pq(pq, c):
    add_last(pq, c)
    i = len(pq) - 1
    while i != root(pq) and get_data(pq, i) < get_data(pq, parent(i)):
        p = parent(i)
        exchange(pq, i, p)
        i = p

def extract_last_from_pq(pq):
    return pq.pop()

def has_children(pq, p):
    return 2*p + 1 < len(pq)

def extract_min_from_pq(pq):
    c = pq[root(pq)]
    set_root(pq, extract_last_from_pq(pq))
    i = root(pq)
    while has_children(pq, i):
        # Use the data stored at each child as the comparison key
        # for finding the minimum.
        j = min(children(pq, i), key=lambda x: get_data(pq, x))
        if get_data(pq, i) < get_data(pq, j):
            return c
        exchange(pq, i, j)
        i = j
    return c

def create_huffman_code(pq):
    while len(pq) > 1:
        # Extract the two minimum items from the priority queue.
        x = extract_min_from_pq(pq)
        y = extract_min_from_pq(pq)
        # Get all the [character, encoding] items associated with x;
        # as x is the left child of the new node, prepend '0'
        # to their encodings.
        for pair in x[1:]:
            pair[1] = '0' + pair[1]
        # Do the same for y; as y is the right child of the
        # new node, prepend '1' to their encodings.
        for pair in y[1:]:
            pair[1] = '1' + pair[1]
        # Insert a new node with the sum of the occurrences
        # of the two extracted nodes and the updated
        # [character, encoding] sequences.
        insert_in_pq(pq, [x[0] + y[0]] + x[1:] + y[1:])
    return extract_min_from_pq(pq)

def huffman_compress(input_file, output_file):
    pq = create_pq()
    # First pass: count character occurrences.
    symb2freq = Counter()
    with open(input_file) as uncompressed_file:
        for line in uncompressed_file:
            symb2freq += Counter(line)
    # Put the occurrences in a priority queue.
    pq = create_pq()
    for key, value in symb2freq.items():
        insert_in_pq(pq, [value, [key, '']])
    # Create the Huffman code.
    hc = create_huffman_code(pq)
    # Turn the code to a dictionary for easier lookup.
    hc_table = { character: encoding for [character, encoding] in hc[1:] }
    # Second pass: we'll read again the uncompressed file,
    # we'll compress the contents and save them to the
    # compressed file as we go.
    with open(input_file) as uncompressed_file, \
         open(output_file, 'wb') as compressed_file:
        # First save the Huffman encoding.
        pickle.dump(hc_table, compressed_file)
        # Then save the total number of characters in the input file.
        pickle.dump(sum(symb2freq.values()), compressed_file)
        # Use a buffer in which we will be adding the encoded characters;
        # when the buffer has 8 bits or more we will output a byte and
        # keep the remaining bits.
        buffer = ''
        for line in uncompressed_file:
            for c in line:
                # For each character, add the encoding to the buffer.
                buffer += hc_table[c]
                # Have we got enough stuff in the buffer to output a byte?
                while len(buffer) >= 8:
                    # Yes, output a byte.
                    byte = int(buffer[:8], base=2).to_bytes(1, byteorder='big')
                    compressed_file.write(byte)
                    # Keep any remaining stuff in the buffer; that will go out
                    # with the next byte.
                    buffer = buffer[8:]
        if len(buffer) > 0:
            # If we still have remaining stuff, it means that part of the last
            # character encoding was put in the previous byte, and part of it
            # will go in the last byte; we'll pad zeroes to the end of it.
            buffer = buffer.ljust(8, '0')
            byte = int(buffer[:8], base=2).to_bytes(1, byteorder='big')
            compressed_file.write(byte)

def huffman_decompress(input_file, output_file):
    with open(input_file, 'rb') as compressed_file, \
         open(output_file, 'w') as decompressed_file:
        # Read the Huffman table.
        hc_table = pickle.load(compressed_file)
        # Read the total number of uncompressed characters.
        num_chars = pickle.load(compressed_file)
        # Construct an inverse, Huffman decoding table.
        hc_decoding_table = { v: k for (k, v) in hc_table.items() }
        # Set a counter for the decompressed characters.
        num_decompressed = 0
        # Keep the Huffman code that we want to decode.
        encoding = ''
        # Read the file byte-by-byte.
        byte = compressed_file.read(1)
        while byte:
            # For each byte, get its bit representation.
            bit_repr = format(int.from_bytes(byte, byteorder='big'), '08b')
            # Then read it bit-by-bit, extending the current encoding
            # that we are trying to decode.
            for bit in bit_repr:
                encoding += bit
                # Is this a valid Huffman encoding?
                if encoding in hc_decoding_table:
                    # Yes, decompress it.
                    decompressed_file.write(hc_decoding_table[encoding])
                    num_decompressed += 1
                    # If we have decompressed the expected number of
                    # characters, we are done; any leftover is just the
                    # padding of the last byte of the file.
                    if num_decompressed == num_chars:
                        break
                    encoding = ''
            byte = compressed_file.read(1)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=
                                     'Huffman compression/decompression')
    parser.add_argument('input_file', help='Input file')
    parser.add_argument('output_file', help='Output file')
    parser.add_argument('-d', '--decompress',
                        action='store_true',
                        help='Decompress',
                        default=False)
    args = parser.parse_args()
    if args.decompress:
        huffman_decompress(args.input_file, args.output_file)
    else:
        huffman_compress(args.input_file, args.output_file)
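Assuming the text of Ulysses is stored in a file called ulysses.txt (the file names here are only an illustration), we can compress and decompress it from the command line:

python huffman.py ulysses.txt ulysses.huff
python huffman.py -d ulysses.huff ulysses_restored.txt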
If we run it on James Joyce's Ulysses, which is 1,520,795 bytes, we get a compressed version of 884,155 bytes; that is, we achieved a compression ratio of 58%.
We now turn to Lempel-Ziv-Welch (LZW) compression.
The code is much more straightforward than with Huffman encoding, because most of the underlying machinery is provided as-is by Python dictionaries.
Therefore the following function is a straightforward application of the algorithm described in the book.
def lzw_compress(message, nb, n):
    """
    Perform LZW compression.

    Parameters
    ----------
    message : str
        the message to compress
    nb : int
        the number of bits used for each encoding
    n : int
        the size of the alphabet

    Returns
    -------
    compressed : list
        The encoded message
    """
    compressed = []  # list of encodings
    max_code = 2**nb - 1  # size of the encoding table
    # Initialize table with encodings for each character
    # in the alphabet.
    table = { chr(i): i for i in range(n) }
    code = n  # this is the encoding for the next unencoded ngram
    w = ""  # current ngram
    for c in message:
        wc = w + c  # form new ngram
        # If we have already encountered the new ngram
        # prepare to add another character to it.
        if wc in table:
            w = wc
        else:
            # Otherwise we must put out the encoding of the
            # existing ngram.
            compressed.append(table[w])
            # Start an ngram from the current character.
            w = c
            # Check if we can add the non-found ngram
            # in the table and add it if possible.
            if code <= max_code:
                table[wc] = code
                code += 1
    # If we have finished the message, output the encoding
    # of the current ngram.
    if w:
        compressed.append(table[w])
    return compressed
We can now test it to see how it works:
compressed = lzw_compress("MELLOW YELLOW FELLOW", nb=8, n=2**7)
print(compressed)
[77, 69, 76, 76, 79, 87, 32, 89, 129, 131, 133, 70, 136, 132]
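The codes below 128 are just the code points of single characters: 77 is 'M', 69 is 'E', and so on. The codes from 128 upwards stand for ngrams that were added to the table while the message was being scanned; tracing the run by hand, 129 was assigned to 'EL', 131 to 'LO', 133 to 'W ', and 136 to 'ELL', which is why the repeated parts of the message come out as single codes.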
Similarly, LZW decompression is also a straightforward application of the algorithm in the book.
The only substantial change is that instead of building the output by concatenating strings, we use the io.StringIO() class in Python, which allows us to write to a stream representing a string.
This is faster than performing a lot of concatenations.
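Here is a tiny illustration of io.StringIO(), separate from the decompression code itself: we write pieces into the stream and retrieve the whole string at the end.

import io

buffer = io.StringIO()
for piece in ('MEL', 'LOW'):
    buffer.write(piece)
print(buffer.getvalue())   # MELLOW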
import io

def lzw_decompress(compressed, nb, n):
    """
    Perform LZW decompression.

    Parameters
    ----------
    compressed : list
        the message to decompress
    nb : int
        the number of bits used for each encoding
    n : int
        the size of the alphabet

    Returns
    -------
    result : str
        The decompressed message
    """
    max_code = 2**nb - 1  # size of the decoding table
    # Initialize the decoding table with reverse encodings
    # for each character in the alphabet.
    table = { i: chr(i) for i in range(n) }
    code = n  # this is the encoding for the next unencoded ngram
    result = io.StringIO()
    # Output the first character.
    c = compressed.pop(0)
    v = table[c]
    result.write(v)
    pv = v  # previous
    # For each encoded value in the compressed message:
    for c in compressed:
        # If we know the corresponding ngram, get it.
        if c in table:
            v = table[c]
        # If we do not know it, the corresponding ngram
        # is really the previous ngram with its first
        # character appended to its end.
        else:
            v = pv + pv[0]
        result.write(v)
        # If there is room in the decoding table:
        if code <= max_code:
            # add the new mapping to it.
            table[code] = pv + v[0]
            code += 1
        pv = v
    return result.getvalue()
We can test that it works by verifying that it decompresses our compressed message correctly.
decompressed = lzw_decompress(compressed, nb=8, n=2**7)
print(decompressed)
MELLOW YELLOW FELLOW
To see LZW in action, we try compressing a larger text, such as James Joyce's Ulysses.
We can try 12 bits for each encoding and an alphabet of $2^8$ characters.
ulysses = open('ulysses.txt').read()
compressed_ulysses = lzw_compress(ulysses, 12, 2**8)
Then we can see how much we have actually saved.
We will multiply the length of compressed_ulysses by 12 and compare it with the length of the original text multiplied by 8 (8-bit ASCII).
s_compressed = 12 * len(compressed_ulysses)
s_original = 8 * len(ulysses)
print('bits for encoding: 12',
      f'original size: {s_original}',
      f'compressed size: {s_compressed}',
      f'compression ratio: {s_compressed/s_original}')
bits for encoding: 12 original size: 12165552 compressed size: 6508548 compression ratio: 0.5349981653113645
Are 12 bits for each encoding a good choice?
We can actually check.
We will compare the savings we have for a series of values for the number of encoding bits, from 8 to 24 (inclusive).
best_savings = 1
best_nbits = 8
for nbits in range(8, 25):
    compressed_ulysses = lzw_compress(ulysses, nbits, 2**8)
    s_compressed = nbits * len(compressed_ulysses)
    s_original = 8 * len(ulysses)
    savings = s_compressed / s_original
    if savings < best_savings:
        best_savings = savings
        best_nbits = nbits
    print(f'bits for encoding: {nbits}',
          f'original size: {s_original}',
          f'compressed size: {s_compressed}',
          f'compression ratio: {savings}')
print(f'best savings:{best_savings} best number of encoding bits: {best_nbits}')
bits for encoding: 8 original size: 12165552 compressed size: 12165552 compression ratio: 1.0
bits for encoding: 9 original size: 12165552 compressed size: 8147061 compression ratio: 0.6696828060083093
bits for encoding: 10 original size: 12165552 compressed size: 7372070 compression ratio: 0.6059790792887984
bits for encoding: 11 original size: 12165552 compressed size: 6844981 compression ratio: 0.5626527263210087
bits for encoding: 12 original size: 12165552 compressed size: 6508548 compression ratio: 0.5349981653113645
bits for encoding: 13 original size: 12165552 compressed size: 6221514 compression ratio: 0.5114041680969347
bits for encoding: 14 original size: 12165552 compressed size: 5994072 compression ratio: 0.492708592261165
bits for encoding: 15 original size: 12165552 compressed size: 5791950 compression ratio: 0.47609430299586897
bits for encoding: 16 original size: 12165552 compressed size: 5564896 compression ratio: 0.45743062049301175
bits for encoding: 17 original size: 12165552 compressed size: 5465517 compression ratio: 0.44926173510252554
bits for encoding: 18 original size: 12165552 compressed size: 5462298 compression ratio: 0.4489971355183883
bits for encoding: 19 original size: 12165552 compressed size: 5743377 compression ratio: 0.4721016358320609
bits for encoding: 20 original size: 12165552 compressed size: 6045660 compression ratio: 0.49694909034953777
bits for encoding: 21 original size: 12165552 compressed size: 6347943 compression ratio: 0.5217965448670147
bits for encoding: 22 original size: 12165552 compressed size: 6650226 compression ratio: 0.5466439993844916
bits for encoding: 23 original size: 12165552 compressed size: 6952509 compression ratio: 0.5714914539019684
bits for encoding: 24 original size: 12165552 compressed size: 7254792 compression ratio: 0.5963389084194454
best savings:0.4489971355183883 best number of encoding bits: 18
We see that the best value is to use 18 bits for encoding.
With 8 bits we get no savings at all. That is because with one byte for each encoding we have 256 possible encodings, and our alphabet, 8-bit ASCII, has 256 characters; so we only encode unigrams.
As we increase the number of bits we use, we start using more and more ngrams.
The savings are increased, but after 18 bits they start decreasing.
That is because, although we use many ngrams, we squander too many bits for encoding unigrams and small n-grams.
In practice, we can usually only read and store whole bytes, so we settle for using a multiple of 8 as the number of bits for each encoding.
The following program performs LZW compression and decompression on files.
The usage information is as follows:
usage: lzw.py [-h] [-d] [-n NB] [-s SIZE] input_file output_file

LZW compression/decompression

positional arguments:
  input_file            Input file
  output_file           Output file

optional arguments:
  -h, --help            show this help message and exit
  -d, --decompress      decompress
  -n NB, --nb NB        number of bits of each table entry
  -s SIZE, --size SIZE  size of alphabet
The default values are 16 bits for each encoding and an alphabet size of 256.
# %load lzw.py
import argparse

def lzw_compress(input_file, output_file, nb, n):
    """
    Perform LZW compression of input_file, writing the result
    to output_file.

    Parameters
    ----------
    input_file : str
        the file to compress
    output_file : str
        the compressed output file
    nb : int
        the number of bits used for each encoding
    n : int
        the size of the alphabet
    """
    uncompressed_file = open(input_file)
    compressed_file = open(output_file, 'wb')
    max_code = 2**nb - 1  # size of the encoding table
    # Initialize table with encodings for each character
    # in the alphabet.
    table = { chr(i): i for i in range(n) }
    code = n  # this is the encoding for the next unencoded ngram
    # The bytes necessary to store nb bits are nb // 8, rounded up to the
    # next integer; to do that, we add 7 to nb before dividing.
    num_bytes = (nb + 7) // 8
    w = ""  # current ngram
    for line in uncompressed_file:
        for c in line:
            wc = w + c  # form new ngram
            # If we have already encountered the new ngram
            # prepare to add another character to it.
            if wc in table:
                w = wc
            else:
                # Otherwise we must put out the encoding of the
                # existing ngram.
                compressed_file.write(table[w].to_bytes(num_bytes,
                                                        byteorder='big'))
                # Start an ngram from the current character.
                w = c
                # Check if we can add the non-found ngram
                # in the table and add it if possible.
                if code <= max_code:
                    table[wc] = code
                    code += 1
    # If we have finished the input file, output the encoding of the
    # current ngram.
    if w:
        compressed_file.write(table[w].to_bytes(num_bytes, byteorder='big'))
    uncompressed_file.close()
    compressed_file.close()

def lzw_decompress(input_file, output_file, nb, n):
    """
    Perform LZW decompression of input_file, writing the result
    to output_file.

    Parameters
    ----------
    input_file : str
        the file to decompress
    output_file : str
        the decompressed output file
    nb : int
        the number of bits used for each encoding
    n : int
        the size of the alphabet
    """
    max_code = 2**nb - 1  # size of the decoding table
    # Initialize the decoding table with reverse encodings
    # for each character in the alphabet.
    table = { i: chr(i) for i in range(n) }
    code = n  # this is the encoding for the next unencoded ngram
    # The bytes necessary to store nb bits are nb // 8, rounded up to the
    # next integer; to do that, we add 7 to nb before dividing.
    num_bytes = (nb + 7) // 8
    compressed_file = open(input_file, 'rb')
    decompressed_file = open(output_file, 'w')
    # Output the first character.
    bytes = compressed_file.read(num_bytes)
    c = int.from_bytes(bytes, byteorder='big')
    v = table[c]
    decompressed_file.write(v)
    pv = v  # previous
    # For each encoded value in the compressed message:
    bytes = compressed_file.read(num_bytes)
    while bytes:
        c = int.from_bytes(bytes, byteorder='big')
        # If we know the corresponding ngram, get it.
        if c in table:
            v = table[c]
        # If we do not know it, the corresponding ngram
        # is really the previous ngram with its first
        # character appended to its end.
        else:
            v = pv + pv[0]
        decompressed_file.write(v)
        # If there is room in the decoding table:
        if code <= max_code:
            # add the new mapping to it.
            table[code] = pv + v[0]
            code += 1
        pv = v
        bytes = compressed_file.read(num_bytes)
    decompressed_file.close()
    compressed_file.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=
                                     "LZW compression/decompression")
    parser.add_argument('input_file', help='Input file')
    parser.add_argument('output_file', help='Output file')
    parser.add_argument("-d", "--decompress", help="decompress",
                        default=False,
                        action="store_true")
    parser.add_argument("-n", "--nb", help="number of bits of each table entry",
                        type=int, default=16)
    parser.add_argument("-s", "--size", help="size of alphabet",
                        type=int, default=2**8)
    args = parser.parse_args()
    if args.decompress:
        lzw_decompress(args.input_file, args.output_file, args.nb, args.size)
    else:
        lzw_compress(args.input_file, args.output_file, args.nb, args.size)
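For instance, to compress and then decompress Ulysses with the default settings of 16 bits per encoding and a 256-character alphabet (the file names are again only an illustration):

python lzw.py ulysses.txt ulysses.lzw
python lzw.py -d ulysses.lzw ulysses_restored.txt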