Distribution and Co-Location Mysteries
A recurring theme of new Netezza deployments: the greatest strength of Netezza is its power, and the greatest weakness of Netezza is its power, because such power tempts us to build inefficient queries and structures and presume the power will cover us. It will, for a while. The new boost in power is so overwhelming that new users cannot imagine it could be faster with simple changes.
Sometimes a new Netezza user will trip over new terms in the technology. They see the “distribute on” phrase and immediately translate it to “partition” or “index” when Netezza has neither. In fact, those concepts don’t even have equivalents in Netezza. This confusion is simply born of the notion that Netezza-is-like-other-databases-so-fill-in-the-blank. And this mistake won’t lead to functional problems. They will still get the right answer, and get it pretty fast.
But it could be soooo much faster.
As an example, we might have a traditional star-schema for our reporting users. We might have a fact table with customer transactions, along with dimensions of a customer table, a vendor table, a product table etc. If we look at the size of the tables, we find that the product and vendor tables are relatively small compared to the customer, and the fact table dwarfs them all. A typical default would be to distribute each of these tables on their own integer ID, such as customer_id, vendor_id etc. and then add a transaction fact record id (transaction_id) separate from the others, even though the transaction record contains the ID fields from the other tables.
Then the users join the customer and the transaction fact using the customer_id. Functionally this will deliver the correct answer but not the best performance. Let’s take a look under-the-covers at what the machine does. I will use a rounded number of dataslices (100) just to keep things simple.
Backstory: When we add a table to the machine, it exists in the catalog once, but it has 100 parts, one on each dataslice. If the table uses “random” distribution and we add 1000 records, we can expect about 10 records on each dataslice. When we query the table, each dataslice operates independently of the others, in a “shared nothing hardware” scenario. Each gets dedicated CPU cycles, memory, and so forth.
This is why, if we added a hundred million rows to the table, it would have about one million rows per dataslice. Each query, even a full table scan, can run roughly 100 times faster than its non-distributed counterpart. This, however, is just one facet of parallel processing.
What it also means is that every other table shares the same dataslices. So if we have another table with 200 million rows, we can expect about 2 million rows on each dataslice. I’ll talk about the deeper mechanics of the dataslices in another essay.
Now let’s say we distribute on a key. If — as noted above — each table gets its own distribution key, we will never be able to achieve a “co-located” — or parallel — join. This is important, because you paid a lotta money for a machine that will give you a parallel join.
Take a quick look at the keys we assigned. The machine will faithfully calculate which of the 100 dataslices will own a given key value, and all records with that value will arrive on the same dataslice. We can expect many different key values in a given dataslice, but we won’t expect the same key value on more than one dataslice.
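A quick way to picture this is a toy model in Python. This is only a sketch: the CRC32 hash is a stand-in I picked for illustration (the machine's actual hash function is not described here), and the 10,000 customer_id values are made up.

```python
import zlib
from collections import Counter, defaultdict

NUM_DATASLICES = 100  # the rounded figure used in the text


def dataslice_for(key):
    """Map a key value to a dataslice number (1..NUM_DATASLICES).

    CRC32 is an assumed stand-in for the machine's real hash.
    """
    return zlib.crc32(str(key).encode("utf-8")) % NUM_DATASLICES + 1


# Distribute 10,000 hypothetical customer_id values.
slices = defaultdict(set)
for customer_id in range(1, 10_001):
    slices[dataslice_for(customer_id)].add(customer_id)

# Many different key values share a dataslice ...
keys_per_slice = [len(keys) for keys in slices.values()]

# ... but no key value shows up on more than one dataslice.
key_owner_counts = Counter(k for keys in slices.values() for k in keys)
single_owner = all(n == 1 for n in key_owner_counts.values())

print(len(slices), min(keys_per_slice), max(keys_per_slice), single_owner)
```

The point of the sketch is the last check: the same key value always lands on the same dataslice, which is the property that co-location depends on.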
This means, if we have say, the customer table distributed on customer_id, and the transaction_fact distributed on transaction_id, we will never actually join them on these keys. We will join the customer table to the fact table on customer_id.
When this happens (joining two tables that don’t share the same distribution) the machine has to pull all the salient data from one table and “broadcast” it to the other. For large tables, this can drag the query performance into the weeds.
But it also saturates the machine’s inter-CPU resources, like its memory and network paths. If several of these queries hit at once, they will fight each other over those resources. A query that runs in 20 seconds by itself may take two minutes or longer when running with other inefficient queries like it.
Brute mechanical force is the wrong answer, because the resource saturation will ultimately fill the machine’s capacity and everything will run slow. This is not a drawback of the Netezza machine. It is a misapplication of the machine’s capabilities.
To fix this, we simply distribute the transaction_fact on the customer_id. This guarantees that all records with the same customer_id value will land on the same dataslice, for both tables. Thus when we join the tables, it pushes the query deep into the machine and the heavy lifting of the join happens in parallel, with no broadcasting or saturation of machine resources. We preserve capacity and can run multiples of these queries with consistent performance.
This does not mean the query is as fast as it can be. It simply means we are using the machine as intended and the query itself is more efficient.
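The difference between the two distribution choices can be sketched with a small simulation. Everything here is assumed for illustration: the CRC32 stand-in hash, the 1,000 customers, and the 20,000 transactions. It simply counts how many fact rows sit on a different dataslice than the customer row they need to join to.

```python
import zlib

NUM_DATASLICES = 100


def dataslice_for(key):
    """Assumed stand-in hash: key value -> dataslice number (1..100)."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_DATASLICES + 1


# Hypothetical data: (transaction_id, customer_id) pairs.
transactions = [(txn_id, txn_id % 1000 + 1) for txn_id in range(1, 20_001)]

# The customer table is distributed on customer_id.
customer_slice = {c: dataslice_for(c) for c in range(1, 1001)}


def rows_off_slice(fact_distribution_column):
    """Count fact rows that are not on the dataslice holding their customer."""
    off = 0
    for txn_id, customer_id in transactions:
        if fact_distribution_column == "transaction_id":
            fact_slice = dataslice_for(txn_id)
        else:
            fact_slice = dataslice_for(customer_id)
        if fact_slice != customer_slice[customer_id]:
            off += 1
    return off


off_by_txn = rows_off_slice("transaction_id")
off_by_cust = rows_off_slice("customer_id")
print(off_by_txn, off_by_cust)
```

With the fact distributed on customer_id, the second count is zero: every fact row already sits next to its customer row, so nothing has to move for the join.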
The above describes a co-located read. In data processing however, we often use “ELT” — that is — SQL transforms to affect data. In Netezza, the tables are parallelized so it’s more efficient to use multiple, focused ELT operations (we read from one or more tables and write the result to an intermediate table).
An important factor here is the data write. If we pull data from one table distributed on, say, product_ID, and inadvertently write the data to a random table, or to another key, we are redistributing the data all over the machine, and it costs a lot. If however we configure the target table to use the same distribution key as our largest source table, the data will largely stay on the same dataslice without redistributing everywhere. Using this technique will make your ELT radically more efficient. We’re talkin’ operations that took hours can be reduced to minutes.
Case Study Short: One shop built a series of stored procedures, each of which merged and transformed a variety of tables over a long chain of operations. As a service bureau, they hosted data for various clients, and would process client data through this application. One day a client arrived with more data than all their other clients combined. Where any given client ran in an hour, this new client took 53 hours. We showed them how to make it more efficient by using co-located writes, reducing the total run-time to under 20 minutes for the new client, and under 1 minute for the others.
Deletes and Updates — A delete in Netezza is a soft-delete. It removes the record from the query’s view. A groom on the table removes it completely. An update is a combination of a full-record delete and full-record insert. Thus any delete or update leaves soft-deleted records behind.
A delete done wrong can grind the machine, and because every update includes a delete, the same problem affects updates. So let’s look under the covers at how a parallel delete works.
It works like this: Let’s say we want to delete a thousand records. We load these to an intermediate table and execute a query similar to:
delete from MyTable A where exists (select B.mykey from Temp_Deletes B where A.mykey = B.mykey);
If the MyTable has a random distribution, the delete operation doesn’t have any idea where any given key might be. It must broadcast all 1000 keys to every dataslice. This is radically inefficient.
However, if we distribute MyTable on MyKey, the query will chop the 1000 keys up by their designated dataslice, on average 10 keys sent to each. This is the difference between a delete/update which will grind, and one which will return in a flash.
Don’t miss this – with 100 dataslices, not using a co-located delete means your delete will run 100 times slower than it should, or rather, use 100 times more capacity than it should. So it’s important to remove these kinds of inefficiencies.
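The arithmetic behind that claim can be sketched as follows. This is a toy count of "key deliveries" (one key handed to one dataslice), not a measurement of the machine, and the CRC32 hash is again an assumed stand-in.

```python
import zlib
from collections import Counter

NUM_DATASLICES = 100


def dataslice_for(key):
    """Assumed stand-in hash: key value -> dataslice number (1..100)."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_DATASLICES + 1


delete_keys = range(1, 1001)  # the thousand keys we want to delete

# Random distribution: nobody knows where a key lives,
# so every dataslice has to receive every key.
broadcast_deliveries = len(delete_keys) * NUM_DATASLICES

# Distributed on mykey: each key goes only to the dataslice that owns it.
keys_per_slice = Counter(dataslice_for(k) for k in delete_keys)
targeted_deliveries = sum(keys_per_slice.values())

print(broadcast_deliveries, targeted_deliveries,
      broadcast_deliveries // targeted_deliveries)
```

The broadcast case handles 100,000 key deliveries, the co-located case 1,000, which is where the factor of 100 comes from.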
A word: If we took all tables and placed them on random distribution, and did a baseline of our query performance, and repeated the test with distribution on common keys and “co-located” joins – we will see a percentage boost in performance. But we won’t (normally) see multiple-X performance boosts.
We get 100x / 200x – etc. performance boost with effective use of zone maps. More on those in another essay.
Consider the following distribution strategy:
Customer = customer_id
Product = product_id
Transaction_Fact = customer_id, product_id
It’s easy to imagine the Transaction_Fact, in using two keys, will know which one applies in a join. That is, it’s smart enough to know customer_id goes with Customer, etc.
It does no such thing. When we add a record to Transaction_Fact, it takes the value of both keys, combines them into one, and makes a hash. The only way to get a co-located join on Transaction_Fact is to use both keys on another table that is also distributed on those keys. As neither Product nor Customer is distributed on both keys, they will never see a co-located join.
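Here is a toy illustration of why the two-column key breaks co-location. The way the two values are combined (joined with a "|" and run through CRC32) is purely an assumption for the sketch. The text only tells us that both values are combined and hashed. The fact rows are made up.

```python
import zlib

NUM_DATASLICES = 100


def dataslice_for(*key_columns):
    """Hash all distribution columns together into one dataslice number.

    Joining with "|" and using CRC32 are assumptions for illustration.
    """
    combined = "|".join(str(v) for v in key_columns)
    return zlib.crc32(combined.encode("utf-8")) % NUM_DATASLICES + 1


# Hypothetical fact rows: (customer_id, product_id) pairs.
fact_rows = [(c, p) for c in range(1, 201) for p in range(1, 51)]

# Fact distributed on (customer_id, product_id); how many rows happen to
# sit on the same dataslice as their Customer or Product row?
with_customer = sum(dataslice_for(c, p) == dataslice_for(c) for c, p in fact_rows)
with_product = sum(dataslice_for(c, p) == dataslice_for(p) for c, p in fact_rows)

# A second table distributed on both columns always lines up.
with_both = sum(dataslice_for(c, p) == dataslice_for(c, p) for c, p in fact_rows)

print(len(fact_rows), with_customer, with_product, with_both)
```

Only the table that shares both columns lines up on every row. Any matches against Customer or Product alone are coincidence, which is not something a join can rely on.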
For this reason, we consider it a best practice to use one and only one distribution key for each table, and make it from values that are part of the data, especially the natural values of the columns. This simplifies distribution and makes things easier to understand. It also makes things easier to rebuild in case of data loss.
How do we know if the distribution is smooth, or mostly smooth, like a “ragged edge” on the administrator’s graphs? One way is to use the graphs, but a quick and dirty way is to just query the table. Each table maintains a hidden column called “datasliceid” which in the above example for 100 dataslices, will have a domain of 1 to 100.
To test a given column for viability, we don’t have to rebuild the whole table, just the column:
create table MyTest as select mykey from MyTable distribute on (mykey);
Now we can query the table to see if any dataslice has skew:
select count(*) mycount , datasliceid dsid from MyTest group by dsid order by dsid;
We can expect it to spit out 100 rows, each with a number 1 to 100 for datasliceid and the count of records for each. This is easy enough to eyeball. But if we want it a little more automated because we have 500 dataslices, another way might be:
create table dsmon as select count(*) mycount, datasliceid dsid from MyTest group by dsid;
select max(mycount), min(mycount), avg(mycount) from dsmon;
The above will give us a simple max/min and average row count. These three results should be close in value. Otherwise the data might be skewed and we need to understand why.
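To make "close in value" a little more concrete, here is a small sketch that works on per-dataslice counts like the ones the group-by query returns. The counts themselves are invented, and the "skew ratio" (max divided by average) is just one convenient yardstick I am assuming here, not an official metric.

```python
# Hypothetical per-dataslice row counts: dsid -> mycount.
counts = {dsid: 1000 for dsid in range(1, 101)}
counts[1] = 3500  # one overloaded dataslice

max_count = max(counts.values())
min_count = min(counts.values())
avg_count = sum(counts.values()) / len(counts)

# The busiest dataslice compared to the average one. A value near 1.0
# means smooth; a value well above 1.0 means one dataslice carries
# far more than its share.
skew_ratio = max_count / avg_count

print(max_count, min_count, avg_count, round(skew_ratio, 2))
```

Here the min and the average are close to each other, but the max is more than three times the average, so this table deserves a closer look.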
Case Study Short: A national shipper used customer_id in the fact table but the data was radically skewed. Seems some customers are highly active while others, not so much. Including the 5-digit zip code in the distribution key smoothed the data nationwide for the big customers with little effect on the smaller ones. We just had to remember to always use the ZipCode in the queries.
Case Study Short: A user asked about a slow query. He showed me a graphic of the table’s distribution, a flat line all the way across, except for datasliceid = 1. It had more data than any three of the others combined. Giant spike on the graph. He said, ignore that, the problem seems to be…
I said no, here’s how it works: The query hits all the dataslices at once, and each one will finish at a different time depending on what data it finds. If one dataslice has a tall spike like that, it means the machine will wait on that dataslice long after the others are done.
Case Study Short: One shop distributed their data on Transaction_Date. This gave them a perfect distribution across all dataslices. Why were the queries so slow? Each query invariably included a date or range of dates. If the query asked for records of a single date, those records only exist on one dataslice, so only one CPU/dataslice could assist in the query. This is called process skew. Another group distributed their transaction fact on store_id. Whenever users queried, it was invariably from a store location, and they only wanted their store’s data. Once again, only one dataslice carried their store’s data, so all the queries were slow. By picking another distribution key, each store’s records spread across more dataslices, involving more of the machine’s resources to resolve the query.
Never – ever – use dates or anything calendar-related for a distribution key.
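Process skew is easy to reproduce in a toy model. In this sketch the dates, the 36,500 rows, and the CRC32 stand-in hash are all assumptions. It asks one question: for a query on a single date, how many dataslices hold matching rows?

```python
import zlib
from datetime import date, timedelta

NUM_DATASLICES = 100


def dataslice_for(key):
    """Assumed stand-in hash: key value -> dataslice number (1..100)."""
    return zlib.crc32(str(key).encode("utf-8")) % NUM_DATASLICES + 1


# Hypothetical fact rows: 100 transactions for each day of one year.
days = [date(2023, 1, 1) + timedelta(days=i) for i in range(365)]
rows = [(txn_id, days[txn_id % 365]) for txn_id in range(1, 36_501)]

target_day = days[100]  # the single date the user asks for

# Distributed on transaction_date: every row for that day is on one dataslice.
slices_when_on_date = {dataslice_for(d) for _, d in rows if d == target_day}

# Distributed on transaction_id: that day's rows are spread around.
slices_when_on_txn = {dataslice_for(t) for t, d in rows if d == target_day}

print(len(slices_when_on_date), len(slices_when_on_txn))
```

With the date as the key, exactly one dataslice does all the work for that query while the rest sit idle. With another key, the same rows are spread across many dataslices and more of the machine can help.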
Back to our example of Customer, Product, and Transaction_Fact:
Another question arises: If the customer and product dimensions are not co-located with the transaction_fact, how then will we avoid this broadcast of data? With small tables, the impact is negligible. Larger tables present a problem.
At one site, queries over the Customer, Product, and Fact tables joined the Product table to capture several data points. As the Customer and Fact were distributed on customer_id, this caused all Product data to broadcast. As we examined the queries and the query history, we learned the queries only touched three columns on the Product table. We copied those columns to the Fact table and eliminated the Product table from the join. Problem solved.
Some are cringing now. What? You denormalized the data? Purists can’t stand the idea of denormalization. Just so illogical. But — we aren’t denormalizing. Those columns remain on the product table, we are simply making copies available on the fact — as “co-located” data.
This is another brand of co-location. We copy a few problem columns to a busy fact table, and the data is already in the walking path of the queries without including the Product table.
If we take a step back, a Fact table is already a metadata table. It contrives a relationship between multiple dimensions that does not exist (in one place) in the transactional tables. So moving these columns to the Fact table – for co-location and performance – is a completely different matter than for logical reference.
Another common example is the use of a Fact table, representing orders / transactions, and a Fact_Header table, representing the data common to all the transactions within an order. Often we may need to join these two large tables, unless we consider that some of the Fact_Header data can migrate (co-locate) to the Fact and avoid the join altogether.
Note, we are not trying to make a super-wide fact table, just a fact table that is optimized for how the users consume it. Right-size it for consumption.
At another site, we saw two groups competing for machine resources. The customer group queried the tables based on Customer, Fact, and customer_id, with fast response times. The product group queried the Product table, distributed on product_id, and the Fact, distributed on customer_id, and the queries were always slow. More important — because the product group’s activities saturated the machine’s resources, it made the customer group’s queries run slow, too.
Important safety tip: There is no such thing as a general-purpose schema in Netezza. We build a schema with a purpose — to deliberately serve the data and performance needs of a user base.
Back to the above: the product group requested that a subset of data in the Fact be carved out and distributed on product_id. The DBAs flatly refused to make what they called a redundant copy. While the data itself may be redundant, its underlying physical layout, with a different distribution key, makes it a completely different implementation. It is often best to demonstrate the difference with a POC, otherwise the DBAs will interpret it through the concepts they know from other database vendors.
In another example, in a medical setting, each patient claim arrived with companion information recorded in other tables such as the diagnosis, pharmacy, etc. These were one-to-many for each patient, but the patient_id only existed on two tables of the fifteen in play. We discovered we could put the patient_id on every table and while it would be functionally redundant on the table, and in the query itself, it would allow all tables to co-locate on a common key. This is not an opportunity we want to throw away. Not only did performance improve, it opened up more options for integrating data.
Our primary problem as data architects is in not prioritizing the performance as we think about the logical data. And the distribution keys are a big deal.
Key-based thinking means we regard the functional flow from primary to foreign key. In a data warehouse, these relationships are “useful” but not always enforceable. If we use key-based thinking, we will get the functionality right but it may be inefficient on the machine.
If we think of the MPP first – massively parallel processing – it’s all about physics, not just the logical relationships.
So – when building the logical model, think physically, but act logically.
That is, build the logical model but always consider physics in both the data structures and the data types of the columns. For example, any alphanumeric “varchar” that is always a numeric integer in value, convert it to a binary, especially for a key. If we have natural keys that are not numeric, form a BIGINT field for the key and use the SQLToolKit’s HASH8() function to make a binary equivalent. Move inefficient data types toward high-performance data types.
Case Study Short: One shop had credit card numbers as distribution keys, but stored them as varchar, not only the most inefficient type for join keys but also the most inefficient for distribution keys. Why varchar? Because if stored as binary, the left-pad zeros were lost on the user display. This is a case of display-level features driving a data decision detrimental to performance. Isolate the user requirements from the physical data, ideally with views. In this case we converted all values to bigint, and in the view for the table, the column’s value was converted to a display value complete with left-padded zeros.
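The storage-versus-display split is simple enough to sketch in a few lines. This is only an illustration of the idea in Python, not the view the shop actually wrote. The function names and the 16-digit display width are assumptions.

```python
CARD_DISPLAY_WIDTH = 16  # assumed display width, for illustration only


def to_storage(card_text: str) -> int:
    """Physical side: keep the key as an integer (leading zeros drop out)."""
    return int(card_text)


def to_display(card_value: int) -> str:
    """View side: put the left-padded zeros back for the user."""
    return str(card_value).zfill(CARD_DISPLAY_WIDTH)


stored = to_storage("0012345678901234")
shown = to_display(stored)
print(stored, shown)
```

The integer is what the machine joins and distributes on. The padded string is only produced at the edge, where the user looks at it.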
When formulating the queries to consume these tables, think logically but act physically.
Often we may have a BI tool or application that manufactures SQL for the task at hand. We might not have a lot of control over the logic in the query. We do have some responsibilities in making sure the query uses the distribution keys and the Organize keys in the right way. This provides us freedom in the query and the only constraint is in the efficiency of the where/join phrase.
Important: If we distribute the Customer table on customer_id, likewise for the Transaction_Fact, yet fail to include the customer_id in the join, the tables will not co-locate their joins.
This is why we say — tables and queries in Netezza have a “synergistic dance”. We must distribute the data to serve the stated user performance needs, but the users must form queries to leverage it.
Many newcomers attempt to map their understanding of their favorite databases into Netezza, only to find Netezza has no equivalent. In transactional databases, relationships flow from primary to foreign key, something the Netezza machine doesn’t even enforce.
Likewise, “policy” for our other databases (such as not moving data columns to a fact table) enforces consistency and performance in those databases but is detrimental in Netezza. Don’t change or throw out the policy. Adapt it for Netezza’s performance. The data columns aren’t moving to denormalize for logic, but to co-locate for performance.
We use logic to get the right answer, but we use physics to get the right answer fast.
This is not to say that “function doesn’t matter” but we cannot design tables for a highly physical machine and expect it to behave at highest performance — unless we regard and protect the physics as an asset. Addressing the functionality alone might provide the right functional answer, but not the most scalable performance.