在SPARK中实现Receiver

  发布于 2023-01-04 11:42

我一直在尝试为SPARK 0.9实现接收器.我使用Jnetpcap库捕获了数据包,需要将它传递给Scala中的spark.用"def receive()"方法编写数据包的捕获部分是否足够?

编辑:以下是此链接中使用Jnetpcap库捕获数据包的代码:

import java.util.Date
import java.lang.StringBuilder
import org.jnetpcap.Pcap
import org.jnetpcap.packet.PcapPacket
import org.jnetpcap.packet.PcapPacketHandler

object PacketCapture1 {
  def main(args: Array[String]){
    val snaplen = 64 * 1024 // Capture all packets, no trucation
    val flags = Pcap.MODE_PROMISCUOUS // capture all packets
    val timeout = 10 * 1000
    //val errbuf = new StringBuilder()

    val jsb = new java.lang.StringBuilder()
    val errbuf = new StringBuilder(jsb);
    val pcap = Pcap.openLive("eth0", snaplen, flags, timeout, errbuf)
    if (pcap == null) {
      println("Error : " + errbuf.toString())
    }
    println(pcap)
    val jpacketHandler = new PcapPacketHandler[String]() {

      def nextPacket(packet: PcapPacket, user: String) {
        println("Received packet at %s caplen=%4d len=%4d %s\n", new Date(packet.getCaptureHeader.timestampInMillis()),
          packet.getCaptureHeader.caplen(), packet.getCaptureHeader.wirelen(), user)
      }
    }
    pcap.loop(30, jpacketHandler, "jNetPcap works!")
    pcap.close()

  }
}

如何使用此代码捕获的数据包实现spark接收器?

1 个回答
  • 您必须创建自定义NetworkReceiver(或Spark 1.0+中的Receiver)并实现该onStart()方法.对于Spark 0.9,请参阅http://spark.apache.org/docs/0.9.1/streaming-custom-receivers.html

    对于spark 1.0(强烈推荐),请参阅http://spark.apache.org/docs/latest/streaming-custom-receivers.html

    2023-01-04 11:44 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有