[Kubernates] with MSK
kafka-topics.sh methods.
access to pod
get kafka use curl
curl -O https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0/bin
kafka-topics --describe --topic <TOPIC> --bootstrap-server <BROKER_ENDPOINT>:9092
port scanning
for port in {1..65535}
do
(echo > /dev/tcp/127.0.0.1/$port) >/dev/null 2>&1 && echo "$port is open"
done
MSK Ping Test
cat << EOF > TestConnection.java
import java.net.Socket;
public class TestConnection {
public static void main(String[] args) {
try (Socket socket = new Socket("your-msk-broker", 9092)) {
System.out.println("Successfully connected to MSK");
} catch (Exception e) {
System.out.println("Failed to connect to MSK: " + e.getMessage());
}
}
}
EOF'
native-image:ol8-java17-22.3.3
check what I got
ls /bin /usr/bin | grep -E 'curl|wget|dnf|yum|microdnf'
confirm your directory
sh-4.4$ pwd
/app
download
curl -o kafka-console-producer.sh https://raw.githubusercontent.com/apache/kafka/2.8/bin/kafka-console-producer.sh
chmod +x kafka-console-producer.sh
curl -O https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar
curl -O https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar
curl -O https://repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.30/slf4j-simple-1.7.30.jar
cat << EOF > producer.properties
bootstrap.servers=<msk-broker-1>:9092,<msk-broker-2>:9092,<msk-broker-3>:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
EOF
echo "Test message" | kubectl exec -i <pod-name> -- bash kafka-console-producer.sh --bootstrap-server <msk-broker-1>:9092,<msk-broker-2>:9092,<msk-broker-3>:9092 --topic test-topic --producer.config producer.properties
First, make sure you have the necessary JAR files:
ls -l *.jar
You should see
kafka-clients-2.8.0.jar
,slf4j-api-1.7.30.jar
, andslf4j-simple-1.7.30.jar
. If not, download them:curl -O https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.0/kafka-clients-2.8.0.jar curl -O https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar curl -O https://repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.30/slf4j-simple-1.7.30.jar
Now, let's create a simple Java program to produce a message:
cat << EOF > KafkaProducerTest.java import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerTest { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<YOUR_CLUSTER_URL>"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); Producer<String, String> producer = new KafkaProducer<>(props); String topic = "test-topic"; String message = "this message sent from POD!!!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); try { RecordMetadata metadata = producer.send(record).get(); System.out.println("Message sent successfully to topic " + metadata.topic() + " partition " + metadata.partition() + " offset " + metadata.offset()); } catch (Exception e) { System.err.println("Error sending message: " + e.getMessage()); e.printStackTrace(); } finally { producer.close(); } } } EOF
Compile the Java program:
javac -cp kafka-clients-2.8.0.jar:slf4j-api-1.7.30.jar:slf4j-simple-1.7.30.jar KafkaProducerTest.java
Run the program:
java -cp .:kafka-clients-2.8.0.jar:slf4j-api-1.7.30.jar:slf4j-simple-1.7.30.jar
Last updated
Was this helpful?