reliable multicast for replicated-cache update

https://www.usenix.org/conference/usits-99/scalable-web-caching-frequently-updated-objects-using-reliable-multicast is a 1999 research paper. I hope that by now multicast has grown more mature and proven. I am not sure where this is used; perhaps within certain network boundaries, such as a private network of data servers.

This paper examines reliable multicast for invalidation and delivery of popular, frequently updated objects to web cache proxies.


static horizontal sharding: drawbacks #Rahul

Eg of static horizontal sharding — by symbol: NYSE, OPRA…

Rahul told me that outside the finance sector, many companies (esp. west coast) are cautious about static sharding. Over time, one shard can become extremely hot while other shards’ resources stay underutilized.

Rahul said it’s a hassle to change static sharding config. Logistically challenging. Requires phased restarts.

As a result, many tech companies use static horizontal sharding very cautiously, only with valid reasons.

Dynamic sharding is a new concept to me. I think it works something like this: based on load level, we could shift one “topic” from a hot shard to a less loaded one.
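Below is a minimal sketch of what I imagine such a router could look like: a topic-to-shard table that a load monitor can rewrite at runtime. All names here are my own invention, purely for illustration.

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical dynamic shard router: unlike a static sharding config,
// topic->shard assignments can change at runtime without a restart.
class DynamicShardRouter {
public:
    // Which shard currently owns this topic? Fall back to a static hash.
    int shardFor(const std::string& topic, int numShards) const {
        std::lock_guard<std::mutex> lk(mu_);
        auto it = overrides_.find(topic);
        if (it != overrides_.end()) return it->second;
        return static_cast<int>(std::hash<std::string>{}(topic) % numShards);
    }
    // A load monitor calls this to move a hot topic to a quieter shard.
    void reassign(const std::string& topic, int newShard) {
        std::lock_guard<std::mutex> lk(mu_);
        overrides_[topic] = newShard;
    }
private:
    mutable std::mutex mu_;
    std::unordered_map<std::string, int> overrides_; // runtime exceptions to the hash
};
```

Of course the hard part, which this sketch glosses over, is migrating the topic’s existing data and coordinating the cutover.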

too many DB-writes: sharding insufficient #Indeed

Context: each company receives a great many reviews. A similar scenario: too many user comments flooding in during the soccer World Cup.

Aha — the updates don’t need to show up on the browser in strict order

Aha — the commenting user only needs to see her own comment on top, and need not see other users’ latest comments. The comments below her own could be browser-cached content. Ajax…

Interviewer: OK, you said horizontal sharding by company id can address highly concurrent data store updates. Now what if one company, say Amazon, by itself gets 20% of the updates, so sharding can’t help this one company?

me: I guess the update requests would block and possibly time out. We can use a task queue.

This is similar to WordPress import/export requests.

  • Each task takes a few seconds, so we don’t want the user to wait.
  • Under high server load, the wait could be longer.
  • More importantly — this task need not be immediate. It can be scheduled.
  • We should probably optimize for server efficiency rather than latency or throughput.

So similarly, all the review submissions on Amazon can be queued and scheduled.
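A minimal producer/consumer sketch of this queue-and-schedule idea (all names invented for illustration): the web tier enqueues and returns immediately, and a background worker drains the queue at a sustainable pace.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>

// Hypothetical review-submission queue: the request handler enqueues and
// returns at once; a background worker applies DB writes at its own rate.
class ReviewTaskQueue {
public:
    void submit(std::string review) {            // called by request handlers
        { std::lock_guard<std::mutex> lk(mu_); q_.push(std::move(review)); }
        cv_.notify_one();
    }
    void workerLoop() {                          // runs on a background thread
        for (;;) {
            std::unique_lock<std::mutex> lk(mu_);
            cv_.wait(lk, [this] { return !q_.empty(); });
            std::string task = std::move(q_.front());
            q_.pop();
            lk.unlock();
            writeToDatabase(task);               // slow, scheduled work
        }
    }
private:
    void writeToDatabase(const std::string&) { /* DB insert goes here */ }
    std::mutex mu_;
    std::condition_variable cv_;
    std::queue<std::string> q_;
};
```

A real deployment would presumably use a durable queue (JMS etc.) rather than an in-process one, so queued reviews survive a crash.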

In the FIX protocol, an order sender can receive 150=A, meaning PendingNew. I call it “queued for exchange”. The OMS Server sends the acknowledgement in two steps (see the sketch after the list).

  1. Optional. Execution Report message on order placement in the OMS Server’s Order Book. ExecType (150) = A (Pending New)
  2. Execution Report message on receiving acknowledgement from the exchange. ExecType (150) = 0 (New). I guess this indicates placement in the exchange order book
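Here is a tiny decoding sketch of the two steps above. The enum names are mine; a real FIX engine exposes its own typed API for tag 150.

```cpp
#include <string>

// My own labels for the two-step acknowledgement described above.
enum class OrderAckStage { QueuedForExchange, AcceptedByExchange, Other };

// Map FIX tag 150 (ExecType) values to the two stages:
//   150=A (PendingNew) -> placed in the OMS server's order book
//   150=0 (New)        -> acknowledged by the exchange
OrderAckStage classifyExecType(const std::string& execType) {
    if (execType == "A") return OrderAckStage::QueuedForExchange;
    if (execType == "0") return OrderAckStage::AcceptedByExchange;
    return OrderAckStage::Other;
}
```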

Note this optimization is completely different from HFT latency engineering.

##RDBMS strategies4faster read+write #widely useful #Sunil

(Further to our chat in mid Apr 2019…) Hi Sunil,

I generally agree that RDBMS such as Sybase, Oracle, DB2, MSSQL, … have good performance for Select and slower performance for Inserts, as you concluded in our last call. However, this conclusion is full of ifs and buts.

  • Read performance can be improved with partitioning — imagine a relatively large table of, say, 100 million rows. Even if the queries use an index, the index can have a fairly large footprint (with implications). Often the Asia users query only the Asia records, while Europe users query only the Europe records.
  • Read performance can be improved with pre-computed data tables. If the same calculation is requested by many frequent queries, the results can be saved in a pre-computed result table (see the sketch after this list).
  • Read performance can be improved with stored procedures.
  • Read performance can benefit from de-normalization.
  • Read performance can improve if the entire table is in memory, as confirmed by my 2009 DB2 trainer in NY.
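To illustrate the pre-computed-results idea (second item above) in application code rather than SQL: a batch job periodically rebuilds an aggregate table and swaps it in, so the query path is a cheap lookup. This is only my sketch of the pattern, with invented names.

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Sketch of the pre-computed result table pattern: an expensive aggregation
// is rebuilt periodically by a batch job and published as a whole, so the
// query path never pays the computation cost.
struct AggregateRow { double total = 0; long count = 0; };
using ResultTable = std::unordered_map<std::string, AggregateRow>;

class PrecomputedResults {
public:
    // Batch job: rebuild the entire result table, then swap it in.
    void refresh(std::shared_ptr<const ResultTable> fresh) {
        std::lock_guard<std::mutex> lk(mu_);
        table_ = std::move(fresh);
    }
    // Query path: cheap lookup instead of re-running the calculation.
    std::shared_ptr<const ResultTable> snapshot() const {
        std::lock_guard<std::mutex> lk(mu_);
        return table_;
    }
private:
    mutable std::mutex mu_;
    std::shared_ptr<const ResultTable> table_ = std::make_shared<ResultTable>();
};
```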

Now I think most RDBMS performance-tuning techniques target Select, as a slow Select is the most common and the most severe pain.

Insert is typically much slower than Select, but user expectations are also less demanding. In my observation, most RDBMS databases are either mostly-select or mostly-insert. A mostly-insert database can benefit from batch inserts (bcp in Sybase/MSSQL) or a background writer thread (GemFire).

However, sometimes real-time insert is needed. I think the most common strategy is sharding (horizontal partitioning), like splitting 100 million rows into two tables of 50 million rows each.

A related strategy is normalization (vertical partitioning). Normalization removes duplicate data and helps inserts.

sharding:=horizontal partition #too many rows

Assuming an 88-column, 6-million-row table, “horizontal” means pushing a knife horizontally across all 88 columns, splitting the 6 million rows into two tables of 3 million rows each.
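To make the horizontal cut concrete, a toy routing function (my own illustration, not any particular database’s API):

```cpp
#include <cstdint>

// Toy illustration of the horizontal cut: every row keeps all 88 columns,
// but rows are divided between two physical tables by their key.
constexpr int kNumShards = 2;

int shardForRow(std::uint64_t rowKey) {
    return static_cast<int>(rowKey % kNumShards);  // shard 0 or shard 1
}
// Contrast with a vertical cut, which would split the 88 columns into
// narrower tables while keeping all 6 million rows in each.
```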

Sharding can work on noSQL too.

GS PWM positions table had 9 horizontal partitions for 9 regions.

“Partitioning” is a more generic term and can mean 1) horizontal (sharding) or 2) vertical cutting like normalization.

How RTS achieved 400-700 KMPS #p2p

Official benchmark result – 700 KMPS per instance of rebus or parser. Here are some enabling features:

  • [! iii] mostly single-threaded apps. Some older parsers are multi-threaded, but each thread operates in single-threaded mode. No shared mutable state :)
    • In the order book replication engine, we use lockfree in one place only
  • ! (ST mode loves) Stateless [1] parser, small footprint. Only the downstream component (Rebus) handles cancels/busts, modifications… So we can run many parser instances per machine, or many threads per instance, fully utilizing the cpu cores. See https://haydenjames.io/linux-performance-almost-always-add-swap-space/
  • [! ii] allocation avoided — on millions of Output message objects. Pre-allocated ring buffer eliminates new(). Very few and small STL containers… mostly arrays… pre-allocate DTOs@SOD #HFT (see the sketch after this list)
  • ! allocation avoided — on millions of Input message objects, thanks to reinterpret_cast() on pointers… nearly zero-copy. See reinterpret_cast^memcpy in raw mktData parsing
  • ! allocation avoided — custom containers to replace the STL containers, since those allocate from the heap
  • ! p2p messaging beats MOM 
  • ! Socket buffer tuning — to cope with bursts. 64-256MB receive buffer in kernel; 4MB UserModeBuffer
  • low memory footprint. Most parsers use around 60MB. (My parsers were 100 to 150MB.) I think there are many benefits in terms of latency.
  • epoll — to replace select(), with a two-orders-of-magnitude improvement in throughput
  • buffering of Output list (of messages). I guess this hurts latency but enhances throughput
  • Very fast duplicate seq check, without large history data — a hot function
  • large containers like the ring buffer are pre-sized. No reallocation.
  • mostly array-based data structures — cache-friendly
  • Hot functions use pbref or RVO or move semantics, minimizing stack allocations
  • aggressively simplified parsing. Minimal logic
  • Special Logger to reduce I/O cost
  • Sharding to speed up symbol lookup
  • kernel bypass : RTS
  • No virtual functions — enhancing data cache and inlining .. 3 overheads@vptr #ARM
  • Inline
  • —-Speed with control
  • Best of both to reduce the real risk of our connection becoming slow or lossy
  • Depth monitor to detect missed messages
  • Capacity tracker to monitor mps rates
  • Missed seq reporting
  • [!=one of the really effective techniques in RTS]
  • [i=presented in multiple (!!, !!!) interviews]
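Here is a much-simplified sketch of the two allocation-avoidance bullets above (the real code is proprietary; the message layout and names below are invented): a reinterpret_cast view over the raw input bytes, and a pre-allocated ring of output DTOs so the hot path never calls new().

```cpp
#include <array>
#include <cstddef>
#include <cstdint>

// --- Input side: zero-copy view over raw packet bytes (layout invented) ---
#pragma pack(push, 1)
struct RawQuoteMsg {                 // wire format as laid out by the feed
    std::uint32_t seqNum;
    char          symbol[8];
    std::int64_t  price;             // fixed-point
    std::uint32_t size;
};
#pragma pack(pop)

inline const RawQuoteMsg* viewQuote(const char* buf) {
    // No copy, no allocation: reinterpret bytes already in the socket buffer.
    return reinterpret_cast<const RawQuoteMsg*>(buf);
}

// --- Output side: ring of pre-allocated DTOs; hot path never calls new() ---
struct OutputMsg { std::uint32_t seqNum; std::int64_t price; std::uint32_t size; };

template <std::size_t N>
class OutputRing {
public:
    OutputMsg& next() { return slots_[head_++ % N]; }   // reuse a slot
private:
    std::array<OutputMsg, N> slots_{};   // allocated once, at start of day (SOD)
    std::size_t head_ = 0;
};
```

In production one must mind alignment, endianness and strict aliasing; memcpy is the portable alternative, as discussed in the reinterpret_cast^memcpy post.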

[1] The parser does remember sequence numbers. In fact, sequence number management is crucial. Before retransmission support was added to the parser, we never sent any retrans request, but we did keep track of gaps. Mostly we used the gap-tracker to check duplicates across Line A/B. We also relied on our sequence number state to handle sequence resets.
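A simplified reconstruction of the gap-tracker idea in [1] (not the actual RTS code): keep only the high-water mark and the set of open gaps, so duplicates across Line A/B are detected without large history data.

```cpp
#include <cstdint>
#include <set>

// Duplicate/gap detection across arbitrated feeds (Line A/B) without full
// history: remember only the high-water mark and the open gaps.
class SeqTracker {
public:
    // Returns true if seq is fresh; false if it is a duplicate.
    bool accept(std::uint64_t seq) {
        if (seq > high_) {
            for (std::uint64_t s = high_ + 1; s < seq; ++s) gaps_.insert(s);
            high_ = seq;
            return true;
        }
        return gaps_.erase(seq) > 0;   // fresh only if it fills a known gap
    }
    bool hasGaps() const { return !gaps_.empty(); }
private:
    std::uint64_t high_ = 0;           // highest sequence seen so far
    std::set<std::uint64_t> gaps_;     // missing sequence numbers
};
```

A production version would store gap ranges rather than individual numbers, and would also handle sequence resets.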

OPRA: name-based sharding by official feed provider

2008 (GFC) peak OPRA msg rate — the Wikipedia “low latency” article says 1 million updates per second. Note my NYSE parser can handle 370,000 messages per second per thread!

https://www.opradata.com/specs/48_Line_Notification_Common_IP_Multicast_Specification.pdf shows 48 multicast groups, each for a subset of the option symbols. When there were 24 groups, the symbols from RU through SMZZZ were too voluminous for one multicast group, more so than the other 23 groups.
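Conceptually the spec defines a static, name-based shard map. Something like the sketch below, though the range boundaries here are invented; the real 48-line table is in the PDF above.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Name-based sharding as in the OPRA spec: each multicast group owns a
// contiguous range of symbols. The table must be non-empty and sorted
// ascending by firstSymbol; boundaries below are made up.
struct GroupRange { std::string firstSymbol; int multicastGroup; };

// A symbol belongs to the last range starting at or below it.
int groupForSymbol(const std::vector<GroupRange>& table, const std::string& sym) {
    auto it = std::upper_bound(table.begin(), table.end(), sym,
        [](const std::string& s, const GroupRange& r) { return s < r.firstSymbol; });
    return (it == table.begin()) ? table.front().multicastGroup
                                 : std::prev(it)->multicastGroup;
}
```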

Our OPRA parser instance is simple and efficient (probably stateless), so it is presumably capable of handling multiple OPRA multicast groups per instance. We still use one parser per MC group for simplicity and ease of management.

From: Chen, Tao
Sent: Tuesday, May 30, 2017 8:24 AM
To: Tan, Victor
Subject: RE: OPRA feed volume

Opra data is provided by SIAC (Securities Industry Automation Corporation). The data is disseminated on 53 multicast channels. TP runs 53 instances of parser and 48 instances of rebus across 7 servers to handle it.

cloud DB sharding – phrasebook #eg PaaS

I read quite a few books touching on sharding, but one book puts it succinctly into the big picture of noSQL, big data, web2.0 and cloud — [[cloud architecture patterns]]

PWM partition — The position database in PWM was partitioned into regions like Asia, Europe, Latam, …  This is an early example of sharding.
custom built — early sharding tended to be custom-built, like in PWM. Nowadays standard sharding support is available in many databases, so custom-built sharding is no longer needed.
noSQL and SQL databases — both support sharding
Autonomous — Shards do not depend on (or reference) each other.
Static data tables — are not sharded but replicated. 

big data tech feature: scale out

Scalability is driven by one of the 4 V’s — Velocity, aka throughput.

Disambiguation: having many machines to store the data as read-only isn’t “scalability”; any non-scalable solution could achieve that without effort.

Big data often requires higher throughput than RDBMS could support. The solution is horizontal rather than vertical scalability.

I guess Gmail is one example; it requires massive horizontal scalability. I believe RDBMS also has similar features such as partitioning, but I am not sure if it is economical. See posts on “inexpensive hardware”.

The Oracle noSQL book suggests that noSQL, compared to RDBMS, is more scalable — 10 times or more.

RDBMS can also scale out — PWM used partitions.

#1 database scale-out technique (financial+

Databases are easy to scale-up but hard to scale-out.

Most databases get the biggest server in the department, as a database is memory-intensive, I/O-intensive and CPU-intensive, taking up all the kernel threads available. When load grows, it’s easy to buy a bigger server — the so-called scale-up.

Scale-out is harder, whereby you increase throughput linearly by adding nodes. Application-tier scale-out is much easier, so architects aiming for scalability should take precautions early.

If the DB is read-mostly, then you are lucky: just add slave nodes. If unlucky, then you need to scale out the master node.

The most common DB scale-out technique is sharding, i.e. HORIZONTAL partitioning (cutting horizontally, left to right). Simplest example — a federated table, one partition per year.

I worked in a private wealth system. The biggest DB was the client sub-ledger, partitioned by geographical region into 12 partitions.

We also used Vertical partitioning — multiple narrow tables.

Both vertical and horizontal partitions can enhance performance.

gemfire write-behind and gateway queue #conflation, batched update

http://community.gemstone.com/display/gemfire60/Database+write-behind+and+read-through says (simplified by me) —
In the Write-Behind mode, updates are asynchronously written to DB. GemFire uses Gateway Queue. Batched DB writes. A bit like a buffered file writer.

With the asynch gateway, low-latency apps can run unimpeded. See blog on offloading non-essentials asynchronously.

GemFire’s best known use of Gateway Queue technology is for the distribution/propagation of cache update events between clusters separated by a WAN (thus they are referred to as ‘WAN Gateways’).

However, Gateways are designed to solve a more fundamental integration problem shared by both disk and network IO — 1) disk-based databases and 2) remote clusters across a WAN. This problem is the impedance mismatch when update rates exceed the absorption capability of the downstream. For remote WAN clusters the impedance mismatch is network latency: a 1-millisecond synchronously replicated update on the LAN can’t possibly be replicated over a WAN in the same way. Similarly, an in-memory replicated datastore such as GemFire with sustained high-volume update rates provides a far greater transaction throughput than a disk-based database. However, the DB actually has enough absorption capacity if we batch the updates.

The application is insulated from DB failures, as the gateway queues are highly available by default and can be configured to allow zero data loss.

Reduce database load by enabling conflation — multiple updates of the same key can be conflated, and only the final entry (containing all updates combined) is written to the database.
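A sketch of conflation in a write-behind buffer. This is my own illustration of the idea, not GemFire’s API: later updates overwrite earlier pending ones, and the flusher writes only the final value per key in one batch.

```cpp
#include <mutex>
#include <string>
#include <unordered_map>

// Conflating write-behind buffer: many updates per key collapse into one
// pending entry, so the database absorbs one write per key per batch.
class ConflatingWriteBehind {
public:
    void update(const std::string& key, const std::string& value) {
        std::lock_guard<std::mutex> lk(mu_);
        pending_[key] = value;                  // conflation: overwrite in place
    }
    // Called periodically by the flusher thread: drain and batch-write.
    void flush() {
        std::unordered_map<std::string, std::string> batch;
        { std::lock_guard<std::mutex> lk(mu_); batch.swap(pending_); }
        batchWriteToDb(batch);                  // one DB round-trip per interval
    }
private:
    void batchWriteToDb(const std::unordered_map<std::string, std::string>&) {
        /* e.g. a multi-row upsert statement */
    }
    std::mutex mu_;
    std::unordered_map<std::string, std::string> pending_;
};
```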

Each Gateway queue is maintained on at least 2 nodes, internally arranged in a primary + (one or multiple) secondary configuration.

200,000,000 orders in cache, to be queried in real time #UBS

Say you have up to 200,000,000 orders/ticks each day. Traders need to query them like “all IBM”, or “all IBM orders above $50”, in real time. How do you design the server side to be scalable? (Now I think you need a tick database like kdb.)

First, establish the theoretical limits: entire population + response time + up-to-date results — choose any two. Google forgoes one of them.

Q: do users really need to query this entire population, or just a subset?

This looks like a reporting app. Drill-down is a standard solution. Keep order details (100+ fields) in a separate server. Queries don’t need these details.

Once we confirm the population to be scanned (say, all 200,000,000), the request rate (say, a few traders, once in a while) and the acceptable response time (say, 10 seconds), we can allocate threads (say, 200 in a pool). We can divide each query into 50 parallel sub-queries, i.e. 50 tasks in the task queue. On average we can work on 4 requests concurrently; others queue up in a JMS queue.
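A sketch of the fan-out (sizes from the paragraph above; illustrative only, not a real data grid): one user query splits into per-partition scans that run in parallel and are merged at the end.

```cpp
#include <future>
#include <vector>

struct Order { int symbolId; double price; /* 100+ detail fields live elsewhere */ };

// Scan one partition with a predicate like "all IBM orders above $50".
template <typename Pred>
std::vector<Order> scanPartition(const std::vector<Order>& part, Pred pred) {
    std::vector<Order> hits;
    for (const auto& o : part)
        if (pred(o)) hits.push_back(o);
    return hits;
}

// Fan one user query out across all partitions (e.g. 50), then merge.
template <typename Pred>
std::vector<Order> parallelQuery(const std::vector<std::vector<Order>>& partitions,
                                 Pred pred) {
    std::vector<std::future<std::vector<Order>>> tasks;
    for (const auto& p : partitions)
        tasks.push_back(std::async(std::launch::async, [&p, pred] {
            return scanPartition(p, pred);
        }));
    std::vector<Order> all;
    for (auto& t : tasks) {
        auto part = t.get();
        all.insert(all.end(), part.begin(), part.end());
    }
    return all;
}
```

A real system would run these tasks on the fixed pool of ~200 threads described above rather than std::async.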

DB techniques to borrow:
* partitioning to achieve near-perfect parallel execution
* indexing — to avoid full scans? How about GemFire indexes?
* You can also use a standard DB like Sybase or DB2 and give it enough RAM so no disk I/O is needed. But it’s even better if we can fit the entire population in a single JVM — no serialization, no disk, no network latency.

L1, L2 caches? What to put in the caches? Most used items such as index root nodes.

See post on The initiator mailbox MOM model.