Wednesday 21 December 2022

Apache Storm - An Introduction

  Apache Storm is an open-source system for real-time data stream processing. It accepts huge volumes of data arriving at very high rates, possibly from multiple sources, analyses it, and publishes real-time updates to a downstream data store without persisting the data itself. It is designed for parallel execution and is scalable, highly available, and fault-tolerant. It is generally used for real-time analytics, machine learning, and unbounded stream processing. Let's try to understand its basic terminology.

Apache Storm Basics

Topology : The logic for a real-time application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job, with one key difference: a MapReduce job eventually finishes, whereas a topology runs until it is killed. A topology is a graph of spouts and bolts that are connected with stream groupings.

Streams : A stream is the core abstraction in Storm: an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. Think of it as data continuously flowing through a pipeline from one component to the next.

Spout : As the name suggests, a spout is the source of streams in a topology. Spouts generally read tuples (the data stream) from an external source such as a queueing broker like Kestrel, RabbitMQ, or Kafka. Spouts are either reliable or unreliable: a reliable spout replays a tuple if it fails during processing, whereas an unreliable spout emits the tuple and forgets about it. Storm reports the outcome of each emitted tuple back to a reliable spout through its ack and fail methods.
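As an illustration, a minimal reliable spout could look like the sketch below (the class and field names are hypothetical and not part of the example topology later in this post):

public class NumberSpout extends BaseRichSpout {

  private SpoutOutputCollector collector;
  private long current = 0;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void nextTuple() {
    final Long msgId = current++;
    // Emitting with a message id makes the tuple trackable: Storm will call
    // ack(msgId) or fail(msgId) once the tuple tree completes or fails
    collector.emit(new Values(msgId), msgId);
  }

  @Override
  public void ack(Object msgId) {
    // Tuple tree fully processed; safe to forget (or commit) this message id
  }

  @Override
  public void fail(Object msgId) {
    // Tuple tree failed or timed out; a reliable spout would re-emit this message id
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("number"));
  }
}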

Bolts : The processing logic for streaming data is implemented in bolts, for example filtering, functions, aggregations, joins, or talking to databases. The output of one bolt can be the input of another bolt. Tuple transformation logic is written in the bolt's execute method.
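As a minimal sketch, a simple filtering bolt could look like this (again with hypothetical names; it uses BaseBasicBolt, which acks tuples automatically, whereas the full example later in this post uses BaseRichBolt and acks explicitly):

public class FilterBolt extends BaseBasicBolt {

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    final long number = tuple.getLongByField("number");
    // Simple transformation logic: only pass even numbers downstream
    if (number % 2 == 0) {
      collector.emit(new Values(number));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("number"));
  }
}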

Storm Topology Graph Example

Stream groupings : A stream grouping defines how a stream should be partitioned among the tasks of the bolt consuming it. Below are a few important built-in stream groupings:

  • Shuffle : Tuples are distributed to the bolt's tasks in a random fashion so that each task receives a roughly equal share of the load.
  • LocalOrShuffle : If the target bolt has one or more tasks in the same worker process, tuples are shuffled to just those in-process tasks; otherwise this acts like a normal shuffle grouping. Because it avoids inter-worker network transfer, LocalOrShuffle is sometimes more performant than Shuffle.
  • Fields : The stream is partitioned by the fields specified in the grouping, so tuples with the same value for those fields always go to the same task. For example, a customerId field can be used to partition a stream across bolt tasks.
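A sketch of how these groupings are wired when building a topology (reusing the hypothetical NumberSpout and FilterBolt from the sketches above; component ids are illustrative):

final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("number-spout", new NumberSpout(), 2);

// Shuffle : tuples are spread randomly and evenly across the 4 bolt tasks
builder.setBolt("shuffle-bolt", new FilterBolt(), 4)
       .shuffleGrouping("number-spout");

// LocalOrShuffle : prefer bolt tasks in the same worker process, avoiding network transfer
builder.setBolt("local-bolt", new FilterBolt(), 4)
       .localOrShuffleGrouping("number-spout");

// Fields : tuples with the same value of the "number" field always reach the same task
// (in a real topology this would typically be a business key such as customerId)
builder.setBolt("fields-bolt", new FilterBolt(), 4)
       .fieldsGrouping("number-spout", new Fields("number"));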

Apache Storm Code example

So far we have covered some basics of Apache Storm; now let's dive into a working example. We will build a topology that reads source data from Kafka, processes it, and saves the result in Cassandra. For this we need a topology that contains :
  1. Kafka Spout
  2. Processor Bolt
  3. Cassandra Writer Bolt
Topology Graph for Code Example


Kafka packet structure (Input) :
{
   "id" : "18276382",
   "userId" : "12345",
   "tnxId" : "Pscj17293Ahe7292847Uxw"
}

Cassandra table structure (Output) :
{
   "id" : "18276382",
   "user_id" : "12345",
   "tnx_id" : "Pscj17293Ahe7292847Uxw"
}

Kafka Spout consumer :
final KafkaSpoutConfig<String, String> kafkaSpoutConfig = 
        KafkaSpoutConfig.builder("localhost:9092", "ECC_spout_topic")
          .setProp("group.id", "ECC_spout_CONSUMER")
          .setProp("key.deserializer", StringDeserializer.class)
          .setProp("value.deserializer", StringDeserializer.class)
          .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
          .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
          .setMaxUncommittedOffsets(1)
          .setOffsetCommitPeriodMs(2000)
          .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20)
          .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000)
          .build();
final KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

Kafka Spout properties

  • ProcessingGuarantee.AT_LEAST_ONCE : An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once)
  • MaxUncommittedOffsets (max.uncommitted.offsets) : maximum number of polled offsets (records) that can be pending commit before another poll can take place. When this limit is reached, no more offsets can be polled until the next successful commit sets the number of pending offsets below the threshold.
  • OffsetCommitPeriodMs (offset.commit.period.ms) : specifies the period of time (in milliseconds) after which the spout commits to Kafka
  • MAX_POLL_RECORDS_CONFIG : The maximum number of records returned in a single call to poll()
  • MAX_PARTITION_FETCH_BYTES_CONFIG : The maximum amount of data per-partition the server will return

DTO class for Kafka topic :
@AllArgsConstructor
@NoArgsConstructor
@ToString
@Getter
@JsonIgnoreProperties(ignoreUnknown = true)
public class EccProcessorDto {

  private String id;
  private Integer userId;
  private String tnxId;
}

Processor Bolt, which parses the Kafka packet into the custom DTO and forwards it to the Cassandra writer bolt :
public class EccProcessorBolt extends BaseRichBolt {

  private static final long serialVersionUID = 173815L;
  private static final Logger LOGGER = LoggerFactory.getLogger(EccProcessorBolt.class);

  private final ObjectMapper MAPPER = new ObjectMapper();
  private OutputCollector collector;

  @Override
  public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    LOGGER.info("EccProcessorBolt prepare");
    this.collector = outputCollector;
  }

  @Override
  public void execute(Tuple tuple) {
    LOGGER.info("tuple : {}", tuple);
    final String packetData = tuple.getStringByField("value");

    EccProcessorDto eccProcessorDto = null;
    if (!StringUtils.isEmpty(packetData)) {
      try {
        eccProcessorDto = MAPPER.readValue(packetData, EccProcessorDto.class);
      } catch (Exception e) {
        LOGGER.error("error while parsing kafka packet data : {}", packetData, e);
      }
    }

    // Ignore kafka packet for any invalid value and do not emit
    if (eccProcessorDto == null || eccProcessorDto.getId() == null
        || eccProcessorDto.getTnxId() == null || eccProcessorDto.getUserId() == null) {
      this.collector.ack(tuple);
      return;
    }
    
    final List<Object> output = Arrays.asList(eccProcessorDto.getId(), eccProcessorDto.getUserId(), eccProcessorDto.getTnxId());

    // Only valid packets are emitted to the Cassandra output stream
    collector.emit("ECC_OUTPUT_STREAM", output);
    this.collector.ack(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Declare the output stream consumed by the Cassandra writer bolt;
    // field names must match the columns it selects (id, user_id, tnx_id)
    declarer.declareStream("ECC_OUTPUT_STREAM", new Fields("id", "user_id", "tnx_id"));
  }

}

Guaranteeing Message Processing

There are two ways to emit a tuple from a bolt : anchored or unanchored.
Case 1 : unanchored
collector.emit("ECC_OUTPUT_STREAM", output);
Case 2 : anchored
collector.emit("ECC_OUTPUT_STREAM", tuple, output);
In case 2, with anchoring, the spout tuple at the root of the tuple tree will be replayed if the emitted tuple fails to be processed downstream. In case 1, without anchoring, a downstream failure does not trigger any replay: the new tuple is not tied to the original spout tuple, so the spout never learns about the failure.
 
For a better understanding of Guaranteeing Message Processing, read this.
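Note that EccProcessorBolt above emits without anchoring. If a failure in the Cassandra writer bolt should replay the original Kafka record, its execute method could be rewritten along these lines (a sketch; in practice you would still ack-and-drop malformed packets as above, otherwise a bad record would be replayed forever):

@Override
public void execute(Tuple tuple) {
  try {
    final String packetData = tuple.getStringByField("value");
    final EccProcessorDto dto = MAPPER.readValue(packetData, EccProcessorDto.class);
    final List<Object> output = Arrays.asList(dto.getId(), dto.getUserId(), dto.getTnxId());

    // Anchored emit: the output tuple is tied to the incoming Kafka tuple
    collector.emit("ECC_OUTPUT_STREAM", tuple, output);
    collector.ack(tuple);
  } catch (Exception e) {
    // fail() marks the tuple tree as failed; combined with anchoring and
    // AT_LEAST_ONCE, the Kafka spout will eventually replay this record
    collector.fail(tuple);
  }
}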

Writer Bolt for Cassandra :
private static BaseCassandraBolt getBaseCassandraBolt() {

  final PropertiesReader propertiesReader = new PropertiesReader();
  final Map cassandraConfig = propertiesReader.readProperties("cassandra.properties");
  final List<String> attributes = Arrays.asList("id", "user_id", "tnx_id");
  final String queryString = "INSERT INTO ECC.ecc_table (" + StringUtils.join(attributes, ",") + ") values (?,?,?);";
  final List<FieldSelector> selectors = new ArrayList<>();

  for (String colName : attributes) {
    selectors.add(new FieldSelector(colName));
  }

  CqlMapper.SelectableCqlMapper cqlMapper = new CqlMapper.SelectableCqlMapper(selectors);
  final List<String> cassandraColumns = Arrays.asList("id", "user_id", "tnx_id");

  final BaseCassandraBolt cassandraWriterBolt = new CassandraWriterBolt(
              DynamicStatementBuilder.async(DynamicStatementBuilder.simpleQuery(queryString).with(cqlMapper)))
          .withCassandraConfig(cassandraConfig)
          .withResultHandler(new CustomExecutionResultHandler())
          .withStreamOutputFields("ECC_OUTPUT_STREAM", new Fields(cassandraColumns));

  return cassandraWriterBolt;
}

Creating the topology with the spout and bolts :
public static void main(String[] args) {

  // Building topology
  final TopologyBuilder builder = new TopologyBuilder();

  builder.setSpout("ECC_SPOUT_ID", kafkaSpout, 2);  // parallelism_hint = 2
                                                    // NumTasks is automatically set to 2

  builder
    .setBolt("ECC_PROCESSOR_BOLT", new EccProcessorBolt(), 2)
    .localOrShuffleGrouping("ECC_SPOUT_ID")
    .setNumTasks(4);

  builder
    .setBolt("ECC_CASSANDRA_BOLT", getBaseCassandraBolt(), 6)
    .localOrShuffleGrouping("ECC_PROCESSOR_BOLT", "ECC_OUTPUT_STREAM");

  // Topology level configs
  final Config conf = new Config();
  conf.setNumWorkers(2);  // 2 worker processes, matching the parallelism discussion below
  conf.setMessageTimeoutSecs(60);

  // Topology submit command
  try {
    StormSubmitter.submitTopology("CASSANDRA_CONSUMER_TOPOLOGY", conf, builder.createTopology());
    LOGGER.info("CASSANDRA_CONSUMER_TOPOLOGY topology started");
  } catch (Exception e) {
    LOGGER.error("Error in CASSANDRA_CONSUMER_TOPOLOGY", e);
  }
}
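For local testing (instead of submitting with StormSubmitter to a cluster), the same topology can be run in-process with a LocalCluster. A minimal sketch, assuming Storm 2.x where LocalCluster is AutoCloseable (on 1.x you would call cluster.shutdown() instead):

public static void runLocally(Config conf, TopologyBuilder builder) throws Exception {
  try (LocalCluster cluster = new LocalCluster()) {
    cluster.submitTopology("CASSANDRA_CONSUMER_TOPOLOGY_LOCAL", conf, builder.createTopology());
    Thread.sleep(60_000);  // let the topology run for a minute
    cluster.killTopology("CASSANDRA_CONSUMER_TOPOLOGY_LOCAL");
  }
}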

Parallelism in Apache Storm

Let's try to understand the topology-level parallelism parameters used above.
There are three main entities used to run a topology in a Storm cluster:
  • Worker processes
  • Executors (threads)
  • Tasks
Illustration of Worker Process, Executor and Task

As in the above code example (and diagram) :
There are a total of 2 worker processes for this topology.
The Kafka Spout has parallelism_hint = 2 and NumTasks = 2.
The Processor Bolt has parallelism_hint = 2 and NumTasks = 4.
The Cassandra Writer Bolt has parallelism_hint = 6 and NumTasks = 6.

A worker process is a JVM process running on a machine for a Storm topology; each worker process runs executors for exactly one topology. One worker process may run one or more executor threads. An executor thread runs tasks belonging to a specific spout or bolt, and a task is where the actual data processing is performed.
  • setNumWorkers : How many worker processes to create for the topology across machines in the cluster
  • parallelism_hint : the initial number of executor threads for a component (spout/bolt)
  • setNumTasks : How many tasks to create per component. By default, the number of tasks is set to be the same as the number of executors, i.e. Storm will run one task per thread.
Assigning workers, executors, and tasks for a topology is an important part of keeping the topology balanced and properly utilised. Spout/bolt task counts should be kept in proportion to the number of workers so that load is distributed evenly. For example, if a topology has 2 worker processes but only 1 spout task, then only one worker hosts the spout and that single worker receives all incoming packets; parallelism is not achieved and the workers are not load balanced. It is better to use a spout task count of 2, so that each worker runs one spout instance and the load is balanced.
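A sketch of that balanced setup in code (component ids are illustrative, and kafkaSpout is the spout built earlier):

final Config conf = new Config();
conf.setNumWorkers(2);                                   // 2 worker processes

final TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", kafkaSpout, 2);                // 2 executors -> one spout instance per worker

builder.setBolt("processor", new EccProcessorBolt(), 2)  // 2 executors
       .setNumTasks(4)                                   // 4 tasks, i.e. 2 tasks per executor
       .localOrShuffleGrouping("spout");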

For a better understanding of parallelism in Apache Storm, read this.

The full implementation of the above code example is available here.

For debugging Kafka-related issues (such as recovering from offset lag) in a Storm pipeline, refer to the Kafka commands here.
