KDB Tick Explained: A Walkthrough [PART 2]
In my previous tutorial I walked you through all the helper functions you can find in the u.q
file of a plain vanilla Tickerplant. I provided a comprehensive overview of the inner workings of these functions and how they interact with each other. It is now time to circle back and continue with our step-by-step examination of the main tick.q
file and complete our understanding of the Tickerplant. If you'd like to revisit our previous discussions or if you're new to this tutorial, you can access my earlier post here.
tick.q
- Continued
The first function definition we encounter in the tick.q
file is the one of .u.ld
. For simplicity, we ignore the namespace change \d .u
and will provide the full compound name of the functions.
.u.ld
.u.ld
is used to create a new Tickerplant Log file for the current date and establish a connection to it. It does this by initially verifying the existence of a Tickerplant Log file; if it doesn't exist, a new, empty one is created. Subsequently, it inspects the Tickerplant Log file with -11! to determine the number of valid messages it contains; real-time subscribers later use this count when they replay the log to restore the state of the world. Following this, the function checks that the Tickerplant Log file isn't corrupt and opens a handle to it, returning the handle. However, if the file turns out to be corrupt, it raises an error, resulting in the Tickerplant aborting. The function .u.ld
is used within .u.tick
as well as within .u.endofday
.
If you aren't familiar with the Tickerplant Log file, I've given a brief explanation of what it is and what purpose it serves in one of my earlier blog posts. Feel free to take a look at it for more details. The Tickerplant Log file
// @param: x (date) - the date for which we want to create a Tickerplant Log file
// @return: the file handle to the Tickerplant Log file for the current day
.u.ld:{
if[not type key .u.L::`$(-10_string .u.L),string x; .[.u.L;();:;()]];
.u.i::.u.j::-11!(-2;.u.L);
if[0<=type .u.i;-2 (string .u.L)," is a corrupt log. Truncate to length ",(string last .u.i)," and restart";exit 1];
hopen .u.L
};
First we need to check for the existence of the Tickerplant Log file. To accomplish this, we must construct the file name. We achieve this by taking the current Tickerplant Log file name stored in .u.L
and converting it to a string. Then, we use the drop operator to eliminate the last 10 characters from the string. This demonstrates how operators can have various overloads. We've previously encountered the drop operator when removing the i-th element from a list, but here we're using it to eliminate the first or last n-elements from a list.
q)string `:sym2023.10.10
":sym2023.10.10"
q)-10_string `:sym2023.10.10
":sym"
Next, we convert the current date contained in variable x
to a string and append it to the initial Tickerplant Log file name that we acquired earlier. Subsequently, we utilize the double-colon ::
operator to assign this new Tickerplant Log file name to the global variable .u.L
.
q).u.L::`$(-10_string .u.L),string x
q)`$(-10_string `:sym2023.10.10),string 2023.10.11
`:sym2023.10.11
We can now use the key
operator to verify the existence of a file. While the key
operator is normally used to retrieve the keys of a dictionary or keyed table, it can also be used to confirm the existence of a variable, file, or directory. When key
is applied to a variable name or a file path, it will return the variable name or file path if the variable or file exists; otherwise, it will return an empty result. In the case of a directory, key
will return the directory's content if it exists, and nothing if it does not.
Let's have a look at some examples. We are currently in the directory testing
in the location "/Users/Alexander/repos/testing"
. The directory contains a subdirectory named folder, two tables newTable and t, a Tickerplant Log file called sym2023.10.10, and a text file test.txt. The subdirectory folder contains two more files, file1.txt and file2.txt.
q)\pwd
"/Users/Alexander/repos/testing"
q)\ls
"folder"
"newTable"
"sym2023.10.10"
,"t"
"test.txt"
q)a:9
q)key `a
`a
q)key `b
q)key `:sym2023.10.10
`:sym2023.10.10
q)key `:sym2023.10.11
q)key `:folder
`file1.txt`file2.txt
q)\ls folder
"file1.txt"
"file2.txt"
If the Tickerplant Log file doesn't exist, key returns an empty result and type returns 0h. We use not
to create the logical complement (negation), yielding 1b
(true) if the file doesn't exist. If the file exists, the file name is returned as a symbol, type
returns -11h
, the type of a symbol atom, and not
will thus return 0b
(false).
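To make this concrete, here is a quick check, reusing the example directory above where sym2023.10.10 exists but sym2023.10.11 does not:
q)type key `:sym2023.10.11      / file doesn't exist: key returns an empty general list
0h
q)not type key `:sym2023.10.11
1b
q)type key `:sym2023.10.10      / file exists: key returns the file symbol
-11h
q)not type key `:sym2023.10.10
0b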
If the Tickerplant Log file doesn't exist, and the if-statement evaluates to true
, we use .[.u.L;();:;()]
to create an empty Tickerplant Log file. You might be curious about what that expression does, and I'm going to share a neat trick. However, this should be used with caution. If you have some knowledge of the history of the Q programming language, you're aware that Q is based on K (which is the foundation of KDB). In a Q console, you can access the K mode and inspect the underlying K code of a Q function. Let me demonstrate this for you.
q).Q.dpft
k){[d;p;f;t;s]if[` in f,c:!+r:`. . `\:t;'`domain];if[~f in c;'f];i:<t f;r:+enxs[$;d;r;s];{[d;t;i;u;x]@[d;x;:;u t[x]i]}[d:par[d;p;t];r;i;]'[(::;`p#)f=c;c];@[d;`.d;:;f,c@&~f=c];t}[;;;;`sym]
As evident, K code can be even less readable than Q and quite complex. Therefore, I strongly discourage attempting to write K code, especially since KX advises against it, as certain functionality might not be guaranteed. However, for simple functions and inspection purposes only, we can take a peek. Let's explore how the set
function is implemented.
q)set
k){$[@x;.[x;();:;y];-19!((,y),x)]}
When we examine the middle part of the set
function's implementation, we notice that the code is similar to how we create an empty Tickerplant Log file. In essence, .[.u.L;();:;()]
accomplishes precisely that. If we replace x
with .u.L
, the file path to the Tickerplant Log file and y
with the empty list ()
, we can observe that we are essentially generating an empty file.
The last thing left to do is to check the Tickerplant Log file, ensure that it's not corrupted, open a connection handle to it and return the handle. We inspect the Tickerplant Log file with -11!
and the -2
option. The -2
option can be used to validate (potentially corrupted) Tickerplant Log files without executing their contents. If the Tickerplant Log file is corrupt, it returns the number of valid chunks in the Tickerplant Log file and the length in bytes of the valid portion. If the Tickerplant Log isn't corrupt, it simply returns the number of valid chunks.
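As a small, self-contained sketch of this behaviour, using a throwaway log file name (logTest2023.10.10) rather than the real Tickerplant Log, we can create a log, append two messages and check it with the -2 option:
q)L:`:logTest2023.10.10
q).[L;();:;()]                / create an empty log file, just like .u.ld does
`:logTest2023.10.10
q)h:hopen L
q)h enlist (`upd;`trade;(0D08:00:00.000000000;`GOOG;123.4;100));
q)h enlist (`upd;`trade;(0D08:00:01.000000000;`MSFT;320.4;250));
q)hclose h
q)-11!(-2;L)                  / intact log: simply the number of valid chunks
2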
We proceed by checking the Tickerplant Log file using the -2
option and assigning the number of valid chunks to the global variables .u.i
and .u.j
. We then verify whether the Tickerplant Log file was corrupt or not. If it was corrupt, -11!(-2;logFile)
will have returned a two-item list containing the number of valid chunks and the length of the valid portion. Therefore, if we receive a list as the return value, the type will be non-negative (a list never has a negative type value). In such a case, we throw an error indicating that the Tickerplant Log file needs to be truncated to the valid part and then abort. However, if the Tickerplant Log file wasn't corrupt, we establish a connection to it using hopen
and return this connection handle.
if[0<=type .u.i;-2 (string .u.L)," is a corrupt log. Truncate to length ",(string last .u.i)," and restart";exit 1];
hopen .u.L
.u.tick
.u.tick
is the first function that is executed when the Tickerplant starts up. It proceeds by first invoking .u.init
to set up .u.t
and .u.w
. Following that, it ensures that all tables defined in the schema file have time and sym
as their first columns. If this condition isn't met, the Tickerplant will raise an error and terminate. As a subsequent step, the g
(grouped) attribute is applied to the sym column of each table, a measure taken for performance enhancement, resulting in quicker query times for user queries against the Real-time Database (RDB).
Afterwards, the function initializes .u.d
with the current date and then concludes by executing .u.ld
to create the Tickerplant Log file and establish a connection to the file.
// @param: x (String) - The name of the schema file containing the definition of all tables, without file ending
// @param: y (String) - The path to the directory where the Tickerplant Log and HDB should be stored
// @return: None
.u.tick:{[x;y]
.u.init[];
if[not min(`time`sym~2#key flip value@)each .u.t;'`timesym];
@[;`sym;`g#] each .u.t;
.u.d:.z.D;
if[.u.l::count y;.u.L::`$":",y,"/",x,10#".";.u.l::.u.ld[.u.d]]
};
Even though .u.tick
seems to be a long function with quite a lot going on, it's actually relatively simple and straightforward. After the invocation of .u.init
, which initializes .u.t
and .u.w
, it proceeds to validate that all tables defined in the schema file sym.q
indeed feature time
and sym
columns as their initial columns. Let's delve into the code to better understand how this is accomplished.
// verify that each table has a time and sym column as their first columns
if[not min (`time`sym~2#key flip value@)each .u.t;'`timesym]
Let's break this code further down: We first iterate over every table defined in .u.t
using the each
iterator. Within the parentheses we then apply value
in order to obtain the actual content of each table. Remember, .u.t
only contains the symbol names of all tables. Using value
we can actually retrieve the data stored in a variable. We then flip
the table, transforming it into a dictionary.
A table is essentially a flipped column dictionary and is stored as such. A column dictionary is a dictionary where each key in the dictionary has the same number of elements as their corresponding values, making the column dictionary rectangular. When we flip such a column dictionary, it transforms into a table. However, it's important to note that KDB/Q doesn't actually transpose the data; it merely marks the data as a table. By keeping the data stored as lists, KDB/Q minimizes memory usage and enhances performance by allowing vectorized operations. This also simplifies the process of storing the table to disk. If we examine the internal representation of a table, we can observe that it is, in fact, a flipped column dictionary.
q)0N!flip `key1`key2`key3!(`valueA`valueB`valueC;`valueX`valueY`valueZ;`E`F`G)
+`key1`key2`key3!(`valueA`valueB`valueC;`valueX`valueY`valueZ;`E`F`G)
key1 key2 key3
------------------
valueA valueX E
valueB valueY F
valueC valueZ G
Note: The +
operator is the K symbol for the Q flip
operator
After transposing the table into a dictionary, we extract the keys using the key
operator and select the first two of them. These two keys are then compared with the symbol list time`sym
using the match operator ~
. If there is a match, the result is 1b
. This process is repeated for all tables, resulting in a boolean vector that contains 1b
for tables with time
and sym
as their first columns and 0b
for tables that don't meet this criterion. By applying the min
function to this boolean vector, we can determine if all tables have time
and sym
as their first columns. If at least one table does not meet this condition, the min
function will return 0b
, resulting in an error being thrown.
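To see the check in action, here is a small sketch with two hypothetical tables, one conforming to the time/sym convention and one not:
q)trade:([] time:`timespan$(); sym:`symbol$(); price:`float$())
q)bad:([] sym:`symbol$(); time:`timespan$(); price:`float$())
q)(`time`sym~2#key flip value@) each `trade`bad
10b
q)min (`time`sym~2#key flip value@) each `trade`bad
0b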
Next, we add the grouped g attribute to the sym column of each table. While the Tickerplant holds little or no data in memory, depending on the mode it runs in, this attribute will be passed to all real-time subscribers when they subscribe to a table. This will ultimately enhance the performance of user queries. Using the apply operator @
we create a projection applying the grouped g
attribute to the sym
column. Finally we iterate over this projection for every table in the .u.t
list.
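A quick sketch of what this projection does, using a single hypothetical table:
q)trade:([] time:`timespan$(); sym:`symbol$(); price:`float$())
q)attr trade`sym
`
q)@[;`sym;`g#] each enlist`trade
,`trade
q)attr trade`sym
`g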
Following that, we set the global variable .u.d
to the current date, which is obtained using .z.D
. Finally, we invoke .u.ld
to generate a new Tickerplant Log file and establish a connection to it. In this process, we first validate whether the path to the Tickerplant Log file is defined. If it is, we create a temporary Tickerplant Log file name with the format :pathToLogFile/filename.........
by appending ten dots .
at the end of the file name. These dots serve as a temporary placeholder for the actual date, which will be appended to the Tickerplant Log filename during the execution of .u.ld
.
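The sketch below illustrates this naming convention, assuming the Tickerplant was started with schema name "sym" and destination directory "." (the arguments x and y of .u.tick):
q)x:"sym"; y:"."
q).u.L:`$":",y,"/",x,10#"."
q).u.L
`:./sym..........
q)// .u.ld later drops the ten placeholder dots and appends the actual date
q)`$(-10_string .u.L),string 2023.10.10
`:./sym2023.10.10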
.u.endofday
The .u.endofday
function has three essential tasks:
- It initiates the execution of .u.end, which sends an asynchronous message to all real-time subscribers, thereby triggering their respective end-of-day functions.
- It increments the Tickerplant's current date, stored in the .u.d global variable, by one to account for the start of the new day.
- It safely closes the connection to the current Tickerplant Log file, generates a new Tickerplant Log file for the new date, and establishes a connection to the newly created file.
The .u.endofday
function is called from within the .u.ts
function, but only if it's past midnight and we started a new day.
// @params: None
// @return: None
.u.endofday:{
.u.end[.u.d];
.u.d+:1;
if[.u.l;hclose .u.l;.u.l::0(`.u.ld;.u.d)];
};
Connection handles
The majority of the code in .u.endofday
is straightforward. We start by calling .u.end
with the current date and then increment the current date by one. The final step involves closing the handle to the current Tickerplant Log file (if it exists) and calling .u.ld
, which creates a new Tickerplant Log file, opens a handle to it, and returns the handle. However, when calling .u.ld
, we encounter a new concept. We invoke .u.ld
using one of the three permanent system handles: 0
is the system handle for the console, 1
is the system handle for stdout
, and 2
is the system handle for stderr
. Let's explore how we can use the system handle 0
to execute a function.
// We first define a simple function
q)f:{x+x}
// We can now invoke the function by using the system handle 0
q)0(`f;2)
4
.u.ts
.u.ts
is responsible for verifying whether we have reached the end of the day and passed midnight. It is called from within .z.ts
, which is the system function executed at regular intervals by a timer.
// @param: x (date) - the current date (.z.D is passed)
// @return: None
.u.ts:{[x]
if[.u.d<x;
if[.u.d<x-1; system "t 0";'"more than one day?"];
.u.endofday[]];
};
First, we check if .u.d
, the current date for which the Tickerplant publishes data, is less than the current date, represented by .z.D
passed as x
. If this condition is satisfied, we ensure that we've published data for only one day and not more than that by comparing .u.d
to x-1
, which is the date preceding the current one. If we indeed only published data for one day, we proceed to invoke .u.endofday
. However, if the first condition is not met, meaning we haven't reached the end of the day yet, we don't take any action. In such a case, .u.ts
will be invoked again at the next timer interval, and the checks will be performed again.
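A quick illustration of the two date comparisons, using hypothetical dates:
q).u.d:2023.10.10
q)x:2023.10.11            / the current date that .z.ts would pass in
q).u.d<x                  / midnight has passed, so .u.endofday will be called
1b
q).u.d<x-1                / we are only one day behind, so no error is raised
0b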
We're now approaching the final section of the Tickerplant code, and we'll encounter different behaviors depending on the mode in which the Tickerplant is running. The Tickerplant can operate in two modes: batch mode and tick mode. In batch mode, incoming data is accumulated for N
milliseconds and then dispatched to all real-time subscribers. In contrast, tick mode delivers incoming data immediately, with minimal delay. Let's delve deeper into these two modes:
Tickerplant Batch mode: .z.ts
and .u.upd
In batch mode, the timer interval for invoking the system function .z.ts
is established at startup through the -t
flag. In this mode, incoming data is temporarily stored in memory for N
milliseconds, and then it is published when the timer triggers after this interval. We check whether a timer interval was set or not using an if
statement and the system command system "t"
. If this condition evaluates to true
, we can determine that we are in batch mode and subsequently define .z.ts
and .u.upd
accordingly.
if[system "t"; // If true, we run in the batch mode
.z.ts
// @param: x (timestamp) - .z.ts is invoked with the current timestamp
// @return: None
.z.ts:{
.u.pub'[.u.t;value each .u.t];
@[`.;.u.t;@[;`sym;`g#]0#];
.u.i::.u.j;
.u.ts[.z.D];
};
Initially, we publish all the updates that have been batched in memory to the real-time subscribers. This is accomplished by utilizing .u.pub
along with each-both
and a pair of lists comprising the table names and their corresponding data.
.u.pub'[.u.t;value each .u.t];
The expression value each .u.t
generates a list of data, with each item representing the complete content of a specific table. Since .u.t
holds the list of all available tables in our Tickerplant, the expression [.u.t;value each .u.t]
forms a pair of lists that include all table names and their respective data. By using each-both
along with the dyadic (two-parameter) function .u.pub
, we can proceed to publish each table and its associated data to the real-time subscribers.
Each-both '
Let's have a closer look how each-both
works:
According to Q for Mortals, the iterator Each '
modifies a binary function (operator, keyword) to apply pairwise to corresponding list items. The following examples should illustrate this behavior:
q)0N!(`One`Two`Three),'(1;2;3)
((`One;1);(`Two;2);(`Three;3))
`One 1
`Two 2
`Three 3
q)("abc"; "uv"),'("de"; "xyz")
"abcde"
"uvxyz"
Namespacing
Next we use amend-at to clear the contents of the tables in memory. We do this by leveraging the fact that in KDB/Q namespacing is implemented with dictionaries. Consequently, we can directly access the root namespace, using our table names as keys, to retrieve or modify the content of the tables.
Assume we have defined the tables trade and quote, a variable a and a function f in our current KDB/Q process. key `.
will then show the content of the root namespace
q)key `.
`quote`trade`a`f
q)@[`.;`trade`quote]
+`time`sym`price`size!(`timespan$();`symbol$();`float$();`int$())
+`time`sym`bid`ask`bsize`asize!(`timespan$();`symbol$();`float$();`float$();`..
q)a
1
q)f
{x+x}
Amend
The amend-at operator is highly practical for making modifications at specific indices within data structures. This functionality extends to both simple and nested indexes, depending on the operator being used. For instance, if we wish to replace particular characters in a string, we can employ the amend @
operator along with the unary function upper
.
q)@["alexander";0 2 4 6 8;upper]
"AlExAnDeR
Combining this newly acquired knowledge with the knowledge about namespaces, we can create an expression that deletes the content of all tables and applies the grouped g
attribute to the sym
column of each table.
@[`.;.u.t;@[;`sym;`g#]0#]
The expression above accesses the root namespace to fetch the content of all tables, removes all rows with 0#
, assigns the grouped g
attribute to the sym
column, and subsequently replaces each table with an empty one using their corresponding table names.
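Here is a small sketch of that expression at work, with two hypothetical tables and a single dummy row:
q)trade:([] time:`timespan$(); sym:`symbol$(); price:`float$())
q)quote:([] time:`timespan$(); sym:`symbol$(); bid:`float$(); ask:`float$())
q)`trade insert (0D08:00:00.000000000;`GOOG;123.4)
,0
q)count each (trade;quote)
1 0
q)@[`.;`trade`quote;@[;`sym;`g#]0#]     / empty both tables and re-apply the g attribute
`.
q)count each (trade;quote)
0 0
q)attr trade`sym
`g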
We proceed by setting the global variable .u.i
to the value of .u.j
, bringing the count of published messages in line with the number of messages written to the Tickerplant Log file, and then call .u.ts
with the current date to check if midnight has been reached.
.u.upd
We now define .u.upd
for the case where a timer is set at startup and we run in batch mode. This function differs based on whether a timer was set or not. When a timer is configured, we publish all incoming messages within the .z.ts
system function rather than within .u.upd
.
The .u.upd
function is triggered by the Feedhandler whenever it publishes data to the Tickerplant. It begins by verifying whether the first column of the incoming data is of type timespan. If it isn't, the function prefixes the current time (as a timespan) to the incoming records. Subsequently, it inserts the data into the corresponding table specified within the Tickerplant, and it persists these newly arrived messages to the Tickerplant Log file. This log file serves the purpose of data playback, allowing for the recreation of the state of the world at any given point in time. Lastly, .u.upd
increments the message counter .u.j
by 1. It's worth noting that .u.j
maintains a record of the total number of messages received, encompassing those saved to the Tickerplant Log file and those held in memory.
When operating in batch mode, .u.upd
refrains from immediately publishing messages to the real-time subscriber. Instead, this action occurs when triggered by a timer, invoked by .z.ts
.
// @param: t (symbol) - the table name of the data published by the Feedhandler
// @param: x (data) - the data to be published. This can be in form of a single record or a list of records
// @return: None
.u.upd can handle both single record updates and multiple/bulk record updates
Example:
Single record update: .u.upd[`trade;(08:00:00;`GOOG;123.4;1000)]
Multiple record update: .u.upd[`trade;(08:30:00.000 08:30:00.001;`MSFT`AAPL;320.4 180.9;100 250)]
.u.upd:{[t;x]
if[not -16=type first first x;
if[.u.d<"d"$a:.z.P;.z.ts[]];
a:"n"$a;
x:$[0>type first x;a,x;(enlist(count first x)#a),x];
];
t insert x;
if[.u.l;.u.l enlist (`upd;t;x);.u.j+:1];
};
There's a lot happening, but it's fairly straightforward. Let's break it down line by line and understand its functionality.
First, we verify that the data type of the initial element in the incoming data is -16h (a timespan atom). As .u.upd
manages updates for both single and multiple records simultaneously, the incoming data could be either a single row or multiple rows. If it's a single row, the first element is an atom, while in the case of multiple rows, it's a list. To ensure accurate type comparison, we need to apply the first
function twice for obtaining a single element.
if[not -16=type first first x;
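To see why first needs to be applied twice, compare a hypothetical single record with a bulk update:
q)single:(0D08:00:00.000000000;`GOOG;123.4;100)
q)bulk:(0D08:00:00.000000000 0D08:00:01.000000000;`GOOG`MSFT;123.4 320.4;100 250)
q)type each (first single;first bulk)     / atom for the single record, list for the bulk update
-16 16h
q)(type first first single;type first first bulk)
-16 -16h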
If the if-statement
evaluates to true, and the incoming data does not contain a time column, we examine if the Tickerplant's date is earlier than the current date. We derive the present date from .z.P
, capturing the current date and time and storing it in variable a
. If it's earlier, we trigger .z.ts
, which is responsible for publishing the data held in memory and calling the end-of-day function. In either case, we then convert the timestamp stored in a
into a timespan to obtain the current time of day. This will serve as the Tickerplant time, indicating when the record was received.
if[.u.d<"d"$a:.z.P;.z.ts[]];
a:"n"$a;
Next, we use the conditional operator $
to assess whether the initial element of the received record is a single atom (negative types are atoms, and positive types are lists) or a list.
x:$[0>type first x;a,x;(enlist(count first x)#a),x];
Case 1: 0>type first x
evaluates to true
meaning we received a single record and the first element is an atom.
We prepend the current timespan to the single record we received, and this updated record is stored in the variable x
.
a,x
Case 2: We've received multiple records, which means we need to create a list of timespans with the same length as the number of records we've received.
First, we use count
to identify the number of records received. Then, we create a list of timespans, consisting of count first x
copies of the current timespan, a
. To prepend this list to another list, we need to convert it into a singleton list (a list containing one element that is a list itself). Otherwise, the elements would be individually prepended rather than as one unified list.
q)1 2 3,((4 5 6);(7 8 9))
1
2
3
4 5 6
7 8 9
q)enlist[1 2 3],((4 5 6);(7 8 9))
1 2 3
4 5 6
7 8 9
Subsequently, we prepend the singleton list of timespans to the received records for the bulk update. Eventually, this updated set of records is reassigned to the variable x
. These modified records are stored in the Tickerplant memory.
(enlist(count first x)#a),x
Up to this point, we were contained within the initial if
statement block. Now, we exit this statement. The first column of the records that are to be inserted now holds timespan values. These values could be the ones sent by the Feedhandler or, in cases where no timespan value was provided, they hold the timespan recorded by the Tickerplant.
We proceed to insert the received records into their respective tables in memory and store them in the Tickerplant Log file, assuming the file exists. After verifying that there is a handle to the Tickerplant Log file, we form a parse tree consisting of the function name to be invoked when the Tickerplant Log file is replayed as the first element, the table name for the stored data as the second element, and the data itself as the third element.
We then use enlist
and the Tickerplant Log file handle .u.l
to append the records to the Tickerplant Log file. When a process replays the Tickerplant Log file, the upd function, defined as the first element of the parse tree, will be invoked with t
and x
as its arguments. Finally, we increase .u.j
by 1.
t insert x;
if[.u.l;.u.l enlist (`upd;t;x);.u.j+:1];
For the Tickerplant Log file replay to work, the dyadic function upd
as well as the schemas of all tables present in the Tickerplant Log file have to be defined first. A simple example of upd
would be insert
as defined in the Real-time Database (RDB). You can obtain the table schemas from the schema file sym.q.
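As a self-contained sketch (using a throwaway log file name, replayTest2023.10.10, and a simplified trade schema), this is all a process needs in order to replay a Tickerplant Log file:
q)// write a small log file containing two upd messages
q)L:`:replayTest2023.10.10
q).[L;();:;()]
`:replayTest2023.10.10
q)h:hopen L
q)h enlist (`upd;`trade;(0D08:00:00.000000000;`GOOG;123.4;100));
q)h enlist (`upd;`trade;(0D08:00:01.000000000;`MSFT;320.4;250));
q)hclose h
q)// define the table schema and upd, then replay
q)trade:([] time:`timespan$(); sym:`symbol$(); price:`float$(); size:`long$())
q)upd:insert
q)replayed:-11!L          / each stored (`upd;`trade;data) chunk now calls upd[`trade;data]
q)count trade
2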
This concludes the definition of .u.upd
. Now that we've defined both .z.ts and .u.upd for the Tickerplant when running in batch mode based on a predefined timer, let's shift our focus to the Tickerplant publishing every incoming record as it arrives.
Tickerplant Tick mode: .z.ts
and .u.upd
As explained previously, if a Tickerplant runs in tick mode, it will publish the incoming data straight to all real-time subscribers. Additionally, a timer will verify every second whether we reached the end of the day or not.
We first verify that no timer was set; if this is true, we know that we should publish all incoming data to all real-time subscribers as soon as it arrives. However, we still have to set a timer to invoke .z.ts
and verify whether we have reached the end of the day or not. We do this by using the system
command, setting the interval to invoke the timer to 1 second.
if[not system "t";system "t 1000";
In the next step, we define .z.ts
for the case we run in tick mode.
.z.ts
.z.ts
is the system function that will be invoked on each timer interval defined with system "t N"
, where N
is the interval in milliseconds. Given that we publish all incoming data as soon as it arrives, the only purpose of .z.ts
is to invoke .u.ts
which verifies whether we are past midnight or not.
.z.ts
will be invoked every N
milliseconds and the current timestamp is passed as its only argument:
q).z.ts:{show x}
q)\t 1000
q)2023.11.11D16:40:30.981221000
2023.11.11D16:40:31.981221000
2023.11.11D16:40:32.981221000
2023.11.11D16:40:33.981221000
\t 0
q).z.p
2023.11.11D16:40:49.621176000
The definition of .z.ts
is hence straightforward: we invoke .u.ts
with the current date .z.D
.
// @param: x (timestamp) - the current timestamp, .z.P
// @return: None
.z.ts:{.u.ts[.z.D]}
.u.upd
Given that there is no timer set on startup and our Tickerplant runs in tick mode, we will publish all incoming data as soon as it is received by the Tickerplant to achieve the lowest latency possible. The following actions are performed by .u.upd
: It first calls .u.ts
to verify whether we have reached the end of the day and whether the end-of-day actions need to be performed. The function then checks if the incoming records contain a timespan as their first column; if not, it prefixes them with the current timespan. Subsequently, it publishes all records to the real-time subscribers, writes them to the Tickerplant Log file, and increases the overall message count, .u.i
, by one.
// @param: t (symbol) - the table name of the data published by the Feedhandler
// @param: x (data) - the data to be published. This can be in form of a single record or a list of records
// @return: None
.u.upd:{[t;x]
.u.ts "d"$a:.z.P;
if[not -16=type first first x;
a:"n"$a;
x:$[0>type first x;a,x;(enlist (count first x)#a),x]];
f:key flip value t;
.u.pub[t;$[0>type first x;enlist f!x;flip f!x]];
if[.u.l;.u.l enlist (`upd;t;x);.u.i+:1];
};
The initial section of the .u.upd
definition mirrors the one we've previously discussed. First, the current timestamp is stored in the local variable a
, which will serve as the Tickerplant timestamp. Then, this timestamp is converted to a date, and .u.ts
is called to confirm whether we've moved past midnight and require the execution of the end-of-day function.
.u.ts "d"$a:.z.P;
Similar to the previous process, we check if the first column of the incoming data is a timespan. If not, the current timespan is prepended to the data. This involves converting the variable a
—holding the present timestamp—to a timespan. Using the conditional operator $
, we distinguish between a single record or a bulk update to adapt the data. A single record receives a singular atom timespan as a prefix, while a bulk update generates a list of timespans, matching the number of received records, and applies it as a prefix.
if[not -16=type first first x;
a:"n"$a;
x:$[0>type first x;a,x;(enlist (count first x)#a),x]];
The final segment of this function distinguishes itself from the previous definition as it immediately publishes the records upon arrival. It involves obtaining the column names of the received data by flipping the target table to a dictionary format. Using the key
operator, we retrieve the dictionary's keys, representing the table's column names. It's essential to note that as t
stores the table name as a symbol, using value
is required to access the content of the table t
, even if it's currently an empty table.
f:key flip value t;
The same result could have been achieved using the cols
operator on the table.
q)trade:([] sym:`symbol$(); price:`float$())
q)key flip value `trade
`sym`price
q)cols value `trade
`sym`price
q)(cols value `trade)~key flip value `trade
1b
Next we publish the data to all real-time subscribers by using the .u.pub
function.
.u.pub[t;$[0>type first x;enlist f!x;flip f!x]];
We first check if we are publishing a single record or a bulk update. As in the previous checks, we evaluate the type of the first element of our data: if 0>type first x
holds, the first element is an atom and we are publishing a single record; otherwise, we are publishing multiple records. Employing the conditional operator $
allows us to take appropriate action for both scenarios. If we're dealing with a single record, we create a dictionary by mapping the column names stored in variable f
to the actual data stored in variable x
, thus creating a dictionary that represents a single row. We then enlist this dictionary, transforming it into a single-row table.
enlist f!x
A KDB/Q table is essentially a list of conforming dictionaries, meaning that if you enlist a dictionary, you obtain a table with a single row. If you flip a column dictionary, a dictionary where each key maps to a list of values of the same length, you obtain a table. On the other hand, if you take only one row of a table, the data will be represented as a dictionary:
q)`sym`price!(`GOOG;200.0)
sym | `GOOG
price| 200f
q)enlist `sym`price!(`GOOG;200.0)
sym price
----------
GOOG 200
q)flip `sym`price!(`GOOG`APPL`MSFT;200.0 145.9 332.34)
sym price
-----------
GOOG 200
APPL 145.9
MSFT 332.34
q)trade:([] sym:`GOOG`APPL`MSF;price:200.0 145.9 332.34)
q)trade
sym price
-----------
GOOG 200
APPL 145.9
MSF 332.34
q)first trade
sym | `GOOG
price| 200f
q)trade[1]
sym | `APPL
price| 145.9
q)last trade
sym | `MSF
price| 332.34
If we are dealing with a bulk update on the other hand, we map the column names to the list of data records, creating a column dictionary. Subsequently, this column dictionary is flipped to form a table.
flip f!x
Finally, the data can be published using .u.pub
, the corresponding table name t
and the updated data records stored in x
.u.pub[t;$[0>type first x;enlist f!x;flip f!x]];
The final step involves inserting the received data into the Tickerplant Log, provided it exists, and updating the overall message count .u.i
by one. As previously explained, we ensure that the handle to the Tickerplant Log exists, and if so, the function appends a parse tree that comprises the upd
function as the initial element, the table name t as the second element, and the data as the third element. Subsequently, the function utilizes enlist to add the records to the Tickerplant Log by writing to its handle .u.l
.
if[.u.l;.u.l enlist (`upd;t;x);.u.i+:1];
We have now defined all necessary functions to operate our lightweight Tickerplant. The final line in the tick.q file calls the .u.tick
function with the name of the file that contains the schema definitions and the designated path to store the Tickerplant Log file.
.u.tick[src;.z.x 1]
We've completed our Tickerplant walkthrough. Next, we'll briefly examine the r.q
file containing the Real-time Database code.
The Real-Time Database (RDB) Code Explained - r.q
The RDB, or Real-time Database, is a crucial component in any KDB/Q Tick setup. It preserves all intraday data in memory, ensuring accessibility for business users and other services requiring intraday data queries. Despite its simplicity, the standard Real-time Database, comprised of just three functions, closely resembles RDBs in more sophisticated and advanced KDB/Q Tick configurations.
Let's delve into the code, dissecting it line by line:
We first verify whether our KDB/Q Tick system is running on a Windows or Unix operating system; if we are running on the latter, we invoke a sleep
system command to pause the process for one second. This allows the system to register our process before establishing any interprocess communication (TCP/IP).
if[not "w"=first string .z.o;system "sleep 1"];
upd
Next we define the update upd
function that is invoked by the Tickerplant whenever data is published to the Real-time Database. In most cases, the upd
function is defined as a simple insert
. Remember, the Tickerplant sends a parse tree, consisting of upd
as the first element (the function to apply), followed by the table name and the actual data as parameters. Defining upd
as insert
will ensure that any new data is added to the end of the respective table.
upd:insert
The examples below should illustrate this behavior:
q)trade:([] time:`timestamp$();sym:`symbol$();price:`float$());
q)quote:([] time:`timestamp$();sym:`symbol$();bid:`float$();ask:`float$());
q)upd:insert
q)// single row update
q)value (`upd;`trade;(.z.P;`GOOG;400.4))
,0
q)trade
time sym price
----------------------------------------
2023.11.13D21:35:46.676306000 GOOG 400.4
q)// multiple row update
q)value (`upd;`trade;(3#.z.P;`GOOG`MSFT`APPL;400.4 134.56 234.5))
1 2 3
q)trade
time sym price
-----------------------------------------
2023.11.13D21:35:46.676306000 GOOG 400.4
2023.11.13D21:36:11.973625000 GOOG 400.4
2023.11.13D21:36:11.973625000 MSFT 134.56
2023.11.13D21:36:11.973625000 APPL 234.5
q)value (`upd;`quote;(3#.z.P;`GOOG`MSFT`APPL;400.4 134.56 234.5;400.5 144.9 235.1))
0 1 2
q)quote
time sym bid ask
-----------------------------------------------
2023.11.13D21:37:06.689310000 GOOG 400.4 400.5
2023.11.13D21:37:06.689310000 MSFT 134.56 144.9
2023.11.13D21:37:06.689310000 APPL 234.5 235.1
We use value
to evaluate the simple parse tree. You can read more about parse trees and how to create and evaluate them here.
We assign the Tickerplant (TP) port and Historical Database (HDB) port provided as startup parameters to the global variable .u.x
. In the case where no ports are provided upon startup, we assign the default ports 5010
and 5012
to the Tickerplant and Historical Database, respectively.
.u.x:.z.x,(count .z.x)_(":5010";":5012");
q test.q 5010 5012 --debug 1b --test hello
KDB+ 4.0 2023.01.20 Copyright (C) 1993-2023 Kx Systems
q).z.x
"5010"
"5012"
"--debug"
"1b"
"--test"
"hello"
q).z.x 0
"5010"
q).z.x 1
"5012"
q).Q.opt .z.x
-debug| "1b"
-test | "hello"
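Applying the same idea to the defaulting expression above, here is a sketch with a hypothetical argument list where only the Tickerplant port was supplied:
q)args:enlist "5015"                      / hypothetical .z.x: only the TP port was passed in
q)args,(count args)_(":5010";":5012")     / the missing HDB port falls back to the default
"5015"
":5012"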
.u.end
.u.end
is the function invoked when the end-of-day process starts. At this point, the Tickerplant sends an asynchronous message to all real-time subscribers and invokes their corresponding end-of-day function .u.end
. As part of the Real-time Database (RDB), it performs the following tasks: it collects the names of all currently defined tables, selecting the subset of tables with the g
grouped attribute applied. It then uses the .Q.hdpf
function to save the current day's data in memory to disk partitions and proceeds to clear the tables in memory, triggers a reload message to the HDB, and lastly, reapplies the grouped attribute g
to the relevant tables.
// @param: x (date) - the current date
// @return: None
.u.end:{[x]
t:tables`.;
t@:where `g=attr each t@\:`sym;
.Q.hdpf[`$":",.u.x 1;`:.;x;`sym];
@[;`sym;`g#] each t;
};
The initial line of code is quite simple; it essentially retrieves the list of tables defined within the current process and stores them in the variable t
.
t:tables`.;
Next we retrieve the subset of tables that have the grouped attribute g
applied and re-assign it to variable t
. We do this by first indexing into the sym
column of each table using the each-left
operator in conjunction with the apply @
operator. For each sym
column we then use the attr
operator to check for applied attributes. This produces a list of attributes, which we can compare to the symbol representing the grouped attribute g
. This comparison generates a boolean mask that marks true (1b)
if a table has the grouped attribute g
applied and false (0b)
if it does not. Using this boolean mask in combination with where
, we index into the list of table names to retrieve only those with the grouped attribute g
applied. Finally, we overwrite the t
variable with this updated list.
t@:where `g=attr each t@\:`sym;
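A small sketch of this line, with one hypothetical table carrying the attribute and one without:
q)trade:([] time:`timespan$(); sym:`g#`symbol$(); price:`float$())
q)quote:([] time:`timespan$(); sym:`symbol$(); bid:`float$())
q)t:`quote`trade
q)attr each t@\:`sym
``g
q)t where `g=attr each t@\:`sym
,`trade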
Using .Q.hdpf
we then save all in-memory tables into a partitioned Historical Database (HDB), clear the in memory tables and instruct the Historical Database (HDB) to reload. .Q.hdpf
takes the following parameters as arguments:
.Q.hdpf[historicalport;directory;partition;`p#field]
In our case this equates to the following:
.Q.hdpf[`$":",.u.x 1;`:.;x;`sym];
where
- historicalport: the port of our HDB process, `$":",.u.x 1
- directory: the root directory of our HDB, the current directory `:.
- partition: the partition to write the data to, x the input/date passed to .u.end
- `p#field: the column to sort for and apply the parted attribute `p on, `sym in this case
Finally, we re-apply the grouped attribute g
to the tables that had the attribute applied before the data purge.
@[;`sym;`g#] each t;
This concludes our end-of-day function, and we can proceed to the last function defined in the r.q
file: the replay function .u.rep.
.u.rep
The last function in our Real-time Database (RDB) is also one of the most crucial ones. .u.rep
is the function responsible for replaying the Tickerplant Log file and is initially invoked when the RDB starts up. It receives a list of pairs consisting of the table names and their corresponding schema as the first parameter. This information is used to initialise all tables with their respective schema. The second parameter is a list consisting of the total number of messages the Tickerplant has received so far as first element and the location of the Tickerplant Log file as second element. This information is then utilized to replay the Tickerplant Log file, ensuring that the RDB reflects the current state of the world. Let's examine the functionality of .u.rep
together:
// @param: x (List) - List of pairs containing table names as first element and their corresponding table schema as second element
// @param: y (List) - List consisting of the total number of elements the Tickerplant has received so far and the Tickerplant Log file location
// @return: None
.u.rep:{
(.[;();:;].)each x;
if[null first y;:()];
-11!y;
system "cd ",1_-10_string first reverse y
};
Before we dive deeper into the functionality of .u.rep
let's have a quick look at the structure of that first parameter x
that the function takes. If you remember, whenever a real-time subscriber subscribes to the Tickerplant by invoking .u.sub
it will receive a list of pairs consisting of table names and their corresponding schema. This structure is illustrated in the code below. If we examine the content as a whole and then break it down into individual elements, we can observe that each element of the list is a pair, with the table name as the first element and the respective empty schema as the second element:
q)x
`quote +`time`sym`bid`ask`bsize`asize!(`timespan$();`g#`symbol$();`float$();`float$();`int$();`int$())
`trade +`time`sym`price`size!(`timespan$();`g#`symbol$();`float$();`int$())
q)x 0
`quote
+`time`sym`bid`ask`bsize`asize!(`timespan$();`g#`symbol$();`float$();`float$();`int$();`int$())
q)x 1
`trade
+`time`sym`price`size!(`timespan$();`g#`symbol$();`float$();`int$())
q)x[0;0]
`quote
q)x[0;1]
time sym bid ask bsize asize
----------------------------
q)x[1;0]
`trade
q)x[1;1]
time sym price size
-------------------
The initial line of code may seem familiar. As previously discussed in the context of .u.ld
, I explained that .[x;();:;y]
is essentially equivalent to the K implementation of the set
operator. This code, combined with the dot-apply .
operator, enables us to initialize each table with its respective table schema.
// Illustration of dot-apply
q){x+y} . (2;3)
5
q)x
`quote +`time`sym`bid`ask`bsize`asize!(`timespan$();`g#`symbol$();`float$();`float$();`int$();`int$())
`trade +`time`sym`price`size!(`timespan$();`g#`symbol$();`float$();`int$())
q)(.[;();:;].) each x
`quote`trade
q)quote
time sym bid ask bsize asize
----------------------------
q)trade
time sym price size
-------------------
After initializing all tables with their corresponding schema, we check whether the number of messages received by the Tickerplant so far is null or not. This is done by examining the first element of the parameter y
. If there were no messages received, and the value is null, we terminate the function early and return an empty list. Otherwise, we proceed to replay the Tickerplant Log file using the system function -11!
and the parameter y
.
Given that y
contains the number of received messages and the path to the Tickerplant Log file, -11!
will replay the first N
messages of the Tickerplant Log file
-11!y;
Finally, we change the current directory to the directory of the Historical Database (HDB) to ensure data is saved to the correct location when the RDB invokes the end-of-day save down and calls .Q.hdpf
. In the plain vanilla Tick setup, the Tickerplant Log and the Historical Database are saved under the same path. Therefore, we can extract the first part of the path and use it to change the current directory.
system "cd ",1_-10_string first reverse y
RDB startup
The final line of code in r.q
is, in fact, the only line that gets executed. Upon startup of the Real-time Database, after initializing all functions with their definitions, it calls .u.rep . (hopen `$":",.u.x 0)"(.u.sub[`;`];`.u `i`L)";
. This establishes a connection to the Tickerplant and sends a synchronous message, invoking the .u.sub
function to subscribe to all available tables for all symbols. Additionally, the Real-time Database accesses the .u
namespace to retrieve .u.i
and .u.L
– the number of received messages (.u.i
) and the path to the Tickerplant Log file (.u.L
). This results in a two-item list, where the first item is a list of pairs containing a table name and an empty table schema. The second item of the list consists of the total number of messages received by the Tickerplant and the path to the Tickerplant Log file.
Observe the elegance of "Q-style" retrieval for .u.i
and .u.L
: Instead of individually retrieving .u.i
and .u.L
, we take advantage of the fact that you can pass multiple indices to a data structure for retrieving their elements. This functionality extends across all data structures, including lists, dictionaries, or tables. Because a namespace (.u
in this case) is represented as a dictionary, this also applies to namespaces. The following example illustrates this concept.
// Namespace
q).u.i:345345
q).u.L:system "pwd"
q).u.L
"/Users/Alexander/repos/testing"
q).u `i`L
345345
,"/Users/Alexander/repos/testing"
// List
q)list:1 2 3 4 5
q)list 0 1 4
1 2 5
// Table
q)table:([] a:`a`b`c; b:1 2 3)
q)table 1 2
a b
---
b 2
c 3
// Dictionary
q)dict:`a`b`c!1 2 3
q)dict `c`a
3 1
While you generally want to keep all communication asynchronous within a KDB/Q Tick setup, the subscription of a real-time subscriber to the Tickerplant must be synchronous. The initialisation of the table schemas as well as the Tickerplant replay is crucial for the real-time subscriber to bring itself up to date with the current state of the world and to be able to receive future updates, which would not be possible without the table schemas.
This two-item list is then passed as parameters to the dyadic function .u.rep
, which proceeds to initialize all tables with their corresponding table schemas. It further checks whether the Tickerplant Log file contains any messages and, if so, replays the Tickerplant Log file to synchronize the real-time subscriber with the current state of the world.
This concludes our comprehensive walkthrough of a complete, vanilla KDB/Q Tick setup, providing you with a thorough understanding of its inner workings. We delved into each process in detail, explaining key KDB/Q concepts along the way.
Limitations of KDB/Q Tick
Although this setup serves as an excellent foundation for capturing real-time data, it is essential to acknowledge some inherent limitations, which will be discussed in the following sections.
Tickerplant doesn't deal with corrupt Tickerplant Log files
**The Problem:**
One of the primary drawbacks of our basic Tickerplant is its inability to address corrupted Tickerplant Log files. Upon reviewing the .u.ld
function, you'll notice that encountering a corrupted Tickerplant Log file leads to the termination of the Tickerplant with an error message. While a corrupt Log file is indeed a critical issue requiring manual attention, there are alternative approaches to handling this situation. While your Tickerplant is down, incoming data is potentially lost.
**Possible Solution:**
One possible solution is to utilize the -2
option of -11!
to identify the count of valid messages in the Tickerplant Log File, replay them, and then generate an alert message to a monitoring service, prompting investigation into the cause of the Log file corruption.
Process location: One host only
**The Problem:**
The architecture of this KDB/Q Tick system mandates the Real-time Database or any other real-time subscriber to replay the Tickerplant Log on startup for recreating the current state of the world and retrieving all messages received by the Tickerplant up to that moment. However, this requires the Tickerplant Log file to be present on the same host or server as the process attempting to replay the log file. As we pass the path of the Tickerplant Log file as a parameter to -11!
, the function used for replaying the log file, this requirement not only dictates that all real-time subscribers must be on the same host, impacting our hardware needs, but it also makes our system less flexible and limits its extensibility.
**Possible Solution:**
A potential solution to this challenge could involve creating an independent process dedicated to replaying the Tickerplant Log file whenever necessary and forwarding the messages to the real-time subscriber that triggered the replay. Another alternative could be the introduction of a Chained Tickerplant (CTP) residing on a secondary host. The CTP would subscribe to the main Tickerplant on the primary host and maintain a local Tickerplant Log file accessible to all real-time subscribers on that specific host. Both solutions require additional design and implementation, and a detailed explanation goes beyond the scope of this post. Nevertheless, they serve as potential ideas for consideration.
Unnecessary serialisation of messages
**The Problem:**
The existing implementation of .u.pub
disseminates all messages received by the Tickerplant to each real-time subscriber separately. In other words, the messages are serialized and dispatched to each subscriber individually. This could potentially increase the latency of our system.
**Possible Solution:**
This problem only has a partial solution, depending on the KDB/Q version your system is running on. In version 3.4 KX introduced asynchronous broadcast -25!
, which serialises outgoing messages only once. If your system is running on version 3.4 or above, you can take advantage of this functionality and reduce CPU and memory load as well as latency.
That's all folks. Happy coding!