数据从kafka到hive_天地不仁以万物为刍狗的博客-CSDN博客_kafka到hive


本站和网页 https://blog.csdn.net/tianyeshiye/article/details/92803064 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

数据从kafka到hive_天地不仁以万物为刍狗的博客-CSDN博客_kafka到hive
数据从kafka到hive
天地不仁以万物为刍狗
于 2019-06-19 09:00:16 发布
10721
收藏
14
分类专栏:
gobblin
gobblin
专栏收录该内容
2 篇文章
0 订阅
订阅专栏
背景
公司的系统是一个对外提供服务的接口,每一次调用日志都需要保存到hive中,以便后期做数据分析。每天的调用量在亿级,日志数据量100G以上,在量级还没有这么大的时候,采取的办法比较原始:直接通过log4j打印到日志文件,然后通过抽数工具同步到hive中,每天凌晨同步前一天的数据。随着量级增大,日志文件越来越大,每天抽数就要抽好几个小时,而且偶尔还由于网络问题等原因失败。
方案
日志数据不能直接发送给hive,这样耦合度太强了。既然说到去耦合,肯定是采用消息管道了,kafka由于其与大数据结合的紧密程度,成为不二选择。所以初步方案是先将日志发送到kafka,再通过其他工具从kafka读到hive表中,在遇到峰值时,即便kafka挂了,也不会影响接口服务。  下一步就是如何将数据从kafka读到hive中,kafka的东家LinkedIn给出了解决方案:camus(https://github.com/linkedin/camus)和gobblin(https://github.com/linkedin/gobblin)。camus在2015年已经停止维护了,gobblin是后续产品,camus功能是是gobblin的一个子集,通过执行mapreduce任务实现从kafka读取数据到HDFS,而gobblin是一个通用的数据提取框架,可以将各种来源的数据同步到HDFS上,包括数据库、FTP、KAFKA等。因为只需要同步kafka数据,所以我们采用了实现相对简单的camus。在测试过程中,同步一个小时的数据(5G以上),大概需要2分钟左右,即便日志量翻10倍,也是可以接受的,当然,抽数时间也不会随数据量增大而线性增长。  只差最后一步了,camus只能把数据读到HDFS,从HDFS到hive是通过shell脚本实现的,shell脚本执行load命令直接将数据搬到hive中。
实施
下载camus代码后,直接用maven编译,生成的jar包在camus-example中。源码里面包含一个camus.properties的配置文件,这里说几个重要的配置项:
#数据目标路径,最终取到的数据在HDFS上的位置
etl.destination.path=/user/username/topics
#执行信息存放路径,最重要的是上次读取的kafka的offset
etl.execution.base.path=/user/username/exec
#消息解码类,camus读到的数据是byte[]格式的,可以在自定义类进行反序列化
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.SimpleMessageDecoder
#要读取的topic
kafka.whitelist.topics=
#时区,这个很重要,因为数据存放是按日期的
etl.default.timezone=Asia/Chongqing
将jar包和properties文件放在同一目录下,通过hadoop -jar camus-example-0.1.0-SNAPSHOT-shaded.jar -p camus.properties运行任务。  camus任务执行完毕后再通过脚本将数据load到hive中,脚本内容如下:
date_string=$(date '+%Y/%m/%d/%H') partion=$(date '+%Y-%m-%d_%H')
table_name= service_log_table
filePath="/user/username/topics/hourly/"$date_string"/"
hive<<EOF
create table if not exists $table_name (
mapJson STRING,
desc STRING,
str1 STRING,
str2 STRING,
str3 STRING)
PARTITIONED BY (dt STRING)
STORED AS TEXTFILE;
load data inpath '$filePath' overwrite into table $table_name partition (dt='$partion');
EOF
前面一篇讲到了将数据从kafka读到hdfs使用了开源工具camus,既然用到了开源的代码,免不了研究一下实现过程。所以这里分享一下阅读camus代码了解到的一些细节。
前置知识
在讲camus之前,需要提一下hadoop的一些知识。
关于inputFormat
inputFormat类的原型如下:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,JobConf job,Reporter reporter) throws IOException;
hadoop job调用setInputFormat来设置InputFormat类,通过getSplits函数,hadoop可以将输入分割为InputSplit,每个map分配到InputSplit后,再通过getRecordReader获取reader,逐条读取输入文件中Key-Value。hadoop本身提供了一些InputFormat类的实现,如TextInputFormat,KeyValueTextInputFormat,SequenceFileInputFormat等;  除了使用hadoop本身提供的这些类以外,hadoop允许自定义InputFormat类,只需要实现相应的getSplits函数和RecoderReader类即可。
关于OutputFormat 和 OutputCommitter
OutputFormat类用于写入记录,原型如下:  
public abstract class OutputFormat<K, V> {
//创建一个写入器
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException;
// 检查结果输出的存储空间是否有效
public abstract void checkOutputSpecs(JobContext context) throws IOException, InterruptedException;
//创建一个任务提交器
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
getRecordWriter和checkOutputSpecs都很好理解,前置就是将Mapper或Reducer传来的key-value写入到文件里面,写入的姿势由它来决定。而checkOutputSpecs一般就是用来检查输出规范,判断输出文件是否存在,如果已经存在,则抛出异常。  OutputCommitter稍稍难理解一点,里面定义了这几个方法:
//Job开始被执行之前,框架会调用setupJob()为Job创建一个输出路径
void setupJob(JobContext jobContext);
//如果Job成功完成,框架会调用commitJob()提交Job的输出
void commitJob(JobContext jobContext);
//如果Job失败,框架会调用OutputCommitter.abortJob()撤销Job的输出
void abortJob(JobContext jobContext, JobStatus.State state);
//Task执行前的准备工作,类似setupJob
void setupTask(TaskAttemptContext taskContext);
//Task可能没有输出,也就不需要提交,通过needsTaskCommit()来判断
boolean needsTaskCommit(TaskAttemptContext taskContext);
//Task成功执行,提交输出
void commitTask(TaskAttemptContext taskContext);
//Task执行失败,清理
void abortTask(TaskAttemptContext taskContext);
一般是这样一个过程:Job执行的时候,Task的输出放到Output路径下的_temporary目录的以TaskAttemptID命名的子目录中。只有当 Task成功了,相应的输出才会被提交到Output路径下。而只有当整个Job都成功了,才会在Output路径下放置_SUCCESS文件。 _SUCCESS文件的存在表明了Output路径下的输出信息是正确且完整的;而如果_SUCCESS文件不存在,Output下的信息也依然是正确的 (这已经由commitTask保证了),但是不一定是完整的(可能只包含部分Reduce的输出)。
camus的实现
camus重写了inputFormat和outputFormat类。
重写getSplits
camus是以map-reduce的方式执行的,实际上,它没有设置reduce任务,map完成后,直接将文件写入到了目标目录下。  camus的主要工作都是在InputFormat和OutputFormat中完成的。前面说到过自定义InputFormat,camus正是通过自定义的EtlInputFormat类完成了读取任务的分割。  EtlInputFormat的getSplits函数需要将抽数任务分割为几个独立的splits,而kafka的partition之间的消息没有顺序关系,正好符合这种条件。分割流程如下:  1.获取topic下各个partition的leader信息,得到一个映射关系为LeaderInfo -> List 的map,也就是各个partition的leader连接信息,再加上leader所在的partition列表。关键核心代码如下:
//获取kafka集群TopicMetadata信息,这些信息包括各个partition的leaderId,replicas以及Isr,存在于每一个broker上,所以即便brokerList不完整或者有broker挂了也不会影响。
List<TopicMetadata> topicMetadataList = getKafkaMetadata(context, new ArrayList<String>());
//对于每一个topic下的每一个partition,执行操作
for (TopicMetadata topicMetadata : topicMetadataList) {
for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
//为每个partition的leader创建LeaderInfo对象
LeaderInfo leader =new LeaderInfo(new URI("tcp://" + partitionMetadata.leader().getConnectionString()),partitionMetadata.leader().id());
//下面这一段,就是将leader相同的partition放在一起
//最终得到Map<LeaderInfo,TopicAndPartition>
if (offsetRequestInfo.containsKey(leader)) {
ArrayList<TopicAndPartition> topicAndPartitions = offsetRequestInfo.get(leader);
topicAndPartitions.add(new TopicAndPartition(topicMetadata.topic(),partitionMetadata.partitionId()));
offsetRequestInfo.put(leader, topicAndPartitions);
} else {
//和上面一样,只是在offsetRequestInfo中没有leader时有个新建List的操作
.....
2.根据LeaderInfo -> List的映射关系,向每个partition的leader请求earliest offset和lastest offset。kafka的每个broker上的log会定期删除,earliest offset就是leader上保存的最早的log偏移量,而lastest offset则是最新log偏移量。在这之后,camus还会读取上次执行信息,其中包含上次任务每个partition读取完成的offset,作为本次任务读取的起始点。上次执行执行信息保存在配置项etl.execution.base.path对应的路径下。根据每个partition的earliest offset、lastest offset、offset组装成List,即请求列表。  CamusRequest是一个接口,主要信息如下:
//获取当前请求的topic
public abstract String getTopic();
//获取当前请求的URI
public abstract URI getURI();
//获取当前请求的partition
public abstract int getPartition();
//获取请求的起始offset
public abstract long getOffset();
public abstract long getEarliestOffset();
//获取partition的最大偏移量
public abstract long getLastOffset();
public abstract long estimateDataSize();
有了这些信息,就可以开始分派request到各个mapper了。  3.默认使用BaseAllocator分派request,其中的关键函数如下:
@Override
public List<InputSplit> allocateWork(List<CamusRequest> requests, JobContext context) throws IOException {
//获取配置项中mapred.map.tasks的map数目,一个mapper可以执行多个partition的抽数任务
int numTasks = context.getConfiguration().getInt("mapred.map.tasks", 30);
//按数据量由大到小给每个partition的request排序
reverseSortRequests(requests);
//初始化splists,这时候每个splits里面的request数目为0
List<InputSplit> kafkaETLSplits = new ArrayList<InputSplit>();
for (int i = 0; i < numTasks; i++) {
if (requests.size() > 0) {
kafkaETLSplits.add(new EtlSplit());
//分配request到各个splits,算法很简单,当前哪个splits的数据量最少,就给那个splits添加request,因为是按由大到小排序的,所以任务比较均匀
for (CamusRequest r : requests) {
getSmallestMultiSplit(kafkaETLSplits).addRequest(r);
return kafkaETLSplits;
allocateWork函数返回的就是List类型,也就是EtlInputFormat.getSplits的返回值。
从InputSplit中读取message
每个InputSplit中可能包含多个CamusRequest,每个CamusRequest是不同partition的读数请求,其中包含Topic,Partition,leader URI,offset,lastest offset等信息。前面说到过,InputFormat需要重写createRecordReader函数,也就是定义如何从split中读取key-value。  EtlInputFormat.createRecordReader返回一个EtlRecordReader实例,由该实例读取kafka message发送给mapper。  RecordReader需要重写函数boolean nextKeyValue(),在数据读完之前,该函数一直返回true。  EtlRecordReader.nextKeyValue 核心代码(有删减)如下:
while (true) {
//reader为KafkaReader类型,该类专门负责逐条读取kafka消息
if (reader == null || !reader.hasNext()) {
//从InputSplit中弹出request
EtlRequest request = (EtlRequest) split.popRequest();
//传给mapper的key,这里是类成员变量,后面直接由getCurrentKey()返回
key.set(request.getTopic(), request.getLeaderId(), request.getPartition(), request.getOffset(),request.getOffset(), 0);
//在reader为null,或者一个partition中的数据已经读完的情况下,新建reader,reader负责从kafka中读取数据
reader =new KafkaReader(inputFormat, context, request, CamusJob.getKafkaTimeoutValue(mapperContext),CamusJob.getKafkaBufferSize(mapperContext));
//创建消息decoder,由配置camus.message.decoder.class指定,从kafka中读取出的为byte[]类型,decoder类负责将字节转换为最终在hdfs上存放的格式
decoder = createDecoder(request.getTopic());
while ((message = reader.getNext(key)) != null) {
//用decoder解码消息,封装在CamusWrapper中
CamusWrapper wrapper = getWrappedRecord(message);
value = wrapper;
return true;
EtlRecordReader.getCurrentKey和EtlRecordReader.getCurrentValue直接返回nextKeyValue中的key,value。Mapper正是直接将这两个函数的返回值作为入参。至此,InputFormat的工作就做完了。Mapper收到key-value后什么也没做,直接调用context.write(key, val)写入,因为没有reducer,所以结果会直接由OutputFormat输出。
OutputFormat输出message
EtlMultiOutputFormat重写了OutputFormat的getRecordWriter和getOutputCommitter函数。getRecordWriter返回一个EtlMultiOutputRecordWriter类的实例,它负责将Task的输出放到Output路径下,而由getOutputCommitter获取的EtlMultiOutputCommitter类实例在Task完成后将输出拷贝到最终目的地。  EtlMultiOutputRecordWriter.write(EtlKey key, Object val)负责执行写入key-value工作,核心代码如下:
//key中包含的是关于kafka的信息,如当前消息的offset,topic,parition等
//val中是具体的message,也就是前面从kafka 消息的bytes[]中获取的
public void write(EtlKey key, Object val) throws IOException, InterruptedException {
//当前message的key是否和前面message的一致,因为一个split可能包含不同paritition的数据,也就是一个mapper就可能处理不同partition
if (!key.getTopic().equals(currentTopic)) {
//如果topic不一致,则需要清除writer,重新创建新的topic的,之所有有很多writer,是因为topic下会有多个partition,不同partition写入到不同的文件中
for (RecordWriter<IEtlKey, CamusWrapper> writer : dataWriters.values()) {
writer.close(context);
dataWriters.clear();
//更新当前topic
currentTopic = key.getTopic();
//将当前的key的offset存入到 (topic+partition)-> offset 的map中,因为同一partition消息是顺序的,所以最后存入的肯定是offset最大的消息,committer会将这个map写入到执行记录目录中,下一次运行时以这个offset为起点
committer.addCounts(key);
CamusWrapper value = (CamusWrapper) val;
//获取写入文件名
String workingFileName =EtlMultiOutputFormat.getWorkingFileName(context, key);
//如果现有的writer中不包含对应文件的,则新建一个
if (!dataWriters.containsKey(workingFileName)) {
dataWriters.put(workingFileName, getDataRecordWriter(context,workingFileName, value));
log.info("Writing to data file: " + workingFileName);
//将key-value写入到Taks的输出目录下临时文件中
dataWriters.get(workingFileName).write(key, value);
大部分的信息都在注释中写了,这个理解起来也简单,就是将key-value写到临时文件里,同时把每个key的offset按partition存放起来,在任务成功时写到文件里。  EtlMultiOutputCommitter只实现了commitTask函数,该函数在任务(不是整个Job)完成后被调用。前面已经将输入写到输出目录下的临时文件里了,commitTask要做的就是将文件拷贝到配置etl.destination.path对应的路径下,拷贝完后再将 write函数中写入的(topic+partition)-> offset 键值对写入到etl.execution.base.path目录下,下次任务执行时,会从这个目录下读取offset,作为起始offset。如果某个Task失败,offset也不会写入,下次读取时还是会重新读一遍,而其他Task不受影响。  还是贴一部分核心代码:  
//获取配置etl.destination.path对应的目录,作为最终输出目录
Path baseOutDir = EtlMultiOutputFormat.getDestinationPath(context);
//这里fs是前面write函数写入的目录,也就是Task的output路径
//列举其中的文件,数据文件的文件名都是以data开头的
for (FileStatus f : fs.listStatus(workPath)) {
String file = f.getPath().getName();
if (file.startsWith("data")) {
//数据文件名中包含topic,partition,offset,time等信息
//根据文件名,消息计数等信息获得最终的输出路径和输出文件名
//输出路径是以小时分割的,basePath/year/month/day/hour/filename
String partitionedFile =
getPartitionedPath(context, file, count.getEventCount(), count.getLastKey().getOffset());
//如果路径不存在,创建路径
Path dest = new Path(baseOutDir, partitionedFile);
if (!fs.exists(dest.getParent())) {
mkdirs(fs, dest.getParent());
//拷贝文件,从Task输出目录下拷贝到目标文件
commitFile(context, f.getPath(), dest);
//后面还有一段写入offset的,只有写入数据成功后才会写入offset
转:http://www.aboutyun.com/thread-20701-1-1.html
天地不仁以万物为刍狗
关注
关注
点赞
14
收藏
评论
数据从kafka到hive
背景公司的系统是一个对外提供服务的接口,每一次调用日志都需要保存到hive中,以便后期做数据分析。每天的调用量在亿级,日志数据量100G以上,在量级还没有这么大的时候,采取的办法比较原始:直接通过log4j打印到日志文件,然后通过抽数工具同步到hive中,每天凌晨同步前一天的数据。随着量级增大,日志文件越来越大,每天抽数就要抽好几个小时,而且偶尔还由于网络问题等原因失败。方案日志数据不...
复制链接
扫一扫
专栏目录
Flink1.11.0读取kafka数据动态写入hive中(更新-解决hive查询不到数据问题)
m0_37592814的博客
08-16
7752
一、主要流程
flink 1.11.0 hive 2.3.4 kafka 2.11 hadoop 2.7.2 scala 2.1.11
流批混合,读取kafka 数据量写入到hive中
二、主要步骤
1.在flink sql 客户端中创建hive 分区表
flink sql 客户端配置在上一篇文章https://blog.csdn.net/m0_37592814/article/details/108038823
建表语句
use wm;
CREATE TABLE ods_...
flink 消费Kafka写到hive
最新发布
wjj108的博客
11-12
542
flink消费Kafka写到hive,个人笔记
参与评论
您还未登录,请先
登录
后发表或查看评论
日志数据从kafka到hive是如何实现的
大数据技术杂谈
01-11
8387
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20701
背景
公司的系统是一个对外提供服务的接口,每一次调用日志都需要保存到hive中,以便后期做数据分析。每天的调用量在亿级,日志数据量100G以上,在量级还没有这么大的时候,采取的办法比较原始:直接通过log4j打印到日志文件,然后通过抽数工具同步到hive中,每天凌晨同步前一
flume接kafka数据入hive(亲测好用)
song_quan_的博客
12-17
947
0x01 需求背景
将Kafka中的JSON数据持久化存储到Hive表中,以供后期有查找的需求。
(看了很多讲解的博文,出了各种bug!饶了很多弯路!总结出来的经验就是一定要仔细看Flume的官方文档!!!!!!)
Kafka中的数据示例:
>{"id":1,"name":"snowty","age":25}
Hive表示例:
hive> desc hivetable;
OK
id int ...
spark把kafka数据写到hive
Sivan
06-05
1825
写入分区表:
准备工作:先建好分区表
方法一:(使用dataframe)
写数据到数据所在的位置,因为hive分区的本质就是分文件夹,先用spark把数据写到文件夹位置,然后执行sql添加分区
1.写数据到文件夹
//df为DataFrame
df.write.mode(SaveMode.Overwrite).format("parquet")
.partitionBy("day" , "dev_platform" ).save(outputPath)
2.寻找刚刚数据新建的.
2020-12-26
wubaoyu123的博客
12-26
218
博客园Logo
首页
新闻
博问
专区
闪存
班级
代码改变世界
搜索
注册
登录
返回主页 哥不是小萝莉
博客园 首页 新随笔 联系 订阅 管理随笔 - 145 文章 - 0 评论 - 425
Kafka数据每5分钟同步到Hive
1.概述
最近有同学留言咨询Kafka数据落地到Hive的一些问题,今天笔者将为大家来介绍一种除Flink流批一体以外的方式(流批一体下次再单独写一篇给大家分享)。
2.内容
首先,我们简单来描述一下数据场景,比如有这样一个数据场景,有一批实时流数据实时写入Kafka,然后
Flink读取Kafka数据写入Hive
weixin_40278610的博客
03-14
5837
本文针对数据库CDC(change data capture)场景设计,探讨基于Flink1.12最新版本提供的实时写入Hive的技术可行性,下面为本地IDEA程序案例可供参考。
kafka到hive的解决方案
执着ASP.NET2.0
04-10
983
对于那些想要把数据快速摄取到Hadoop中的企业来讲,Kafka是一个很好的选择。Kafka是什么?Kafka是一个分布式、可伸缩、可信赖的消息传递系统,利用发布-订阅模型来集成应用程序/数据流。同时,Kafka还是Hadoop技术堆栈中的关键组件,能够很好地支持实时数据分析或者货币化的物联网数据。
下面就图解Kafka是如何把数据流从RDBMS(关系数据库管理系统)导入Hive,同时借助一个实...
1.30.Flink SQL案例将Kafka数据写入hive
涂作权的博客
07-05
1203
1.30.Flink SQL案例将Kafka数据写入hive
1.30.1.1.场景,环境,配置准备
1.30.1.2.案例代码
1.30.1.2.1.编写pom.xml文件
1.30.1.2.2.Maven工程resources下编写配置文件log4j2.properties
1.30.1.2.3.Maven工程resources下编写配置文件logback.xml
1.30.1.2.4.Maven工程resources下编写配置文件project-config-test.properties
1.30.
sparkStreaming读取kafka写入hive分区表
W_Little_lion的博客
09-20
808
sparkStreaming读取kafka写入hive分区表
使用版本: hadoop-3.1.3,hive-3.1.2。
开始这个spark不是很熟悉,但是项目要用到,这就要临阵磨枪了。开始写入hive的时候一直在报一个错误,的不是很懂,就是说我没有 .enableHiveSupport() 我就很蒙我明明有用到,但是他就是说我没用到。
不多说了上代码。
def test:Unit={
//设置用户名
System.setProperty("HADOOP_USER_NAME", "root")
写配置信息
Hive理论知识汇总
zzll_forever的博客
10-28
788
Hive理论知识汇总
1、 Hive和数据库比较
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
3)执行延迟
Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive支持很大规模的数据计算
【SDC】StreamSets实战之路-22-实战篇- 如何使用StreamSets实时采集Kafka数据并写入Hive表
菜鸟叔叔的博客
02-21
446
本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Hive,StreamSets的流程处理如下:
flink1.13读kafka写入hive
qq_36062467的博客
07-29
2522
flink1.13读kafka写入hive
第一次写博客,flink自学了一下,网上查找案例,群里问大神,勉强把kafka写hive的流程走通了,在此记录一下,给后来者做个参考 ,其中有考虑不周或者错误的地方,请大家多多包涵和指正,嘻嘻。
代码
package cn.bywin
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api
FlinkSQL消费Kafka写入Hive表
qq_32068809的博客
10-13
900
flinkSQL消费kafka实时写入hive表
Flink实战之Kafka To Hive
weixin_41608066的博客
11-24
1529
背景
传统的入库任务一般借助于MapReduce或者Spark来写hive表,一般都是天级别最多小时级别的任务。随着实时性要求越来越高,传统的入库不太能满足需求。Flink完全基于流式处理,同时也支持了写Hive表。本文介绍一下如果通过FlinkSQL实现kafka数据入库hive,并能够实时可查。
Hive Catalog
由于写hive表必须基于hive catalog,所以需要注册hive catalog。同时可以在一个job内切换catalog,如果我们不想把kafka的source table注册
hive数据迁移到clickhouse+kafka数据写入clickhouse
张小小凡
12-16
3211
hive数据迁移到clickhouse
文章目录hive数据迁移到clickhouse1.使用命令导入2.使用waterdropkafka数据写入clickhouse
1.使用命令导入
#1.hive数据导出成csv格式
hive -e "select id,name,age from xxx" | tr "\t" "," > /otp/data/test.csv
#2.导入clickhouse
clickhouse-client -h hadoop102 --query='INSERT INTO
python 读取kafka 写hive_使用 spark 从 kafka 消费数据写入 hive 动态分区表(一)
weixin_39806413的博客
12-19
465
使用 spark 从 kafka 消费数据写入 hive 动态分区表最近因为业务需求,需要把 kafka 内的数据写入 hive 动态分区表,进入 kafka 的数据是保证不会重复的,同一条业务数据仅会进入 kafka 一次。这就保证数据到了 hive 基本不会发生 update 操作,可以对 hive 进行统计生成静态表的形式将统计数据写入 MySQL。咱也不说那么多废话了,开整。直接写入从 k...
数仓工具—Hive集成篇之Kafka(03)
06-12
7399
这个方案很多,随便举几个例子,但是在此之前建议你先阅读优化实战篇—UDAF批量调用外部请求(02) 在这篇文章中我们实现了在UDAF 中实现了多线程。不过今天我们介绍两种我们我们不需要引入其他组件就可以搞定的方案下面是我们的代码,主要是UDAF 进行批量数据发送
下面是我们的使用,这样我们就记录下了每一批数据的发送情况
这里需要处理的就是我们如何保证每个批次的大小,其实这个很简单,可以参考我们以前的文章,这里需要注意的是,我们一个批次不应该太大,如果太大重试成本就很高了。要实现KafkaStorageHan
数据从kafka到hive(1)
热门推荐
c395318621的专栏
09-07
1万+
数据从kafka到hive(1)背景公司的系统是一个对外提供服务的接口,每一次调用日志都需要保存到hive中,以便后期做数据分析。每天的调用量在亿级,日志数据量100G以上,在量级还没有这么大的时候,采取的办法比较原始:直接通过log4j打印到日志文件,然后通过抽数工具同步到hive中,每天凌晨同步前一天的数据。随着量级增大,日志文件越来越大,每天抽数就要抽好几个小时,而且偶尔还由于网络问题等原因失
Flume案例分享01(Kafka -> Hive)
thankstonica的博客
09-24
499
常用ETL组合,Flume数据采集,Kafka Source 和 Hive Sink
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:编程工作室
设计师:CSDN官方博客
返回首页
天地不仁以万物为刍狗
CSDN认证博客专家
CSDN认证企业博客
码龄18年
暂无认证
137
原创
3万+
周排名
60万+
总排名
61万+
访问
等级
6893
积分
171
粉丝
169
获赞
69
评论
810
收藏
私信
关注
热门文章
经验笔记 - java中常见的异常 java.lang.ClassCastException
66530
Redis - Redis 哈希槽的概念
30290
Kylin - 框架介绍
21998
数据中台与数据仓库的区别?
19392
什么是MPP数据库?
17705
分类专栏
Big Data
5篇
AWS大数据
4篇
阿里云大数据
8篇
java
1篇
数据治理
3篇
简单至极
1篇
linux 环境
2篇
环境
2篇
K8S
3篇
数仓
1篇
yml
1篇
docker
2篇
devops
1篇
Spark
68篇
BlockChain
2篇
Azure
1篇
tool
Apache Nifi
Java及Java相关源码分析
区块链
6篇
Go
3篇
java及相关源码分析
14篇
系统架构
14篇
Spring
6篇
概念
38篇
并发编程
8篇
环境构筑
16篇
编程思想
23篇
工具
3篇
Spark
88篇
Kafka
25篇
Redis
6篇
Hbase
18篇
linux
13篇
Book
ElasticSearch
4篇
Big Data 每日一题
104篇
分布式 与 高并发
4篇
spark 源码解读
2篇
AI 及 相关
1篇
通讯协议
Zookeeper
Nginx
1篇
微服务
2篇
原理
4篇
分布式解决方案
4篇
系统级总结
7篇
NIO
1篇
HDFS
4篇
Swift(OpenStack)
8篇
JVM
12篇
Log
2篇
java 面试
28篇
Openstack
3篇
Mark 问题
3篇
经验
31篇
待续
1篇
Hadoop
7篇
ambari
3篇
方法论
2篇
翻译
2篇
疑问
1篇
性能调优
6篇
工具类代码
3篇
hive
4篇
IBM DEV
1篇
重点
1篇
yarn
1篇
电影推荐
协议
1篇
Issue Mark
1篇
Calcite
3篇
5篇
SpringCloud
1篇
Kylin
2篇
大数据分析
2篇
时序数据库
33篇
思想
3篇
antrl
1篇
maven
3篇
Apache
2篇
PAAS
1篇
OpenTSDB
39篇
Ganglia
1篇
jmxtrans
1篇
Grafana
3篇
HUE
NiFi
2篇
gobblin
2篇
解决方案
7篇
Oozie
1篇
代实施
Flume
4篇
Gradle
1篇
经典
3篇
Structured
2篇
Apache Camel
1篇
Prometheus
1篇
git
1篇
数据中台
4篇
Azkaban
1篇
sqoop
4篇
slf4j
2篇
Phoenix
ETL
1篇
最新评论
OpenTSDB分布式集群安装
wawuwuwuwu:
那其实这个集群也是没有高可用的,如果写入的那台opentsdb服务挂了,那么其实整个集群就无法写入数据了。
java面试 : JAVA中 常量的储存位置
我不是秃神:
常量保存在常量池中,这点可以通过字节码可以看出。常量池在JVM加载后存到 运行时常量池,运行时常量池保存在方法区中。
而至于 不同JDK仅针对静态变量和 StringTable 做出了改变,至于运行时常量池并未做改变。
什么是MPP数据库?
Adataer:
第二代是MPP分析型数据库,包括Greenplum和Teradata等,仍然保持跟传统事务型数据库一样优秀的SQL兼容性,虽然MPP数据库的存储和计算没有完全分离,但凭这样的架构已经能扩展至上百个节点。MPP架构跟传统事务型数据库一样,对云的支持并不友好。第三代是SQL-on-Hadoop架构,代表产品包括SparkSQL和Cloudera,集群规模可以达到上千个节点。并且对云有一定的支持。但是跟传统的MPP相比,在性能和SQL兼容性上都不尽如人意。
推荐一个数据库 偶数科技的 OushuDB,它有很强的性能优势,领先的 SIMD 性能优化技术,相比MPP和SQL-on-Hadoop快一个数量级。全新设计的执行器让性能提升5~10倍,显著降低批处理和即席查询所需的时间。
经验 - javassist【动态改字节码】 javassist.CannotCompileException: by java.lang.LinkageError
初学者1191:
楼主解决这个问题了吗,我也碰到了
java面试 : JAVA中 常量的储存位置
coder2104:
字符串常量池和静态变量确实在堆中,运行时常量池在方法区,使用元空间实现
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
【华为云技术分享】快速理解spark-on-k8s中的external-shuffle-service
什么是MPP数据库?
2020-10-20
2021年2篇
2020年19篇
2019年295篇
2018年254篇
2017年9篇
目录
目录
分类专栏
Big Data
5篇
AWS大数据
4篇
阿里云大数据
8篇
java
1篇
数据治理
3篇
简单至极
1篇
linux 环境
2篇
环境
2篇
K8S
3篇
数仓
1篇
yml
1篇
docker
2篇
devops
1篇
Spark
68篇
BlockChain
2篇
Azure
1篇
tool
Apache Nifi
Java及Java相关源码分析
区块链
6篇
Go
3篇
java及相关源码分析
14篇
系统架构
14篇
Spring
6篇
概念
38篇
并发编程
8篇
环境构筑
16篇
编程思想
23篇
工具
3篇
Spark
88篇
Kafka
25篇
Redis
6篇
Hbase
18篇
linux
13篇
Book
ElasticSearch
4篇
Big Data 每日一题
104篇
分布式 与 高并发
4篇
spark 源码解读
2篇
AI 及 相关
1篇
通讯协议
Zookeeper
Nginx
1篇
微服务
2篇
原理
4篇
分布式解决方案
4篇
系统级总结
7篇
NIO
1篇
HDFS
4篇
Swift(OpenStack)
8篇
JVM
12篇
Log
2篇
java 面试
28篇
Openstack
3篇
Mark 问题
3篇
经验
31篇
待续
1篇
Hadoop
7篇
ambari
3篇
方法论
2篇
翻译
2篇
疑问
1篇
性能调优
6篇
工具类代码
3篇
hive
4篇
IBM DEV
1篇
重点
1篇
yarn
1篇
电影推荐
协议
1篇
Issue Mark
1篇
Calcite
3篇
5篇
SpringCloud
1篇
Kylin
2篇
大数据分析
2篇
时序数据库
33篇
思想
3篇
antrl
1篇
maven
3篇
Apache
2篇
PAAS
1篇
OpenTSDB
39篇
Ganglia
1篇
jmxtrans
1篇
Grafana
3篇
HUE
NiFi
2篇
gobblin
2篇
解决方案
7篇
Oozie
1篇
代实施
Flume
4篇
Gradle
1篇
经典
3篇
Structured
2篇
Apache Camel
1篇
Prometheus
1篇
git
1篇
数据中台
4篇
Azkaban
1篇
sqoop
4篇
slf4j
2篇
Phoenix
ETL
1篇
目录
评论
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值