In this notebook, we'll build up a very simple simulator to generate payments data corresponding to legitimate and fraudulent transactions. (There are many ways you could improve this generator and we'll call some of them out.) We'll start by building up some functionality to run simulations in general.
The next function is all you need to run simple discrete-event simulations. Here's how to use it:
yield
a tuple consisting of an offset (the amount of time that has passed since the last event of that type) and a result (an arbitrary Python value associated with the event),simulate
function will yield the next event from all event streams indefinitely.import heapq
def simulate(event_generators, initial_time=0):
def setup_e(e, i):
offset, result = next(e)
return ((offset + i), result, e)
pq = [setup_e(event, initial_time)
for event in event_generators]
heapq.heapify(pq)
while True:
timestamp, result, event = pq[0]
offset, next_result = event.send(timestamp)
heapq.heappushpop(pq, (timestamp + offset, next_result, event))
yield (timestamp, result)
It may be easier to see how this works with an example. In the next three cells, we
from scipy import stats
def bedrockstream(mu, name):
while True:
offset, = stats.poisson.rvs(mu, size=1)
x = yield (offset, name)
sim = simulate([bedrockstream(10, "fred"),
bedrockstream(12, "betty"),
bedrockstream(20, "wilma"),
bedrockstream(35, "barney")])
for i in range(20):
print(next(sim))
There are a couple of small problems here:
We can solve both problems by using a separate generator for each user and seeding it. Scipy will let us create a stream of numbers sampled from a given distribution as an object, but it won't let us pass in a seed to the constructor. However, we can set the seed after we create the stream, like this:
import numpy as np
def det_bedrockstream(mu, name, seedpart=0xda7aba5e):
# scipy doesn't let us specify a seed in the constructor...
poisson = stats.poisson(mu)
# ...so we'll set one up after creating the object
seed = (hash(name) ^ seedpart) & (1 << 31 - 1)
poisson.random_state = np.random.default_rng(seed=seed)
while True:
offset, = poisson.rvs(size=1)
x = yield (offset, name)
sim = simulate([det_bedrockstream(10, "fred"),
det_bedrockstream(12, "betty"),
det_bedrockstream(20, "wilma"),
det_bedrockstream(35, "barney")])
for i in range(20):
print(next(sim))
The first thing we need to do is to decide what data we'll generate for each transaction. Some interesting possibilities include:
We'll also generate a label for each transaction (legitimate
or fraud
). We'll start with a very basic user event stream generator: all of the transactions we generate will be legitimate, and we won't do anything particularly interesting with most of the fields. We also won't bother making this very basic simulation deterministic.
MERCHANT_COUNT = 20000
# a small percentage of merchants account for most transactions
COMMON_MERCHANT_COUNT = MERCHANT_COUNT // 21
np.random.seed(0xda7aba5e)
common_merchants = np.random.choice(MERCHANT_COUNT,
size=COMMON_MERCHANT_COUNT,
replace=True)
def basic_user_stream(user_id, mu, seed=None):
favorite_merchants = np.random.choice(common_merchants,
size=len(common_merchants) // 5)
while True:
amount = 100.00
entry = "chip_and_pin"
foreign = False
merchant_id, = np.random.choice(favorite_merchants, size=1)
offset = stats.poisson.rvs(mu)
result = {
"user_id": user_id,
"amount": amount,
"merchant_id": merchant_id,
"entry": entry,
"foreign": foreign
}
yield (offset, ("legitimate", *result.values()))
sim = simulate([basic_user_stream(1, 700), basic_user_stream(2, 105), basic_user_stream(3, 40)])
for i in range(20):
print(next(sim))
basic_user_stream
so that they occasionally buy from any merchant.basic_user_stream
to model occasional international travel.We'll start by building a generator to build a mixture model we can use to make several kinds of transactions: small, medium, and large. We'll need to do a bit of extra work to make this generator deterministic, but we'll start by showing a nondeterministic version.
def nd_transaction_amounts(means, percentages, distribution=None):
size = 256
if distribution is None:
distribution = lambda m, sz: stats.gamma.rvs(a=1.1, scale=min(m, 750), loc=m, size=sz)
while True:
streams = [distribution(m * 100, size) for m in means]
stream = np.floor(np.choose(np.random.choice(len(means), p=percentages, size=size), streams)) / 100
yield from stream
import pandas as pd
import altair as alt
alt.data_transformers.enable('json')
amt = nd_transaction_amounts([5, 15, 50], [0.5, 0.35, 0.15])
amounts = [next(amt) for i in range(80000)]
source = pd.DataFrame({"amounts": amounts})
alt.Chart(source).mark_bar().encode(
alt.X("amounts", bin=alt.Bin(maxbins=100)),
y='count()'
)
We can also plot a broader distribution of transactions:
%%time
amt = nd_transaction_amounts([5, 10, 15, 20, 50, 100],
[0.35, 0.25, 0.15, 0.1, 0.1, 0.05])
amounts = [next(amt) for i in range(40000)]
source = pd.DataFrame({"amounts": amounts})
alt.Chart(source).mark_bar().encode(
alt.X("amounts", bin=alt.Bin(maxbins=100)),
y='count()',
)
Let's make that transaction-amount generator deterministic.
import time
def transaction_amounts(means, percentages, seed=None):
def mkgamma(m, rng):
while True:
yield from stats.gamma.ppf(rng.uniform(size=1024), a=1.1, scale=min(m, 750), loc=m)
if seed is None:
seed = int(time.time()) & ((1 << 32) - 1)
prng = np.random.default_rng(seed=seed)
distributions = [mkgamma(m * 100, prng) for m in means]
while True:
streams = [next(d) for d in distributions]
yield (np.floor(np.choose(prng.choice(len(means), p=percentages), streams)) / 100)
Finally, we'll sanity-check the deterministic version to make sure it behaves the same way as the non-determinstic version.
%%time
amt = transaction_amounts([5, 10, 15, 20, 50, 100],
[0.35, 0.25, 0.15, 0.1, 0.1, 0.05],
seed=0x12341234)
amounts = [next(amt) for i in range(40000)]
source = pd.DataFrame({"amounts": amounts})
alt.Chart(source).mark_bar().interactive().encode(
alt.X("amounts", bin=alt.Bin(maxbins=100)),
y='count()',
)
Next up, we'll make a generator to create the entry types.
You may have noticed that we need to do some extra work at the beginning of each generator function to make it deterministic -- specifically, something like this:
if seed is None:
seed = int(time.time()) & ((1 << 32) - 1)
prng = np.random.default_rng(seed=seed)
This code sets up a pseudorandom number generator, either seeded with an explicit value (if supplied) or with the time (if no seed is supplied). We shouldn't be writing this code more than once, so let's set up some code to automatically add this to every function that will need its own seeded PRNG. We'll use Python's decorator facility for this purpose.
def makeprng(func):
import time
def call_with_prng(*args, prng=None, seed=None, **kwargs):
if prng is None:
if seed is None:
seed = int(time.time()) & ((1 << 32) - 1)
prng = np.random.default_rng(seed=seed)
return func(*args, prng=prng, seed=seed, **kwargs)
return call_with_prng
Here's our decorator in action:
@makeprng
def legitimate_entry_types(prng=None, seed=None):
entry_types = ["contactless", "chip_and_pin", "swipe", "manual", "online"]
entry_probs = [0.25,0.2,0.15,0.05,0.35]
while True:
yield entry_types[prng.choice(len(entry_types), p=entry_probs)]
We'll also declare a simple function to make it easy to generate an instance of a SciPy distribution class with a given seed:
def makedist(dist_cls, seed=None, prng=None, **kwargs):
d = dist_cls(**kwargs)
d.random_state = (seed and seed) or prng.integers((1 << 32) - 1)
return d
...and one for selecting merchants (primarily a user's favorite merchants):
@makeprng
def merchant_stream(common_merchants, all_merchants, fav_percentage=0.2, probs=[0.6,0.37,0.03], prng = None, seed=None):
favorite_merchants = prng.choice(common_merchants,
size=int(len(common_merchants) * fav_percentage))
merchants = [favorite_merchants, common_merchants, all_merchants]
while True:
pool = merchants[prng.choice(len(merchants), p=probs)]
yield int(prng.choice(pool))
We can combine all of these to generate a stream of legitimate activity for a single user:
@makeprng
def legitimate_user_stream(user_id, transactions_per_day=12, start_timestamp=0, amount_means=[20,100,500], amount_probs=[0.9,0.075,0.025], prng=None, seed=None, common_merchants=None, merchant_count=20000):
if common_merchants is None:
# this case means that "common" merchants are unique to each user
common_merchants = prng.choice(merchant_count, size=merchant_count // 21)
amounts = transaction_amounts(amount_means, amount_probs, seed=prng.integers((1<<32)-1))
entry_types = legitimate_entry_types(seed=prng.integers((1<<32)-1))
merchants = merchant_stream(common_merchants, np.arange(merchant_count), seed=prng.integers((1<<32)-1))
SECONDS_PER_DAY = 86400
SECONDS_PER_HOUR = 60 * 60
loc = SECONDS_PER_DAY // transactions_per_day
p = 1 / (loc / 10)
geom = makedist(stats.geom, prng=prng, p=p, loc=loc)
# choose an arbitrary timezone offset (in seconds)
tz_offset = (user_id % 24) * SECONDS_PER_HOUR
WAKEUP_TIME = 7 * SECONDS_PER_HOUR
SLEEP_TIME = 21 * SECONDS_PER_HOUR
ts = start_timestamp
snooze = False
while True:
amount = next(amounts)
entry = next(entry_types)
foreign = entry == "online" and prng.choice([True, False], p=[0.4, 0.6])
merchant_id = next(merchants)
offset = geom.rvs()
localtime = (ts + tz_offset) % SECONDS_PER_DAY
if localtime < WAKEUP_TIME or localtime > SLEEP_TIME:
snooze = True
while snooze:
localtime = (ts + tz_offset + offset) % SECONDS_PER_DAY
if localtime > WAKEUP_TIME and localtime < SLEEP_TIME:
snooze = False
offset += geom.rvs()
result = {
"user_id": user_id,
"amount": amount,
"merchant_id": merchant_id,
"entry": entry,
"foreign": foreign
}
ts = yield (offset, ("legitimate", *result.values()))
sim = simulate([legitimate_user_stream(i, common_merchants=common_merchants) for i in [1,6,9,14]])
for i in range(200):
if i < 10 or i > 190:
print(next(sim))
elif i == 10:
print("...")
We can visualize the behavior of users in each time zone to see when they're awake:
prng = np.random.default_rng(seed=0xcfe1a77e)
seeds = prng.integers((1<<32) - 1, size=72)
sim = simulate([legitimate_user_stream(i, seed=seeds[i]) for i in range(72)])
results = [(offset, tup[1]) for offset, tup in [next(sim) for _ in range(60000)]]
source = pd.DataFrame({"hours": [(t[0] % 86400) // 3600 for t in results],
"tzs": [t[1] % 24 for t in results]})
alt.Chart(source).mark_area().encode(
alt.X("hours"),
y='count()', color="tzs"
).interactive()
We'll start with some basic assumptions:
These will guide our design of a fraudulent transaction generator. We'll simulate parallel and independent streams of legitimate and fraudulent transactions for each user.
@makeprng
def fraud_entry_types(prng=None, seed=None):
entry_types = ["contactless", "chip_and_pin", "swipe", "manual", "online"]
entry_probs = [0.05,0.05,0.05,0.35,0.5]
while True:
yield entry_types[prng.choice(len(entry_types), p=entry_probs)]
@makeprng
def fraudulent_user_stream(user_id, transactions_per_day = 12, transactions_per_burst=10, amount_means=[5,10,20], amount_probs=[0.2, 0.2, 0.6], prng=None, seed=None, merchant_count=20000):
amounts = transaction_amounts(amount_means, amount_probs, seed=prng.integers((1<<32) - 1))
entry_types = fraud_entry_types(seed=prng.integers((1<<32) - 1))
SECONDS_PER_DAY = 86400
loc = SECONDS_PER_DAY // (transactions_per_day * transactions_per_burst)
p = 1 / 10
poisson = makedist(stats.poisson, prng=prng, mu=transactions_per_burst)
foldnorm = makedist(stats.foldnorm, prng=prng, c=1.8, loc=SECONDS_PER_DAY * 30, scale=1 << 20)
geom = makedist(stats.geom, prng=prng, p=p, loc=loc)
while True:
# consider also np.floor(stats.gamma.rvs(a=6.4, loc=SECONDS_PER_DAY * 90, scale=SECONDS_PER_DAY, size=1))
fraud_delay = np.floor(foldnorm.rvs())
fraud_delay = max(int(fraud_delay), 1)
fraud_count = max(poisson.rvs(), 1)
ams = [next(amounts) for _ in range(fraud_count)]
ens = [next(entry_types) for _ in range(fraud_count)]
fs = prng.choice([True, False], p=[0.3, 0.7], size=fraud_count)
m_ids = prng.choice(merchant_count, size=fraud_count)
offsets = geom.rvs(size=fraud_count)
offsets[0] += fraud_delay
for offset, amount, merchant_id, entry, foreign in zip(offsets, ams, m_ids, ens, fs):
result = {
"user_id": user_id,
"amount": amount,
"merchant_id": merchant_id,
"entry": entry,
"foreign": foreign
}
yield (offset, ("fraud", *result.values()))
Let's sanity-check the output of our fraudulent transaction generator.
sim = simulate([fraudulent_user_stream(1, seed=123, merchant_count=MERCHANT_COUNT),
fraudulent_user_stream(2, seed=456, merchant_count=MERCHANT_COUNT),
fraudulent_user_stream(3, seed=789, merchant_count=MERCHANT_COUNT)])
for i in range(200):
v = next(sim)
if i < 10 or i > 190:
print(v)
elif i == 10:
print("...")
sim = simulate([legitimate_user_stream(1, seed=123, common_merchants=common_merchants, merchant_count=MERCHANT_COUNT),
legitimate_user_stream(2, seed=456, common_merchants=common_merchants, merchant_count=MERCHANT_COUNT),
legitimate_user_stream(3, seed=789, common_merchants=common_merchants, merchant_count=MERCHANT_COUNT),
fraudulent_user_stream(1, seed=321, merchant_count=MERCHANT_COUNT),
fraudulent_user_stream(2, seed=654, merchant_count=MERCHANT_COUNT),
fraudulent_user_stream(3, seed=987, merchant_count=MERCHANT_COUNT)])
count = 0
STEPS = 50000
for i in range(STEPS):
result = next(sim)
if result[1][0] == 'fraud':
count += 1
print("%.02f%% of transactions were fraudulent" % (count / STEPS * 100))
import time
import itertools
def setup(user_count = 10000, merchant_count = 20000, common_fraction = 21, seed = None):
# a small percentage of merchants account for most transactions
common_merchant_count = merchant_count // common_fraction
prng = np.random.default_rng(seed=(seed or 0xda7aba5e))
common_merchants = prng.choice(merchant_count,
size=common_merchant_count,
replace=True)
legitimate_user_seeds = prng.integers(1<<32, size=user_count)
fraud_user_seeds = prng.integers(1<<32, size=user_count)
legitimate_streams = [legitimate_user_stream(uid, seed=seed, transactions_per_day=5 + prng.integers(13),
common_merchants=common_merchants,
merchant_count=merchant_count,
amount_means=[3 * (prng.integers(7) + 1),
5 * (prng.integers(20) + 4),
20 * (prng.integers(35) + 4)])
for uid, seed in enumerate(legitimate_user_seeds)]
fraud_streams = [fraudulent_user_stream(uid, seed=seed, merchant_count=merchant_count)
for uid, seed in enumerate(fraud_user_seeds) if prng.integers(10) < 6]
print("%d legitimate users and %d fraud streams" % (len(legitimate_streams), len(fraud_streams)))
return simulate(itertools.chain(legitimate_streams, fraud_streams), initial_time=int(time.time()))
RECORD_COUNT = 50000000
simulation = setup(seed=0x20200213)
with open("fraud.csv", "w") as f:
f.write("timestamp,label,user_id,amount,merchant_id,trans_type,foreign\n")
for i in range(RECORD_COUNT):
v = next(simulation)
f.write(("%d," % v[0]) + ",".join([str(val) for val in v[1]]) + "\n")
pd.read_csv("fraud.csv").to_parquet("fraud-all.parquet")