The following guide provides step-by-step instructions to get started integrating Kinetica with Kafka. The connector can be integrated as part of a Kafka stack or the Confluent platform.
This project is aimed to make Kafka topics accessible to Kinetica, meaning data can be streamed from a Kinetica table or to a Kinetica table via Kafka Connect. The custom Kafka Source Connector and Kafka Sink Connector do no additional processing, though limited data transformations can be made.
The two connector classes that integrate Kinetica with Kafka are:
com.kinetica.kafka.KineticaSourceConnector
-- A Kafka Source Connector,
which receives a data stream from the database via table monitorcom.kinetica.kafka.KineticaSinkConnector
-- A Kafka Sink Connector,
which receives a data stream from a Kafka Source Connector and writes it to
the databaseThe connector provided in this project assumes launching will be done on a server capable of running the Kinetica Kafka Connectors in standalone mode or in a cluster.
Kafka Connect allows you to configure the Kinetica Kafka Connector exactly the same for a simple Kafka stack as you would for an integrated Confluent platform. The following table is provided for the ease of identifying compatible dependency versions.
Confluent Platform | Apache Kafka | Java |
---|---|---|
3.1.x | 0.10.1.x | 1.7.0_60, 1.8.0_60 |
3.2.x | 0.10.2.x | 1.7.0_60, 1.8.0_60 |
3.3.x | 0.11.0.x | 1.7.0_60, 1.8.0_60 |
4.0.x | 1.0.x | 1.7.0_60, 1.8.0_60 |
4.1.x | 1.1.x | 1.7.0_60, 1.8.0_60 |
5.0.x | 2.0.x | 1.8.0_60 |
5.1.x | 2.1.x | 1.8.0_60 |
The Kinetica Kafka Connector has a kafka.version
property in the
pom.xml
configuration file to set the Kafka version appropriate for the
target environment. The build process will add the Kafka version to the name
of the JAR:
kafka-<kafka.version>-connector-kinetica-7.0.<X>.<Y>-jar-with-dependencies.jar
The Connector code is Java 7 compatible and does not require a separate build to support a Java 8 environment.
Clone and build the project as follows:
git clone https://github.com/kineticadb/kinetica-connector-kafka
cd kinetica-connector-kafka/kafka-connect-kinetica
mvn clean compile package -DskipTests=true
Three JAR files are produced by the Maven build, in
kinetica-connector-kafka/kafka-connect-kinetica/target/
:
kafka-<ver>-connector-kinetica-<ver>.jar
- default JAR (not for use)kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar
- complete connector JARkafka-<ver>-connector-kinetica-<ver>-tests.jar
- tests JAR (see below how to use it to test connectivity)To run JUnit tests as part of build process, make sure that you have a running Kinetica instance, and that its URL and user credentials are properly configured in the following files:
config/quickstart-kinetica-sink.properties
config/quickstart-kinetica-source.properties
Then, run:
mvn clean compile package
The connector JAR built in the previous section is a Kafka Connect plugin. A Kafka Connect plugin is simply a set of JAR files where Kafka Connect can find an implementation of one or more connectors, transforms, and/or converters. Kafka Connect isolates each plugin from one another so that libraries in one plugin are not affected by the libraries in any other plugins.
A Kafka Connect plugin is either:
an uber JAR containing all of the class files for the plugin and its third-party dependencies in a single JAR file:
kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar
a directory on the file system that contains the JAR files for the plugin and its third-party dependencies:
kinetica-connector-kafka/kafka-connect-kinetica/target/kafka-<ver>-connector-kinetica-<ver>-package
|-- etc
|-- kafka-connect-kinetica
|-- quickstart-kinetica-sink.properties
|-- quickstart-kinetica-source.properties
|-- share
|-- doc
|-- kafka-connect-kinetica
|-- licenses
|-- LICENSE.apache2.0.txt
|-- LICENSE.bsd.txt
|-- ...
|-- LICENSE
|-- NOTICE
|-- version.txt
|-- java
|-- kafka-connect-kinetica
|-- ...
|-- kafka-connect-kinetica-<ver>.jar
|-- ...
|-- xz-1.5.jar
To install the Kinetica Kafka Connector at the target server location using a simple Kafka stack, copy the uber JAR into the Kafka libraries folder:
cp target/kafka-<ver>-connector-kinetica-<ver>-jar-with-dependencies.jar <KAFKA_HOME>/libs/
Ensure the configuration in the following files matches the configuration you've tested with your Kafka connector:
<KAFKA_HOME>/config/connect-distributed.properties
<KAFKA_HOME>/config/connect-standalone.properties
Any additional properties files you might need should go in the same folder:
<KAFKA_HOME>/config/
To install the Kinetica Kafka Connector at the target server location using the Confluent platform, copy the contents of the build package folder to the appropriate locations under the Confluent home directory:
mkdir <CONFLUENT_HOME>/share/java/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/share/java/* <CONFLUENT_HOME>/share/java/kafka-connect-kinetica/
mkdir <CONFLUENT_HOME>/etc/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/etc/* <CONFLUENT_HOME>/etc/kafka-connect-kinetica/
mkdir <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica
cp target/kafka-<ver>-connector-kinetica-<ver>-package/share/doc/* <CONFLUENT_HOME>/share/doc/kafka-connect-kinetica/
Important
Following this convention accurately when naming folders and
placing the JAR file and its configuration properties accordingly is
essential to being able to load and start/stop the connector remotely via
REST services. The Unique Connector name in the quickstart properties is
passed to the REST service. The kafka-connect-kinetica
folder name is
treated both as the connector identifier and as a part of the path built on
the fly when the connector is engaged. Starting the folder name with
kafka-connect-<connector name>
is a Confluent convention used for all
Kafka Connect components, such as jdbc, s3, hdfs, and others.
Users can run Kafka Connect in two ways:
In standalone mode, a single process runs all the connectors. It is not fault tolerant. Since it uses only a single process, it is not scalable. Standalone mode can be used for proofs of concept, demonstrations, and testing. It is managed through the CLI.
When testing the connector in standalone mode, the following properties files govern the interaction:
connect-standalone.properties
- used to configure worker tasksquickstart-kinetica.properties
- used to configure the connector itselfTo run the connector, use the following syntax:
Kafka
cd <KAFKA_HOME>
bin/connect-standalone.sh \
config/connect-standalone.properties \
config/quickstart-kinetica.properties
Confluent
cd <CONFLUENT_HOME>
bin/connect-standalone \
etc/kafka/connect-standalone.properties \
etc/kafka-connect-kinetica/quickstart-kinetica.properties
Worker tasks and connector configurations use different sets of configuration parameters. Do not add connector configuration parameters to the worker task configuration, as they would be ignored. Configurations for sink and source connectors differ in the name and number of parameters, although some parameters are common to both.
When you start a connector (sink or source), it should have its own dedicated
port set with the rest.port
parameter. You can't use the same
connect-standalone.properties
file for different connectors running
simultaneously. It is not required, but overall very useful to have separate
configuration files named connect-standalone-sink.properties
and
connect-standalone-source.properties
with preset port values, such as
rest.port=8090
for the sink and rest.port=8089
for the source.
In distributed mode, multiple workers run Kafka Connect and are aware of each others' existence, which can provide fault tolerance and coordination between them during the event of reconfiguration. Distributed mode provides flexibility, scalability, and high availability; thus, it is better suited for production use. It is managed through the REST interface.
Important
See distributed mode for details on using distributed mode for a production deployment.
Kafka Connector configuration sent in REST calls has the same config
properties that are listed in connect-standalone-sink.properties
and
connect-standalone-source.properties
for standalone mode deployments, but
should have the configuration formatted as an application/json
object.
For example, this is the Source connector JSON:
{
"name":"kinetica-source-connector",
"config":{
"name":"kinetica-source-connector",
"connector.class":"com.kinetica.kafka.KineticaSourceConnector",
"tasks.max":"3",
"kinetica.url":"http://localhost:9191",
"kinetica.table_names":"KafkaConnectorTest"
}
}
The value of name
parameter should be consistent, as it is the same for the
outer object (connector) and inner (connector config). It is going to be used as
the connector name as part of the REST endpoint URL.
When testing the connector in distributed mode, use the following syntax to start the Kafka Connect service:
Kafka
cd <KAFKA_HOME>
bin/connect-distributed.sh config/connect-distributed.properties
Confluent
cd <CONFLUENT_HOME>
bin/connect-distributed etc/kafka/connect-distributed.properties
By default, Kafka Connect is listening on port 8083
, and assuming your
bootstrap server IP is 127.0.0.1
, here are examples of the available REST
calls:
Check the available connectors:
curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors
["my-jdbc-source", "my-hdfs-sink"]
Create a new connector (connector object is returned):
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" --data '{"name":"kinetica-source-connector", "config": {"name":"kinetica-source-connector","connector.class":"com.kinetica.kafka.KineticaSourceConnector","tasks.max":"3","kinetica.url":"http://localhost:9191","kinetica.table_names":"KafkaConnectorTest"}}' http://127.0.0.1:8083/connectors
{
"name":"kinetica-source-connector",
"config":{
"name":"kinetica-source-connector",
"connector.class":"com.kinetica.kafka.KineticaSourceConnector",
"tasks.max":"3",
"kinetica.url":"http://localhost:9191",
"kinetica.table_names":"KafkaConnectorTest"
},
"tasks":[],
"type":null
}
Get info on existing connector (connector object is returned):
curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector
{
"name":"kinetica-source-connector",
"config":{
"name":"kinetica-source-connector",
"connector.class":"com.kinetica.kafka.KineticaSourceConnector",
"tasks.max":"3",
"kinetica.url":"http://localhost:9191",
"kinetica.table_names":"KafkaConnectorTest"
},
"tasks":[{"connector":"kinetica-source-connector","task":0}],
"type":"source"
}
Variation of the previous call--get the connectors tasks collection only
curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector/tasks
[
{
"id":{
"connector":"kinetica-source-connector",
"task":0
},
"config":{
"kinetica.timeout":"0",
"kinetica.password":"",
"task.class":"com.kinetica.kafka.KineticaSourceTask",
"kinetica.url":"http://localhost:9191",
"kinetica.kafka_schema_version":"",
"kinetica.table_names":"KafkaConnectorTest",
"kinetica.topic_prefix":"",
"kinetica.username":""
}
}
]
Variation of the previous call--get the connectors config only
curl -X GET -H "Accept: application/json" http://127.0.0.1:8083/connectors/kinetica-source-connector/config
{
"name":"kinetica-source-connector",
"connector.class":"com.kinetica.kafka.KineticaSourceConnector",
"tasks.max":"3",
"kinetica.url":"http://localhost:9191",
"kinetica.table_names":"KafkaConnectorTest"
}
Reconfigure the running connector (would cascade to reconfiguring its tasks).
Ensure that you send only the "config" node from the original JSON connector configuration:
curl -X PUT -H "Accept: application/json" -H "Content-Type: application/json" --data '{"name":"kinetica-source-connector","connector.class":"com.kinetica.kafka.KineticaSourceConnector","tasks.max":"10","kinetica.url":"http://localhost:9191","kinetica.table_names":"KafkaConnectorTest,KafkaConnectorTest2"}' http://127.0.0.1:8083/connectors/kinetica-source-connector/config
{
"name":"kinetica-source-connector",
"config":{
"name":"kinetica-source-connector",
"connector.class":"com.kinetica.kafka.KineticaSourceConnector",
"tasks.max":"10",
"kinetica.url":"http://localhost:9191",
"kinetica.table_names":"KafkaConnectorTest,KafkaConnectorTest2"
},
"tasks":[{"connector":"kinetica-source-connector","task":0}],
"type":"source"
}
Halt the connector's tasks, then delete the connector and its configuration
curl -X DELETE http://127.0.0.1:8083/connectors/kinetica-source-connector
204 No Content
For more considerations on using the Kafka Connect REST API please refer to the Confluent web site.
The KineticaSourceConnector
can be used as-is by Kafka Connect to stream
data from Kinetica into Kafka. The connector will create table monitors to
listen for inserts into a set of tables and
publish the inserted rows to separate topics. A separate Kafka topic will be
created for each database table configured. Data will be streamed in flat
Kafka Connect Struct
format with one field for each table column.
The KineticaSourceConnector
is configured using the
quickstart-kinetica-source.properties
file, which accepts the following
parameters:
Property Name | Required | Description |
---|---|---|
name |
Y | Name for the connector |
connector.class |
Y | Must be com.kinetica.kafka.KineticaSourceConnector |
tasks.max |
Y | Number of threads |
kinetica.url |
Y | The URL of the Kinetica database server |
kinetica.username |
N | Username for authentication |
kinetica.password |
N | Password for authentication |
kinetica.table_names |
Y | A comma-delimited list of names of tables to stream from |
kinetica.topic_prefix |
Y | Token prepended to the name of each topic (see below) |
kinetica.timeout |
N | Timeout in milliseconds (default is no timeout) |
The connector uses kinetica.topic_prefix
to generate the name for the
destination topic from kinetica.table_names
. For example, if
kinetica.topic_prefix
is Tweets.
and an insert is made to table
KafkaConnectorTest
, then it would publish the change to the topic named
Tweets.KafkaConnectorTest
.
The KineticaSinkConnector
can be used as-is by Kafka Connect to stream
data from Kafka into Kinetica. Streamed data must be in a flat
Kafka Connect Struct
that uses only supported data types for fields
(BYTES
, FLOAT64
, FLOAT32
, INT32
, INT64
, and STRING
). No
translation is performed on the data and it is streamed directly into a table.
The target table and collection will be created if they do not exist.
Warning
If the target table does not exist, the connector will create it based on the information available in the Kafka schema. This information is missing potentially performance-impacting column types & properties. If these attributes are important to your application, the table should be created in advance of running the connector, so that these properties can be defined.
The KineticaSinkConnector
is configured using the
quickstart-kinetica-sink.properties
file, which accepts the following
parameters:
Property Name | Required | Description |
---|---|---|
name |
Y | Name for the connector |
connector.class |
Y | Must be com.kinetica.kafka.KineticaSinkConnector |
topics |
Y | Comma-separated list of topics to stream from |
topics.regex |
N | Regular expression used to match against all available topic names to
select topics to stream from; must start with ^ |
tasks.max |
Y | Number of threads |
kinetica.url |
Y | The URL of the Kinetica database server |
kinetica.username |
N | Username for authentication |
kinetica.password |
N | Password for authentication |
kinetica.table_prefix |
N | Prefix for destination tables (see below) |
kinetica.dest_table_override |
N | Override for table name (see below) |
kinetica.collection_name |
Y | Collection to put the table in (default is empty) |
kinetica.batch_size |
N | The number of records to insert at a time (default = 10000 ) |
kinetica.timeout |
N | Timeout in milliseconds (default = 1000 ) |
kinetica.create_table |
N | Automatically create the table, if missing (default = true ) |
kinetica.update_on_existing_pk |
N | When true and the destination table has a primary key, update any
existing records in the destination table with incoming message data
when the primary key value(s) of those records match the corresponding
field values of any incoming messages; otherwise discard any incoming
messages with matching primary key values (default = true ) |
kinetica.allow_schema_evolution |
N | When true , allow schema evolution support for incoming messages;
requires Schema Registry running in the Kafka stack
(default = false ) |
kinetica.single_table_per_topic |
N | When true , put all incoming messages into a single table;
otherwise create a table for each message type (default = false ) |
kinetica.add_new_fields_as_columns |
N | When kinetica.allow_schema_evolution is true and an incoming
message has a new field, create a new column for it in the
destination table (default = false ) |
kinetica.make_missing_field_nullable |
N | When kinetica.allow_schema_evolution is true and an incoming
message does not have a required field, alter the corresponding table
column to be nullable (default = false ) |
kinetica.retry_count |
N | Number of attempts to insert data before task fails (default = 1 ) |
The topics
and topics.regex
parameters are mutually exclusive. Either
the names of Kafka topics to subscribe to are provided explicitly via
topics
, or a regular expression in topics.regex
will be used to filter
all available Kafka topics for matching topic names.
You can use the optional kinetica.table_prefix
parameter to prepend a token
to the table name. This is useful for testing where you have a source
connector reading from multiple tables and you want the sink connector to
write to different tables in the same database.
You can also use the optional kinetica.dest_table_override
parameter to
manually specify a table name not generated from the Kafka schema. When the
topics
parameter has a comma-separated list of topic names,
kinetica.dest_table_override
should be a comma-separated list of table names
corresponding to those topics. When topics are selected by the topics.regex
expression, the kinetica.dest_table_override
parameter is ignored.
If kinetica.single_table_per_topic
is set to true
, a single table is
created with the name of the topic as its name. All incoming data would be
formatted to fit its structure and column types. If the parameter is set to
false
, there may be multiple tables created for a single topic, depending on
the number of unique message schemas read from the topic. The name of the
destination table is then based on the message schema.
If kinetica.allow_schema_evolution
is set to true
, schema versions of
the message objects will be looked up through the Schema Registry. Additional
parameters, kinetica.add_new_fields_as_columns
and
kinetica.make_missing_field_nullable
, allow modification of Kinetica
tables on the fly upon receiving Kafka messages with new or missing fields.
The default value for these three parameters is false
, as mapping schemas
and altering tables are expensive, time-consuming operations. This set of
parameters was added to support Avro Schema Evolution and connecting to the
Schema Registry. If kinetica.allow_schema_evolution
is set to false
,
the connector assumes the object format is not going to change over time and
will not attempt to map field names and types of incoming data to cached
schemas, even if the Schema Registry service is available. Every schema
version other than the version available at the time the connector subscribed to
the topic would be blacklisted and data in that format ignored.
Note
You can find more on schema evolution rules in Apache Avro Specification and in Kafka Schema Evolution.
If you intend to use the Kafka Schema Registry (with or without a possibility
of Schema Evolution), please configure the following parameters in your
connect-standalone.properties
file. The Schema Registry service is
usually set up on the same server as bootstrap-server, port 8081
.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true
Starting with Kafka 2.0.0 (Confluent 5.0.0) you can configure additional error-handling and error-logging options; all are optional:
Property Name | Description |
---|---|
errors.tolerance |
Whether to fail the task upon exceeding retires for any message
(default =
|
errors.retry.timeout |
Total time that a failed operation will be retried, in milliseconds; use
-1 for infinite retries [-1, Long.MAX_VALUE] (default = 0 ) |
errors.retry.delay.max.ms |
Maximum delay between consecutive retries, in milliseconds
[1, Long.MAX_VALUE] (default = 60000 ) |
errors.log.enable |
Whether to enable error context logging, including the message and
failure detail (default = false ) |
errors.log.include.messages |
Whether to log the content of failed messages (default = false ) |
errors.deadletterqueue.topic.name |
Name of Kafka topic to redirect failed messages to; if not given, the dead letter queue will not be used (default behavior) |
errors.deadletterqueue.topic.replication.factor |
Replication factor used for an automatically created dead letter queue
topic [1, Short.MAX_VALUE] (default = 3 ) |
errors.deadletterqueue.context.headers.enable |
Whether to log extra metadata of failed messages (default = false ) |
These options are added to the connector configuration files:
quickstart-kinetica-sink.properties
quickstart-kinetica-source.properties
The errors.deadletterqueue
family of options are valid for the sink connector only.
The datapump utility is used to generate insert activity on tables to
facilitate testing. It will create tables KafkaConnectorTest
and
KafkaConnectorTest2
and insert records at regular intervals.
usage: TestDataPump [options] [URL]
-c,--configFile <path> Relative path to configuration file
-d,--delay-time <seconds> Seconds between batches
-h,--help Show usage
-n,--batch-size <count> Number of records per batch
-t,--total-batches <count> Number of batches to insert
The below example (built for Kafka 2.0.0 and Kinetica 7.0.0.0) runs the datapump with default options on a local Kinetica instance and will insert batches of 10 records every 3 seconds.
java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
com.kinetica.kafka.TestDataPump \
http://localhost:9191
A configuration file can be used to provide the Kinetica DB instance configuration, including URL, username, password, and timeout:
java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
com.kinetica.kafka.TestDataPump \
-c config/quickstart-kinetica-sink.properties
This test will demonstrate the Kinetica Kafka Connector source and sink in standalone mode. The following steps use example files that may require modifications to match the target environment.
Create two configuration files, connect-standalone-source.properties
&
connect-standalone-sink.properties
, in the <KAFKA_HOME>/config
directory with the following content:
# This should point to your Kafka broker
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=5000
# Source port
#rest.port=8089
# Sink port
#rest.port=8090
# needed for Kinetica
key.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
# Disable schemas for internal key/value parameters
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
In the connect-standalone-source.properties
file, uncomment the source
port:
# Source port
rest.port=8089
In the connect-standalone-sink.properties
file, uncomment the sink port:
# Sink port
rest.port=8090
In the same directory, create a configuration file (source.properties
)
for the source connector:
# Connector API required config
name=TwitterSourceConnector
connector.class=com.kinetica.kafka.KineticaSourceConnector
tasks.max=1
# Kinetic specific config
kinetica.url=http://localhost:9191
kinetica.table_names=KafkaConnectorTest,KafkaConnectorTest2
kinetica.timeout=1000
kinetica.topic_prefix=Tweets.
In the same directory, create a configuration file (sink.properties
) for
the sink connector:
name=TwitterSinkConnector
topics=Tweets.KafkaConnectorTest,Tweets.KafkaConnectorTest2
connector.class=com.kinetica.kafka.KineticaSinkConnector
tasks.max=1
# Kinetic specific config
kinetica.url=http://localhost:9191
kinetica.collection_name=TEST
kinetica.table_prefix=out_
kinetica.timeout=1000
kinetica.batch_size=100
The rest of the system test will require four terminal windows:
In terminal 1, start Zookeeper and Kafka:
cd <KAFKA_HOME>
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties
In terminal 2, start the test datapump. This will create the
KafkaConnectorTest
and KafkaConnectorTest2
tables and generate insert
activity:
java -cp kafka-2.0.0-connector-kinetica-7.0.0.0-tests.jar:kafka-2.0.0-connector-kinetica-7.0.0.0-jar-with-dependencies.jar \
com.kinetica.kafka.TestDataPump -c config/sink.properties
In terminal 3, start the Kafka sink connector:
cd <KAFKA_HOME>
bin/connect-standalone.sh config/connect-standalone-sink.properties config/sink.properties
In terminal 4, start the Kafka source connector:
cd <KAFKA_HOME>
bin/connect-standalone.sh config/connect-standalone-source.properties config/source.properties
Verify that data is copied to tables out_KafkaConnectorTest
and
out_KafkaConnectorTest2
.
To test schemaless JSON format, in the connect-standalone-source.properties
& connect-standalone-sink.properties
files, set:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
For more information, please refer to configuration descriptions and test
scenarios in the kinetica-connector-kafka
GitHub project, under
test-connect/README.md.