2021-12-16

Created on Thursday, December 16, 2021.

Day 16 Binary decoding

Wow, this is a long description. The first thing to catch my eye here is that some of the analysis is going to cross byte boundaries. In the example packet, bits 7-11 (counting from 1) hold the first group of the number, and that group crosses the 8-bit boundary.

We're going to need to turn the hexadecimal string into a binary stream and then start processing that stream.
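To make the byte-boundary point concrete, here is a minimal sketch in plain Python (no libraries) that expands the example packet `D2FE28` into a string of bits and slices out the header. The helper name `hex_to_bits` is my own invention for illustration and is not part of the solution further down.

```python
def hex_to_bits(hexstring: str) -> str:
    """Expand each hex digit into its 4-bit binary form."""
    return "".join(f"{int(digit, 16):04b}" for digit in hexstring.strip())

bits = hex_to_bits("D2FE28")
version = int(bits[0:3], 2)   # first 3 bits
typeid = int(bits[3:6], 2)    # next 3 bits
first_group = bits[6:11]      # 5 bits that straddle the first byte boundary
print(bits, version, typeid, first_group)
```

The first group starts in the first byte and finishes in the second, which is why working byte by byte would be awkward here.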

The second thing that I notice is that we've got the concept of a packet and a subpacket, and I'm immediately suspicious that our subpackets might themselves contain their own subpackets. That could be recursive, so our packet parser is going to have to think about how to represent that.

Our question asks us to only create the sum of the packet versions.

This time, I think what I'm going to do is use some Python classes, mostly because I haven't done that yet, and partly because this lets us use a pattern around decomposition. If we have a Packet class, it can contain a list of subpackets. Each packet class can encode the data we care about, so version and type. We can store data if there is any (the operators don't seem to have their own data, but literal value packets do). We can then ask a packet for its VersionChecksum, which will add together all the VersionChecksums of the subpackets and add its own version.

Secondly, we're going to need a parse function that takes a set of bytes and parses them as a bitstream. There are several ways of doing this, but the easiest in Python is to use the bitstring package. I thought I'd use it in Day 3 but I never ended up needing it. bitstring has a neat ability to consume bits of the stream, make decisions and then interpret the next chunk of the stream.
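For anyone who wants to see the "consume some bits, decide, consume some more" idea without installing anything, here is a rough standard-library sketch. `BitReader`, `read_uint` and `read_literal` are hypothetical names of my own; this is not the bitstring API, just the same pattern written by hand.

```python
class BitReader:
    """Hypothetical helper: a cursor over a string of '0'/'1' characters."""

    def __init__(self, hexstring):
        self.bits = "".join(f"{int(d, 16):04b}" for d in hexstring.strip())
        self.pos = 0

    def read_uint(self, nbits):
        """Consume nbits from the stream and return them as an unsigned int."""
        if self.pos + nbits > len(self.bits):
            raise EOFError("not enough bits left")
        chunk = self.bits[self.pos:self.pos + nbits]
        self.pos += nbits
        return int(chunk, 2)


def read_literal(reader):
    """Read 5-bit groups until the continuation bit is 0."""
    value = 0
    more = 1
    while more:
        more = reader.read_uint(1)                      # 1 = another group follows
        value = (value << 4) | reader.read_uint(4)      # append 4 payload bits
    return value


reader = BitReader("D2FE28")
version = reader.read_uint(3)
typeid = reader.read_uint(3)
literal = read_literal(reader)
print(version, typeid, literal)
```

The cursor only ever moves forward, so each read decides what the next read should be, which is the property the parser below relies on.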

Let's define the classes first and then try parsing a literal value into one

## Import ipytest and get it setup for use in Python Notebook
import pytest
import ipytest
ipytest.autoconfig()
class Packet:
    def __init__(self, version, typeid):
        self.__version = version
        self.__typeid = typeid

    def version(self):
        return self.__version

    def typeid(self):
        return self.__typeid

    def __eq__(self, other):
        return isinstance(other,Packet) and self.__version == other.__version and self.__typeid == other.__typeid

    def __repr__(self):
        return f"Packet({self.__version}, {self.__typeid})"


class LiteralPacket(Packet):
    def __init__(self, version, data):
        Packet.__init__(self, version, 4)
        self.data = data
    def __eq__(self, other):
        return isinstance(other,LiteralPacket) and Packet.__eq__(self, other) and self.data == other.data

    def __repr__(self):
        return f"LiteralPacket({Packet.__repr__(self), {self.data}})"

class OperatorPacket(Packet):
    def __init__(self, version, typeid, subpackets):
        Packet.__init__(self, version, typeid)
        self.subpackets = subpackets

    def version(self):
        return Packet.version(self)+sum([p.version() for p in self.subpackets])

    def __eq__(self, other):
        return isinstance(other,OperatorPacket) and Packet.__eq__(self, other) and self.subpackets == other.subpackets

    def __repr__(self):
        return f"OperatorPacket({Packet.__repr__(self)}, {self.subpackets})"

testliteral = LiteralPacket(6,2021)
print(testliteral)
assert testliteral.version() == 6
assert testliteral.data == 2021

testoperator = OperatorPacket(1,6,[LiteralPacket(6,10), LiteralPacket(2,20)])
print(testoperator)
assert testoperator.version() == 9

LiteralPacket(('Packet(6, 4)', {2021}))
OperatorPacket(Packet(1, 6), [LiteralPacket(('Packet(6, 4)', {10})), LiteralPacket(('Packet(2, 4)', {20}))])

Ok, we've got some packets. We don't really understand what these operators are, so I'm storing TypeId for now, but I suspect that we'll end up mapping them to operations like add, subtract and so on.

Let's get onto parsing the bitstream.

The most complex part is that when we parse the bitstream and need subpackets, we need a way to recurse into creating packets from the remainder of the stream before we create the operator object. We'll use recursion here. But there's an oddity: when we parse a given bitstream, we might return a series of packets.

Think about the example of an operator packet that contains two subpackets. We're going to parse the version, type and then our length, which says we have a further 27 bits containing multiple packets. We then need to consume the next 27 bits, and pass them to a function that can return us multiple packets from them.
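As a worked version of that 27-bit case, here is a small plain-Python sketch using the example packet `38006F45291200`. It slices out the 27-bit sub-stream and parses it into several packets. The helpers `hex_to_bits` and `parse_literals` are made up for this illustration, and the sketch only understands literal subpackets, which is all this example contains.

```python
def hex_to_bits(hexstring):
    return "".join(f"{int(d, 16):04b}" for d in hexstring.strip())


def parse_literals(bits):
    """Parse back-to-back literal packets; returns (version, value) pairs."""
    packets = []
    pos = 0
    while pos + 6 <= len(bits):          # need at least a full header
        version = int(bits[pos:pos + 3], 2)
        typeid = int(bits[pos + 3:pos + 6], 2)
        assert typeid == 4, "this sketch only handles literal packets"
        pos += 6
        value = 0
        more = True
        while more:
            more = bits[pos] == "1"      # continuation bit
            value = (value << 4) | int(bits[pos + 1:pos + 5], 2)
            pos += 5
        packets.append((version, value))
    return packets


bits = hex_to_bits("38006F45291200")
length_type = bits[6]                     # '0' means "total length in bits"
total_length = int(bits[7:22], 2)         # 15-bit length field
sub_bits = bits[22:22 + total_length]     # the sub-stream to hand to the parser
subpackets = parse_literals(sub_bits)
print(total_length, subpackets)
```

One call on the sub-stream yields two packets, which is why a parse function that returns a list is a natural fit.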

At the outermost layer we can only have a single packet.

We can either have two functions doing something very similar, or we can have a recursive function and put up with the fact that we're going to return a list of packets from it. The second feels more sensible here.

import bitstring

def parsepacket(stream):
    packets = []
    try: 
        while True:
            version, typeid = stream.readlist("uint:3, 3")
            match typeid:
                case 4:
                    # Generate a literal packet
                    next = 1
                    value = bitstring.BitArray()
                    while next:
                        next, val = stream.readlist("uint:1, bits:4")
                        value.append(val)
                    packets.append(LiteralPacket(version, value.uint))
    except bitstring.ReadError:
        return packets

assert [LiteralPacket(6,2021)] == parsepacket(bitstring.BitStream(hex="D2FE28"))

Ok, we've got something that can parse literal packets, but operator packets are a bit more complex because there are two different mechanisms for storing subpackets. We're going to read the length type field, switch between those modes, and see if we can get the example operator packets to parse.

def parsepacket(stream, packetstoread=-1):
    packets = []
    try: 
        while packetstoread != 0:
            version, typeid = stream.readlist("uint:3, 3")
            match typeid:
                case 4:
                    # Generate a literal packet
                    next = 1
                    value = bitstring.BitArray()
                    while next:
                        next, val = stream.readlist("uint:1, bits:4")
                        value.append(val)
                    packets.append(LiteralPacket(version, value.uint))
                case typeid:
                    match stream.read("uint:1"):
                        case 0:
                            totallength = stream.read("uint:15")
                            subpackets = parsepacket(stream.read(f"bits:{totallength}"))
                            packets.append(OperatorPacket(version, typeid, subpackets))
                        case 1:
                            totalpackets = stream.read("uint:15")
                            subpackets = parsepacket(stream, totalpackets)
                            packets.append(OperatorPacket(version, typeid, subpackets))
    except bitstring.ReadError:
        return packets

assert [LiteralPacket(6,2021)] == parsepacket(bitstring.BitStream(hex="D2FE28"))
assert [OperatorPacket(1,6,[LiteralPacket(6,10), LiteralPacket(2,20)])] == parsepacket(bitstring.BitStream(hex="38006F45291200"))
assert [OperatorPacket(7,3,[LiteralPacket(0,1), LiteralPacket(0,2), LiteralPacket(0,3)])] == parsepacket(bitstring.BitStream(hex="EE00D40C823060"))

---------------------------------------------------------------------------

AssertionError                            Traceback (most recent call last)
/tmp/ipykernel_61/2286271539.py in <module>
     28 assert [LiteralPacket(6,2021)] == parsepacket(bitstring.BitStream(hex="D2FE28"))
     29 assert [OperatorPacket(1,6,[LiteralPacket(6,10), LiteralPacket(2,20)])] == parsepacket(bitstring.BitStream(hex="38006F45291200"))
---> 30 assert [OperatorPacket(7,3,[LiteralPacket(0,1), LiteralPacket(0,2), LiteralPacket(0,3)])] == parsepacket(bitstring.BitStream(hex="EE00D40C823060"))

AssertionError: assert [OperatorPacket(Packet(7, 3), [LiteralPacket(('Packet(0, 4)', {1})), LiteralPacket(('Packet(0, 4)', {2})), LiteralPacket(('Packet(0, 4)', {3}))])] == [OperatorPacket(Packet(7, 3), [OperatorPacket(Packet(0, 0), [])])]
 +  where [OperatorPacket(Packet(7, 3), [OperatorPacket(Packet(0, 0), [])])] = <function parsepacket at 0x7f16b2cab010>(BitStream('0xee00d40c823060', pos=51))
 +    where BitStream('0xee00d40c823060', pos=51) = <class 'bitstring.BitStream'>(hex='EE00D40C823060')
 +      where <class 'bitstring.BitStream'> = <module 'bitstring' from '/usr/local/lib/python3.10/site-packages/bitstring.py'>.BitStream

Ok, this is a really interesting error, so rather than debug it, for explanatory purposes I'm going to leave this here as it is.

The examples are a little frustrating as they often don't give the version numbers or operator types, so I've put 0s into those, and then I'll inspect the output and validate it.

What we've got here is that the first operator packet works, so we're parsing the operator and getting the subpart of the stream, but the second operator packet, using length type 1, is not quite working correctly. It seems to be returning OperatorPacket(Packet(0, 0), []).

Now I'm hoping that in python, if we recurse into parsepacket to read the stream, it will correctly advance the stream and return correctly....

And I've literally just written that and realised that my return value is in the exception handler, not at the end of the function. So far, I was somewhat expecting the process to end because we've reached the end of the stream. This is the first time that we're ending because the "number of packets to read" variable has reached 0, and as such, there's no exception, just None.
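This failure mode is easy to reproduce in isolation. The sketch below is not the parser; it is a stripped-down stand-in with made-up names (`take_buggy`, `take_fixed`) where running out of items plays the role of the ReadError and `limit` plays the role of the packet counter.

```python
def take_buggy(items, limit=-1):
    """The only return statement lives in the except handler."""
    seen = []
    try:
        while limit != 0:
            seen.append(next(items))
            limit -= 1
    except StopIteration:
        return seen
    # If the loop ends because limit hit 0, we fall off the end: None.


def take_fixed(items, limit=-1):
    """Same loop, but the return sits after the try block."""
    seen = []
    try:
        while limit != 0:
            seen.append(next(items))
            limit -= 1
    except StopIteration:
        pass
    return seen


print(take_buggy(iter([1, 2, 3])))      # ends via the exception
print(take_buggy(iter([1, 2, 3]), 2))   # ends via the counter
print(take_fixed(iter([1, 2, 3]), 2))
```

The buggy version looks fine for as long as every call happens to end by exhausting its input, which is exactly what the first two test packets did.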

Let's fix that and see if that solves my problems

def parsepacket(stream, packetstoread=-1, debug=""):
    recursedebug = debug
    if debug: recursedebug = debug+"  "
    packets = []
    try: 
        while packetstoread != 0:
            if debug: print(f"{debug}START {packetstoread}: {stream.peeklist('bin:3,bin:3')}=>{stream.peeklist('uint:3,3')}")
            version, typeid = stream.readlist("uint:3, 3")
            match typeid:
                case 4:
                    # Generate a literal packet
                    next = 1
                    value = bitstring.BitArray()
                    while next:
                        if debug: print(f"{debug}LITERAL: {stream.peeklist('bin:1,bin:4')}=>{stream.peeklist('uint:1,bin:4')}")
                        next, val = stream.readlist("uint:1, bits:4")
                        value.append(val)
                    if debug: print(f"{debug}LITERAL: {value.bin}=>{value.uint}")
                    packets.append(LiteralPacket(version, value.uint))
                case typeid:
                    match stream.read("uint:1"):
                        case 0:
                            if debug: print(f"{debug}OPERATOR length type 0: {stream.peeklist('bin:15')}=>{stream.peeklist('uint:15')}")
                            totallength = stream.read("uint:15")
                            if debug: print(f"{debug}OPERATOR parse: {stream.peeklist('bin:'+str(totallength))}")
                            subpackets = parsepacket(stream.read(f"bits:{totallength}"), debug=recursedebug)
                            packets.append(OperatorPacket(version, typeid, subpackets))
                        case 1:
                            if debug: print(f"{debug}OPERATOR length type 1: {stream.peeklist('bin:11')}=>{stream.peeklist('uint:11')}")
                            totalpackets = stream.read("uint:11")
                            subpackets = parsepacket(stream, totalpackets, debug=recursedebug)
                            packets.append(OperatorPacket(version, typeid, subpackets))
            packetstoread-=1
    except bitstring.ReadError:
        if debug: print(f"{debug}EXCEPT {packetstoread}: ReadError - End of stream")
    return packets

assert [LiteralPacket(6,2021)] == parsepacket(bitstring.BitStream(hex="D2FE28"), debug="1 ")
assert [OperatorPacket(1,6,[LiteralPacket(6,10), LiteralPacket(2,20)])] == parsepacket(bitstring.BitStream(hex="38006F45291200"), debug="2 ")
assert [OperatorPacket(7,3,[LiteralPacket(2,1), LiteralPacket(4,2), LiteralPacket(1,3)])] == parsepacket(bitstring.BitStream(hex="EE00D40C823060"), debug="3 ")

1 START -1: ['110', '100']=>[6, 4]
1 LITERAL: ['1', '0111']=>[1, '0111']
1 LITERAL: ['1', '1110']=>[1, '1110']
1 LITERAL: ['0', '0101']=>[0, '0101']
1 LITERAL: 011111100101=>2021
1 EXCEPT -2: ReadError - End of stream
2 START -1: ['001', '110']=>[1, 6]
2 OPERATOR length type 0: ['000000000011011']=>[27]
2 OPERATOR parse: ['110100010100101001000100100']
2   START -1: ['110', '100']=>[6, 4]
2   LITERAL: ['0', '1010']=>[0, '1010']
2   LITERAL: 1010=>10
2   START -2: ['010', '100']=>[2, 4]
2   LITERAL: ['1', '0001']=>[1, '0001']
2   LITERAL: ['0', '0100']=>[0, '0100']
2   LITERAL: 00010100=>20
2   EXCEPT -3: ReadError - End of stream
2 START -2: ['000', '000']=>[0, 0]
2 EXCEPT -2: ReadError - End of stream
3 START -1: ['111', '011']=>[7, 3]
3 OPERATOR length type 1: ['00000000011']=>[3]
3   START 3: ['010', '100']=>[2, 4]
3   LITERAL: ['0', '0001']=>[0, '0001']
3   LITERAL: 0001=>1
3   START 2: ['100', '100']=>[4, 4]
3   LITERAL: ['0', '0010']=>[0, '0010']
3   LITERAL: 0010=>2
3   START 1: ['001', '100']=>[1, 4]
3   LITERAL: ['0', '0011']=>[0, '0011']
3   LITERAL: 0011=>3
3 EXCEPT -2: ReadError - End of stream

Ok, there we go. I added in some debugging. Because we are recursing, the debugging can be a pain to read normally. Because I prefix the print lines with the debug string, I get indentation and line identification, and when I recurse, we further indent the debugging lines for each level of recursion. That'll make it far easier to read if I have to debug some of the multi-level nested operators in the future.
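The prefix trick is independent of packets, so here it is on its own as a tiny sketch. The function `describe` and the nested-list input are invented for the example; the only thing carried over from the parser is the rule "if debugging is on, the child's prefix is the parent's prefix plus two spaces".

```python
def describe(node, debug="", lines=None):
    """Walk a nested list, recording one debug line per node."""
    if lines is None:
        lines = []
    # An empty prefix means debugging is off, so don't start indenting.
    child_debug = debug + "  " if debug else debug
    if isinstance(node, list):
        lines.append(f"{debug}LIST of {len(node)}")
        for child in node:
            describe(child, child_debug, lines)
    else:
        lines.append(f"{debug}LEAF {node}")
    return lines


lines = describe([1, [2, 3]], debug="A ")
print("\n".join(lines))
```

Each line keeps the run identifier at the front and the depth shows up as extra indentation after it, so the output of several runs can still be told apart.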

The issue here, incidentally, came down to three main things. I wasn't returning the packets as I thought, but that wasn't all. I wasn't actually decrementing the packetstoread counter either. And finally, for length type 1 I was trying to read 15 bits of length instead of 11, which consumed the first 4 bits of the first subpacket and resulted in some odd packets.

The debugging really showed this up, especially printing out the length fields, which made me go "huh, parsing 15,000 odd packets, that can't be right".
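To see the effect of the wrong field width on the top-level example, here is a plain-Python check on `EE00D40C823060`. It is only a sketch with a made-up helper (`hex_to_bits`), and the exact wrong number depends on where in a stream the misread happens.

```python
def hex_to_bits(hexstring):
    return "".join(f"{int(d, 16):04b}" for d in hexstring.strip())


bits = hex_to_bits("EE00D40C823060")
length_type = bits[6]                  # '1' means "number of subpackets"
correct_count = int(bits[7:18], 2)     # 11-bit field, as the puzzle specifies
wrong_count = int(bits[7:22], 2)       # 15 bits: swallows 4 bits of the next packet
print(length_type, correct_count, wrong_count)
```

Reading 11 bits gives the 3 subpackets the example promises, while reading 15 bits gives 53 and leaves the stream positioned four bits into the first subpacket, so everything parsed after that is nonsense.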

With this done, let's try on some test data and validate that our version calculations work as well

assert 6 == parsepacket(bitstring.BitStream(hex="D2FE28"))[0].version()
assert 9 == parsepacket(bitstring.BitStream(hex="38006F45291200"))[0].version()
assert 14 == parsepacket(bitstring.BitStream(hex="EE00D40C823060"))[0].version()
assert 16 == parsepacket(bitstring.BitStream(hex="8A004A801A8002F478"))[0].version()
assert 12 == parsepacket(bitstring.BitStream(hex="620080001611562C8802118E34"))[0].version()
assert 23 == parsepacket(bitstring.BitStream(hex="C0015000016115A2E0802F182340"))[0].version()
assert 31 == parsepacket(bitstring.BitStream(hex="A0016C880162017C3686B18A3D4780"))[0].version()

Ok, that works, let's try the production data

hexvalue = open("day16.txt").read()
packets = parsepacket(bitstring.BitStream(hex=hexvalue))
print(packets[0].version())

873

Part 2 - Operating on thin ground

Now we know how to parse packets, time to implement them. This is where our classes are going to help.

We're going to say that all packets have a value, and each subclass must implement its value method according to the list.

We then need to update the parser to create not just Operator packets, but the right kind of operator packet, but that shouldn't be too complex. We might want to refactor out the shared logic of parsing the subpackets though.

import math

class Packet:
    name = "Packet"
    typeid = 0

    def __init__(self, version):
        self.version = version

    def __eq__(self, other):
        return isinstance(other,Packet) and self.version == other.version and self.typeid == other.typeid

    def __repr__(self):
        return f"{self.name}-v{self.version}"

    def value(self):
        raise NotImplementedError


class LiteralPacket(Packet):
    name = "Literal"
    typeid = 4
    def __init__(self, version, data):
        Packet.__init__(self, version)
        self.data = data
    def __eq__(self, other):
        return isinstance(other,LiteralPacket) and Packet.__eq__(self, other) and self.data == other.data

    def __repr__(self):
        return f"{Packet.__repr__(self)} data=({self.data})"

    def value(self):
        return self.data

class OperatorPacket(Packet):
    name="Operator"
    typeid= 0
    def __init__(self, version, subpackets):
        Packet.__init__(self, version)
        self.subpackets = subpackets

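    def versionsum(self):
        # self.version is a plain attribute in this revision, so the recursive sum needs its own name
        return self.version + sum(p.versionsum() if isinstance(p, OperatorPacket) else p.version for p in self.subpackets)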

    def __eq__(self, other):
        return isinstance(other,OperatorPacket) and Packet.__eq__(self, other) and self.subpackets == other.subpackets

    def __repr__(self):
        return f"{Packet.__repr__(self)} subpackets=[{self.subpackets}]"

class SumPacket(OperatorPacket):
    name="Sum"
    typeid=0

    def value(self):
        return sum([packet.value() for packet in self.subpackets])

class ProductPacket(OperatorPacket):
    name="Product"
    typeid=1

    def value(self):
        return math.prod([packet.value() for packet in self.subpackets])

class MinPacket(OperatorPacket):
    name="Min"
    typeid=2

    def value(self):
        return min([packet.value() for packet in self.subpackets])

class MaxPacket(OperatorPacket):
    name="Max"
    typeid=3

    def value(self):
        return max([packet.value() for packet in self.subpackets])

class GTPacket(OperatorPacket):
    name="GT"
    typeid=5

    def value(self):
        if self.subpackets[0].value() > self.subpackets[1].value():
            return 1
        return 0

class LTPacket(OperatorPacket):
    name="LT"
    typeid=6

    def value(self):
        if self.subpackets[0].value() < self.subpackets[1].value():
            return 1
        return 0

class EqPacket(OperatorPacket):
    name="Eq"
    typeid=7

    def value(self):
        if self.subpackets[0].value() == self.subpackets[1].value():
            return 1
        return 0


testliteral = LiteralPacket(6,2021)
testliteral2 = LiteralPacket(2,2022)
assert SumPacket(1, [testliteral, testliteral2]).value() == 4043
assert ProductPacket(1, [testliteral, testliteral2]).value() == 2021*2022
assert MinPacket(1, [testliteral, testliteral2]).value() == 2021
assert MaxPacket(1, [testliteral, testliteral2]).value() == 2022
assert GTPacket(1, [testliteral, testliteral2]).value() == 0
assert GTPacket(1, [testliteral2, testliteral]).value() == 1
assert LTPacket(1, [testliteral, testliteral2]).value() == 1
assert LTPacket(1, [testliteral2, testliteral]).value() == 0
assert EqPacket(1, [testliteral, testliteral2]).value() == 0
assert EqPacket(1, [testliteral, testliteral]).value() == 1

Now we need to parse our new packets out. Note that there's a refactoring we could do here; I'll come back to that later, I think.

def readsubpackets(stream, packetstoread=-1, debug=""):
    recursedebug = debug
    if debug: recursedebug = debug+"  "
    match stream.read("uint:1"):
        case 0:
            if debug: print(f"{debug}OPERATOR length type 0: {stream.peeklist('bin:15')}=>{stream.peeklist('uint:15')}")
            totallength = stream.read("uint:15")
            if debug: print(f"{debug}OPERATOR parse: {stream.peeklist('bin:'+str(totallength))}")
            return parsepacket(stream.read(f"bits:{totallength}"), debug=recursedebug)
        case 1:
            if debug: print(f"{debug}OPERATOR length type 1: {stream.peeklist('bin:11')}=>{stream.peeklist('uint:11')}")
            totalpackets = stream.read("uint:11")
            return parsepacket(stream, totalpackets, debug=recursedebug)

def parsepacket(stream, packetstoread=-1, debug=""):
    packets = []
    try: 
        while packetstoread != 0:
            if debug: print(f"{debug}START {packetstoread}: {stream.peeklist('bin:3,bin:3')}=>{stream.peeklist('uint:3,3')}")
            version, typeid = stream.readlist("uint:3, 3")
            match typeid:
                case 0:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(SumPacket(version, subpackets))
                case 1:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(ProductPacket(version, subpackets))
                case 2:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(MinPacket(version, subpackets))
                case 3:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(MaxPacket(version, subpackets))
                case 4:
                    # Generate a literal packet
                    next = 1
                    value = bitstring.BitArray()
                    while next:
                        if debug: print(f"{debug}LITERAL: {stream.peeklist('bin:1,bin:4')}=>{stream.peeklist('uint:1,bin:4')}")
                        next, val = stream.readlist("uint:1, bits:4")
                        value.append(val)
                    if debug: print(f"{debug}LITERAL: {value.bin}=>{value.uint}")
                    packets.append(LiteralPacket(version, value.uint))
                case 5:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(GTPacket(version, subpackets))
                case 6:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(LTPacket(version, subpackets))
                case 7:
                    subpackets = readsubpackets(stream, packetstoread, debug)
                    packets.append(EqPacket(version, subpackets))
            packetstoread-=1
    except bitstring.ReadError:
        if debug: print(f"{debug}EXCEPT {packetstoread}: ReadError - End of stream")
    return packets

assert [LiteralPacket(6,2021)] == parsepacket(bitstring.BitStream(hex="D2FE28"))
assert [LTPacket(1,[LiteralPacket(6,10), LiteralPacket(2,20)])] == parsepacket(bitstring.BitStream(hex="38006F45291200"))
assert [MaxPacket(7,[LiteralPacket(2,1), LiteralPacket(4,2), LiteralPacket(1,3)])] == parsepacket(bitstring.BitStream(hex="EE00D40C823060"))

Ok, that seems to parse and set up correctly. Let's see if the values work as expected in the test cases

assert 2021 == parsepacket(bitstring.BitStream(hex="D2FE28"))[0].value()
assert 1 == parsepacket(bitstring.BitStream(hex="38006F45291200"))[0].value()
assert 3 == parsepacket(bitstring.BitStream(hex="EE00D40C823060"))[0].value()

tests = [
    ["C200B40A82", 3],
    ["04005AC33890", 54],
    ["880086C3E88112", 7],
    ["CE00C43D881120", 9],
    ["D8005AC2A8F0", 1],
    ["F600BC2D8F", 0],
    ["9C005AC2F8F0", 0],
    ["9C0141080250320F1802104A08", 1],
]

for test in tests:
    assert test[1] == parsepacket(bitstring.BitStream(hex=test[0]))[0].value()

Ok, let's try running that production program

hexvalue = open("day16.txt").read()
packets = parsepacket(bitstring.BitStream(hex=hexvalue))
print(packets[0].value())

402817863665

That worked.

I talked about a refactoring that we could do.

We're storing the typeid in the class, as typeid. We could also build a simple lookup table, so that we can fetch the appropriate class based on the typeid.

That way we could simply do

version, typeid = stream.readlist("uint:3, 3")
if typeid == 4:
    # get data here
    packet = getPacketType(typeid)(version, data)
else:
    # get subpackets
    packet = getPacketType(typeid)(version, subpackets)

One could also try to abstract away the getsubpackets and get data into a routine that can return either, and then we can pass the result to the packet constructor and hope it works properly.
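A rough, self-contained sketch of that lookup table might look like the following. I haven't wired this into the real parser; the cut-down classes, the `payload` attribute, `PACKET_TYPES` and `get_packet_type` are all stand-ins invented here to show the shape of the idea, and only three of the eight type IDs are included.

```python
import math


class Packet:
    typeid = None

    def __init__(self, version, payload):
        self.version = version
        self.payload = payload  # an int for literals, a list of packets for operators


class Literal(Packet):
    typeid = 4

    def value(self):
        return self.payload


class Sum(Packet):
    typeid = 0

    def value(self):
        return sum(p.value() for p in self.payload)


class Product(Packet):
    typeid = 1

    def value(self):
        return math.prod(p.value() for p in self.payload)


# Build the table from the classes themselves so the IDs live in one place.
PACKET_TYPES = {cls.typeid: cls for cls in (Sum, Product, Literal)}


def get_packet_type(typeid):
    return PACKET_TYPES[typeid]


ten = get_packet_type(4)(6, 10)
twenty = get_packet_type(4)(2, 20)
total = get_packet_type(0)(1, [ten, twenty])
product = get_packet_type(1)(1, [ten, twenty])
print(total.value(), product.value())
```

Because literals and operators take the same two constructor arguments here, the parser would only need to decide how to read the payload, not which class to name.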

But I'm otherwise happy with this.

One thing to note: while debugging this, I went back and edited the class definitions slightly. In particular, I changed the repr implementation in Packet to print itself out and rely on data that can be overridden by the subclasses. This is cleaner and means less typing and repetition as we create all the subclasses.

I wonder if we'll see these data types another day in advent?
