Tuesday, 19 December 2023

Data Integrity Kafka

#!/bin/bash
brokers1=localhost-src:9092
brokers2=localhost-dest:9092
numberofpart=$(kafka-topics.sh --bootstrap-server $brokers1 --describe --topic $1 | grep PartitionCount | awk '{print $6}')
echo ""
echo "Topic $1 has total $numberofpart partition(s)"
echo ""
echo "Now Checking newest msg from every partition"
echo ""
echo "------------------------------------------------------------------------------------------------------------------"
printf "| Partition no |     Src Cluster Hash |      Dest Cluster Hash | Match/No Match | \n"
echo "------------------------------------------------------------------------------------------------------------------"
for (( c=0; c<$numberofpart; c++ ))
do
 srcvalue=$(kcat -q -C -b $brokers1 -t $1 -p $c -o -1 -e | md5sum | sed s/'\s'//g)
 destvalue=$(kcat -q -C -b $brokers2 -t $1 -p $c -o -1 -e | md5sum| sed s/'\s'//g)
 if [ "$srcvalue" = "$destvalue" ]; then
     echo "       $c     $srcvalue     $destvalue        Match"
 else
     echo "       $c     $srcvalue     $destvalue        No Match"
 fi
done
echo ""
echo "Now Checking oldest msg from every partition"
echo ""
echo "------------------------------------------------------------------------------------------------------------------"
printf "| Partition no |     Src Cluster Hash |      Dest Cluster Hash | Match/No Match | \n"
echo "------------------------------------------------------------------------------------------------------------------"
for (( c=0; c<$numberofpart; c++ ))
do
 srcvalue=$(kcat -q -C -b $brokers1 -t $1 -p $c -o beginning -c 1 -e | md5sum | sed s/'\s'//g)
 destvalue=$(kcat -q -C -b $brokers2 -t $1 -p $c -o beginning -c 1 -e | md5sum| sed s/'\s'//g)
 if [ "$srcvalue" = "$destvalue" ]; then
     echo "       $c    $srcvalue     $destvalue        Match"
 else
     echo "       $c    $srcvalue     $destvalue        No Match"
 fi
done
echo ""