1. Preparation

Development environment: Windows 7 + Eclipse + JDK 1.7

Deployment environment: Linux + ZooKeeper + Kafka + Hadoop + Spark

2. Concepts

Spark Streaming is an extension of the core Spark API that enables high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, including Kafka, Flume, Kinesis, and TCP sockets, and once ingested it can be processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed results can be pushed out to HDFS, databases, and dashboards. In fact, you can also apply Spark's machine learning and graph processing algorithms to the data streams.

Internally, Spark Streaming receives live input data streams and divides the data into batches, which the Spark engine then processes to produce the result stream, also in batches.

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. A DStream is essentially a sequence of RDDs, and any operation applied to a DStream translates into operations on the underlying RDDs.
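To make the DStream-to-RDD relationship concrete, here is a minimal, self-contained sketch (not part of the order example below, and assuming the Spark 1.x Java API used elsewhere in this post). It builds a DStream from a TCP socket and splits each line into words; the flatMap call on the DStream is translated by Spark into a flatMap on every batch's underlying RDD.

package com.lm.sparkLearning.orderexmaple;
 
import java.util.Arrays;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
public class DStreamSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch");
        // each batch covers 5 seconds of input (interval chosen only for illustration)
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
 
        // a DStream backed by a TCP socket source; internally it is a sequence of RDDs
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
 
        // flatMap on the DStream becomes a flatMap on each batch's RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
 
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
 
        words.print(); // prints the first elements of every batch's RDD
        jssc.start();
        jssc.awaitTermination();
    }
}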

3. Example Requirements

Using Spark Streaming + Kafka, count in real time the total number of orders and the cumulative price of all orders.

4. Implementation

4.1 Order entity: Order

package com.lm.sparkLearning.orderexmaple;
 
import java.io.Serializable;
 
/**
 * Simple order entity
 * @author likuli
 *
 */
public class Order implements Serializable {
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    // product name of the order
    private String name;
    // order price
    private Float price;
 
 
    public Order() {
        super();
    }
 
    public Order(String name, Float price) {
        super();
        this.name = name;
        this.price = price;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Float getPrice() {
        return price;
    }
    public void setPrice(Float price) {
        this.price = price;
    }
    @Override
    public String toString() {
        return "Order [name=" + name + ", price=" + price + "]";
    }
 
}
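A quick way to verify that this entity round-trips through Jackson the way the Kafka producer (next section) and the streaming job expect is the small, standalone sketch below. It is not part of the original project; it only exercises writeValueAsString and readValue on Order.

package com.lm.sparkLearning.orderexmaple;
 
import com.fasterxml.jackson.databind.ObjectMapper;
 
public class OrderJsonCheck {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
 
        // serialize an Order to the JSON string that will be sent to Kafka,
        // e.g. {"name":"name7","price":7.0}
        String json = mapper.writeValueAsString(new Order("name7", 7.0f));
        System.out.println(json);
 
        // deserialize it back, exactly as the Spark Streaming job does
        Order order = mapper.readValue(json, Order.class);
        System.out.println(order); // Order [name=name7, price=7.0]
    }
}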

4.2 Kafka order producer: OrderProducer

The Kafka producer periodically sends a random number of order messages.

package com.lm.sparkLearning.orderexmaple;
 
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.RandomUtils;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
/**
 * Kafka message producer for orders
 * 
 * @author likuli.com
 *
 */
public class OrderProducer {
    private static Logger logger = LoggerFactory.getLogger(OrderProducer.class);
 
    public static void main(String[] args) throws IOException {
        // set up the producer
        Producer<String, String> producer = null;
        ObjectMapper mapper = new ObjectMapper();
 
        try {
 
            Properties props = new Properties();
            // Kafka broker list
            props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);

            // serializer class for message values
            props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
            // serializer class for message keys
            props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
 
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<String, String>(config);
            // list of order messages to publish in each round
            List<KeyedMessage<String, String>> messages = new ArrayList<>();
            // produce a random number of order messages every 10 seconds
            while (true) {
                int random = RandomUtils.getRandomNum(20);
                if (random == 0) {
                    continue;
                }
                messages.clear();
                for (int i = 0; i < random; i++) {
                    int orderRandom = RandomUtils.getRandomNum(random * 10);
                    Order order = new Order("name" + orderRandom, Float.valueOf("" + orderRandom));
                    // order message: topic plus the JSON-serialized order
                    KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                            ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order));
                    messages.add(message);
                }
 
                producer.send(messages);
                logger.warn("orderNum:" + random + ",message:" + messages.toString());
                Thread.sleep(10000);
 
            }
 
        } catch (Exception e) {
            logger.error("producer error", e);
        } finally {
            // avoid a NullPointerException if the producer was never created
            if (producer != null) {
                producer.close();
            }
        }
 
    }
}
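The producer depends on two helper classes, ConstantUtils and RandomUtils, which the post does not show. A plausible minimal reconstruction is sketched below; the broker address, topic name, and offset-reset value are assumptions and need to be adapted to your own cluster.

package com.lm.sparkLearning.utils;
 
// hypothetical reconstruction of the constants used by the example;
// broker list, topic name and offset-reset policy are assumed values
public class ConstantUtils {
    // Kafka broker list of your cluster (assumed address)
    public static final String METADATA_BROKER_LIST_VALUE = "192.168.1.100:9092";
    // String encoder shipped with the old Kafka producer API
    public static final String SERIALIZER_CLASS_VALUE = "kafka.serializer.StringEncoder";
    // topic the order messages are written to (assumed name)
    public static final String ORDER_TOPIC = "order";
    // start from the earliest available offset when no offset is stored
    public static final String AUTO_OFFSET_RESET_VALUE = "smallest";
}

And a matching RandomUtils:

package com.lm.sparkLearning.utils;
 
import java.util.Random;
 
// hypothetical helper returning a random int in [0, max)
public class RandomUtils {
    private static final Random RANDOM = new Random();
 
    public static int getRandomNum(int max) {
        return RANDOM.nextInt(max);
    }
}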

4.3 Real-time order statistics with Spark Streaming + Kafka: OrderSparkStreaming

package com.lm.sparkLearning.orderexmaple;
 
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
 
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.SparkUtils;
 
import kafka.serializer.StringDecoder;
import scala.Tuple2;
 
/**
 * Spark Streaming job that counts orders and sums their total price
 * 
 * @author likuli.com
 *
 */
public class OrderSparkStreaming {
    private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
    private static AtomicLong orderCount = new AtomicLong(0);
    private static AtomicDouble totalPrice = new AtomicDouble(0);
 
    public static void main(String[] args) {
 
        // Create context with a 20-second batch interval (matches Durations.seconds(20) below)
        JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
                "local[2]", null, Durations.seconds(20));
 
        Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
        kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);
 
        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
                topicsSet);
 
        // Jackson mapper for converting JSON messages into Order objects
        final ObjectMapper mapper = new ObjectMapper();
        JavaDStream<Order> orderDStream = orderMsgStream
                .map(new Function<Tuple2<String, String>, Order>() {
                    /**
                     * 
                     */
                    private static final long serialVersionUID = 1L;
 
                    @Override
                    public Order call(Tuple2<String, String> t2) throws Exception {
                        Order order = mapper.readValue(t2._2, Order.class);
                        return order;
                    }
                }).cache();
 
        // process each RDD (one per batch) of the DStream
        orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
            /**
             * 
             */
            private static final long serialVersionUID = 1L;
 
            @Override
            public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
                long count = orderJavaRDD.count();
                if (count > 0) {
                    // accumulate the running total of orders
                    orderCount.addAndGet(count);
                    // map each order in this RDD to its price, producing a new RDD of prices,
                    // then reduce that RDD to the sum of all order prices in this batch
                    Float sumPrice = orderJavaRDD.map(new Function<Order, Float>() {
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
 
                        @Override
                        public Float call(Order order) throws Exception {
                            return order.getPrice();
                        }
                    }).reduce(new Function2<Float, Float, Float>() {
                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;
 
                        @Override
                        public Float call(Float a, Float b) throws Exception {
                            return a + b;
                        }
                    });
                    // add this batch's price sum to the running total across all batches
                    totalPrice.getAndAdd(sumPrice);
 
                    // log the running order count and total price; in production these could be written to a database
                    logger.warn("-------Total order count : " + orderCount.get()
                            + " with total price : " + totalPrice.get());
                }
            }
        });
        orderDStream.print();
 
        jssc.start(); // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
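SparkUtils.getJavaStreamingContext is another helper that the post does not include. Judging from the call above (app name, master, an unused checkpoint-directory argument, and a batch duration), a minimal version could look like the sketch below; treat it as an assumption rather than the author's actual implementation.

package com.lm.sparkLearning.utils;
 
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
 
// hypothetical reconstruction of the context factory used above
public class SparkUtils {
    public static JavaStreamingContext getJavaStreamingContext(String appName, String master,
            String checkpointDir, Duration batchDuration) {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);
        // only enable checkpointing when a directory is supplied (the example passes null)
        if (checkpointDir != null) {
            jssc.checkpoint(checkpointDir);
        }
        return jssc;
    }
}

To try the example end to end: start ZooKeeper and Kafka, create the order topic, run OrderProducer, then run OrderSparkStreaming; each 20-second batch should log the accumulated order count and total price.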


Tags: Spark, Big Data, Kafka
