使用Solrj管理Solr索引

Solrj是Solr搜索服务器的一个比较基础的客户端工具,可以非常方便地与Solr搜索服务器进行交互,最基本的功能就是管理Solr索引,包 括添加、更新、删除和查询等。对于一些比较基础的应用,用Solj基本够用,而且你可以非常容易地通过使用Solrj的API实现与Solr搜索服务器进 行交互,实现对Solr的基本管理功能。如果你的应用比较复杂,可以扩展Solrj来满足需要。

下面是一个使用Solrj的API实现与Solr服务器交互的工具类SolrPostServer,能够实现索引的添加、更新、删除和查询功能。SolrPostServer类中两个内部类是与访问MongoDB的配置和工具。

在实际应用中,对于是否进行commit,可以有两种方式:

  • 一种是直接在客户端进行计算,亦即,进行索引时计算添加的文档数,满足设置的值则进行手动commit,这种方式比较灵活,你可以根据搜索服务器的运行状况选择合理的commit文档数量;
  • 另一种是,直接在Solr搜索服务器上进行配置,一般来说,对索引进行大批量更新,一般不会选择在搜索服务器业务繁忙的时候进行,所以能够自动进行commit也便利了对索引的管理,更新文档可以完全可以实现自动化处理。
在Solr服务器端进行配置有关commit的功能,可以在requestHandler中进行配置,示例如下:
[html] view plaincopy
  1. <requestHandler name="/update" class="solr.XmlUpdateRequestHandler">
  2. <maxPendingDeletes>10000</maxPendingDeletes>
  3. <autoCommit>
  4. <maxDocs>20</maxDocs>
  5. <maxTime>86000</maxTime>
  6. </autoCommit>
  7. </requestHandler>

上 面autoCommit中的maxDocs指定的pending多少个文档后执行一次commit,而maxTime指定了多长时间间隔进行一次 commit,一般这两个选项只需要配置一个即可满足需要。另外,每次commit会将最近的更新生效,但是如果一次commit操作尚未完成,又达到了 下一次commit的时刻,这样做会严重影响索引的吞吐量。

在Solr 4.0将会实现一种基于“软自动提交”(soft auto commit)的功能,它会根据当前的系统上下文决定是否提交(简单的情况就是,确保每次commit完成,也就是最近的索引数据更新已经更新同步到磁盘上之后再自动执行下一次commit)。

实现代码如下所示:

[java] view plaincopy
  1. package org.shirdrn.solr.solrj;
  2. import java.io.IOException;
  3. import java.io.Serializable;
  4. import java.net.MalformedURLException;
  5. import java.util.ArrayList;
  6. import java.util.Collection;
  7. import java.util.HashMap;
  8. import java.util.Iterator;
  9. import java.util.List;
  10. import java.util.Map;
  11. import org.apache.commons.httpclient.HttpClient;
  12. import org.apache.log4j.Logger;
  13. import org.apache.lucene.document.Document;
  14. import org.apache.solr.client.solrj.ResponseParser;
  15. import org.apache.solr.client.solrj.SolrServerException;
  16. import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
  17. import org.apache.solr.client.solrj.impl.XMLResponseParser;
  18. import org.apache.solr.common.SolrDocument;
  19. import org.apache.solr.common.SolrDocumentList;
  20. import org.apache.solr.common.SolrInputDocument;
  21. import org.apache.solr.common.params.SolrParams;
  22. import com.mongodb.BasicDBObject;
  23. import com.mongodb.DBCollection;
  24. import com.mongodb.DBCursor;
  25. import com.mongodb.DBObject;
  26. import com.mongodb.Mongo;
  27. import com.mongodb.MongoException;
  28. /**
  29. * Solr server for indexes operations.
  30. *
  31. * @author shirdrn
  32. * @date   2011-12-20
  33. */
  34. public class SolrPostServer {
  35. private static final Logger LOG = Logger.getLogger(SolrPostServer.class);
  36. private CommonsHttpSolrServer server;
  37. private ResponseParser responseParser;
  38. private MongoConfig mongoConfig;
  39. private String[] collectionNames;
  40. private int maxCommitCount = 100;
  41. private boolean manualOptimize = true;
  42. private boolean manualCommit = false;
  43. private Collection<SolrInputDocument> docContainer = new ArrayList<SolrInputDocument>();
  44. private static int totalCount = 0;
  45. public SolrPostServer(String url, HttpClient httpClient, MongoConfig mongoConfig) {
  46. try {
  47. if(httpClient==null) {
  48. server = new CommonsHttpSolrServer(url);
  49. server.setSoTimeout(500000); // socket read timeout
  50. server.setConnectionTimeout(5000);
  51. server.setDefaultMaxConnectionsPerHost(10);
  52. server.setMaxTotalConnections(100);
  53. server.setAllowCompression(true);
  54. server.setMaxRetries(1); // defaults to 0.  > 1 not recommended.
  55. else {
  56. server = new CommonsHttpSolrServer(url, httpClient);
  57. }
  58. catch (MalformedURLException e) {
  59. e.printStackTrace();
  60. }
  61. this.mongoConfig = mongoConfig;
  62. initialize();
  63. }
  64. /**
  65. * Initialize the {@link CommonsHttpSolrServer}'s basic parameters.
  66. */
  67. private void initialize() {
  68. if(responseParser!=null) {
  69. server.setParser(responseParser);
  70. else {
  71. server.setParser(new XMLResponseParser());
  72. }
  73. }
  74. @SuppressWarnings("unchecked")
  75. public void postUpdate() {
  76. DBCursor cursor = null;
  77. try {
  78. for (String c : collectionNames) {
  79. LOG.info("MongoDB collection name: " + c);
  80. DBCollection collection = MongoHelper.newHelper(mongoConfig).getCollection(c);
  81. DBObject q = new BasicDBObject();
  82. cursor = collection.find(q);
  83. while(cursor.hasNext()) {
  84. try {
  85. Map<Object, Object> m = cursor.next().toMap();
  86. if(manualCommit) {
  87. add(m, true);
  88. else {
  89. add(m, false);
  90. }
  91. ++totalCount;
  92. LOG.info("Add fragment: _id = " + m.get("_id").toString());
  93. catch (IOException e) {
  94. e.printStackTrace();
  95. }
  96. }
  97. cursor.close();
  98. }
  99. LOG.info("Add totalCount: " + totalCount);
  100. finallyCommit();
  101. optimize(manualOptimize);
  102. catch (MongoException e) {
  103. e.printStackTrace();
  104. catch (SolrServerException e) {
  105. e.printStackTrace();
  106. catch (IOException e) {
  107. e.printStackTrace();
  108. }
  109. }
  110. /**
  111. * Detele lucene {@link Document} by IDs.
  112. * @param strings
  113. */
  114. public void deleteById(List<String> strings) {
  115. try {
  116. server.deleteById(strings);
  117. catch (SolrServerException e) {
  118. e.printStackTrace();
  119. catch (IOException e) {
  120. e.printStackTrace();
  121. }
  122. }
  123. /**
  124. * Detele lucene {@link Document} by query.
  125. * @param query
  126. */
  127. public void deleteByQuery(String query) {
  128. try {
  129. server.deleteByQuery(query);
  130. catch (SolrServerException e) {
  131. e.printStackTrace();
  132. catch (IOException e) {
  133. e.printStackTrace();
  134. }
  135. }
  136. /**
  137. * Query.
  138. * @param params
  139. * @param fields
  140. * @return
  141. */
  142. public List<Map<String, Object>> query(SolrParams params, String[] fields) {
  143. List<Map<String, Object>> results = new ArrayList<Map<String, Object>>();
  144. try {
  145. SolrDocumentList documents = server.query(params).getResults();
  146. Iterator<SolrDocument> iter = documents.iterator();
  147. while(iter.hasNext()) {
  148. SolrDocument doc = iter.next();
  149. Map<String, Object> map = new HashMap<String, Object>();
  150. for(String field : fields) {
  151. map.put(field, doc.getFieldValue(field));
  152. }
  153. results.add(map);
  154. }
  155. catch (SolrServerException e) {
  156. e.printStackTrace();
  157. }
  158. return results;
  159. }
  160. /**
  161. * When controlling the committing action at client side, finally execute committing.
  162. * @throws SolrServerException
  163. * @throws IOException
  164. */
  165. private void finallyCommit() throws SolrServerException, IOException {
  166. if(!docContainer.isEmpty()) {
  167. server.add(docContainer);
  168. commit(false, false);
  169. }
  170. }
  171. /**
  172. * Commit.
  173. * @param waitFlush
  174. * @param waitSearcher
  175. * @throws SolrServerException
  176. * @throws IOException
  177. */
  178. public void commit(boolean waitFlush, boolean waitSearcher) {
  179. try {
  180. server.commit(waitFlush, waitSearcher);
  181. catch (SolrServerException e) {
  182. e.printStackTrace();
  183. catch (IOException e) {
  184. e.printStackTrace();
  185. }
  186. }
  187. /**
  188. * When controlling the optimizing action at client side, finally execute optimizing.
  189. * @param waitFlush
  190. * @param waitSearcher
  191. * @throws SolrServerException
  192. * @throws IOException
  193. */
  194. public void optimize(boolean waitFlush, boolean waitSearcher) {
  195. try {
  196. server.optimize(waitFlush, waitSearcher);
  197. commit(waitFlush, waitSearcher);
  198. catch (Exception e) {
  199. LOG.error("Encounter error when optimizing.",  e);
  200. try {
  201. server.rollback();
  202. catch (SolrServerException e1) {
  203. e1.printStackTrace();
  204. catch (IOException e1) {
  205. e1.printStackTrace();
  206. }
  207. }
  208. }
  209. /**
  210. * Optimize.
  211. * @param optimize
  212. * @throws SolrServerException
  213. * @throws IOException
  214. */
  215. private void optimize(boolean optimize) {
  216. if(optimize) {
  217. optimize(true, true);
  218. }
  219. }
  220. /**
  221. * Add a {@link SolrInputDocument} or collect object and add to the a collection for batch updating
  222. * from a mongodb's recored, a Map object.
  223. * @param m
  224. * @param oneByOne
  225. * @throws SolrServerException
  226. * @throws IOException
  227. */
  228. private void add(Map<Object, Object> m, boolean oneByOne) throws SolrServerException, IOException {
  229. SolrInputDocument doc = createDocument(m);
  230. if(oneByOne) {
  231. server.add(doc);
  232. else {
  233. docContainer.add(doc);
  234. if(docContainer.size()>maxCommitCount) {
  235. server.add(docContainer);
  236. server.commit(false, false);
  237. docContainer = new ArrayList<SolrInputDocument>();
  238. }
  239. }
  240. }
  241. /**
  242. * Create a {@link SolrInputDocument} object.
  243. * @param record
  244. * @return
  245. */
  246. private SolrInputDocument createDocument(Map<Object, Object> record) {
  247. String id = record.get("_id").toString();
  248. String articleId = (String) record.get("articleId");
  249. String title = (String) record.get("title");
  250. String url = (String) record.get("url");
  251. String spiderName = (String) record.get("spiderName");
  252. String fragment = makeFragment((BasicDBObject) record.get("fragment"));
  253. String word = (String) record.get("word");
  254. int pictureCount = (Integer) record.get("pictureCount");
  255. int selectedCount = (Integer) record.get("selectedCount");
  256. int fragmentSize = (Integer) record.get("fragmentSize");
  257. SolrInputDocument doc = new SolrInputDocument();
  258. doc.addField("_id", id, 1.0f);
  259. doc.addField("articleId", articleId, 1.0f);
  260. doc.addField("title", title, 1.0f);
  261. doc.addField("url", url, 1.0f);
  262. doc.addField("spiderName", spiderName, 1.0f);
  263. doc.addField("fragment", fragment, 1.0f);
  264. doc.addField("word", word, 1.0f);
  265. // Additional processing for lucene payload metadata.
  266. doc.addField("pictureCount", word + "|" + pictureCount);
  267. doc.addField("coverage", word + "|" + (float)selectedCount/fragmentSize);
  268. return doc;
  269. }
  270. @SuppressWarnings("unchecked")
  271. private String makeFragment(BasicDBObject fragment) {
  272. StringBuilder builder = new StringBuilder();
  273. Iterator<Map.Entry<Integer, String>> iter = fragment.toMap().entrySet().iterator();
  274. while(iter.hasNext()) {
  275. Map.Entry<Integer, String> entry = iter.next();
  276. builder.append(entry.getValue().trim()).append("<br>");
  277. }
  278. return builder.toString();
  279. }
  280. /**
  281. * Set {@link ResponseParser}, default value is {@link XMLResponseParser}.
  282. * @param responseParser
  283. */
  284. public void setResponseParser(ResponseParser responseParser) {
  285. this.responseParser = responseParser;
  286. }
  287. /**
  288. * Pulling document resource from multiple collections of MongoDB.
  289. * @param collectionNames
  290. */
  291. public void setCollectionNames(String[] collectionNames) {
  292. this.collectionNames = collectionNames;
  293. }
  294. public void setMaxCommitCount(int maxCommitCount) {
  295. this.maxCommitCount = maxCommitCount;
  296. }
  297. public void setManualCommit(boolean manualCommit) {
  298. this.manualCommit = manualCommit;
  299. }
  300. public void setManualOptimize(boolean manualOptimize) {
  301. this.manualOptimize = manualOptimize;
  302. }
  303. /**
  304. * Mongo database configuration.
  305. *
  306. * @author shirdrn
  307. * @date   2011-12-20
  308. */
  309. public static class MongoConfig implements Serializable {
  310. private static final long serialVersionUID = -3028092758346115702L;
  311. private String host;
  312. private int port;
  313. private String dbname;
  314. private String collectionName;
  315. public MongoConfig(String host, int port, String dbname, String collectionName) {
  316. super();
  317. this.host = host;
  318. this.port = port;
  319. this.dbname = dbname;
  320. this.collectionName = collectionName;
  321. }
  322. @Override
  323. public boolean equals(Object obj) {
  324. MongoConfig other = (MongoConfig) obj;
  325. return host.equals(other.host) && port==other.port
  326. && dbname.equals(other.dbname) && collectionName.equals(other.collectionName);
  327. }
  328. }
  329. /**
  330. * Mongo database utility.
  331. *
  332. * @author shirdrn
  333. * @date   2011-12-20
  334. */
  335. static class MongoHelper {
  336. private static Mongo mongo;
  337. private static MongoHelper helper;
  338. private MongoConfig mongoConfig;
  339. private MongoHelper(MongoConfig mongoConfig) {
  340. super();
  341. this.mongoConfig = mongoConfig;
  342. }
  343. public synchronized static MongoHelper newHelper(MongoConfig mongoConfig) {
  344. try {
  345. if(helper==null) {
  346. helper = new MongoHelper(mongoConfig);
  347. mongo = new Mongo(mongoConfig.host, mongoConfig.port);
  348. Runtime.getRuntime().addShutdownHook(new Thread() {
  349. @Override
  350. public void run() {
  351. if(mongo!=null) {
  352. mongo.close();
  353. }
  354. }
  355. });
  356. }
  357. catch (Exception e) {
  358. e.printStackTrace();
  359. }
  360. return helper;
  361. }
  362. public DBCollection getCollection(String collectionName) {
  363. DBCollection c = null;
  364. try {
  365. c = mongo.getDB(mongoConfig.dbname).getCollection(collectionName);
  366. catch (Exception e) {
  367. e.printStackTrace();
  368. }
  369. return c;
  370. }
  371. }
  372. }

下面,我们可以通过写一个测试用例测试一下。

首先,我的Solr搜索服务器已经部署好并启动成功,对应的url为http://192.168.0.197:8080/server/fragment/。测试用例如下所示:

[java] view plaincopy
  1. package org.shirdrn.solr.solrj;
  2. import java.util.ArrayList;
  3. import java.util.HashMap;
  4. import java.util.List;
  5. import java.util.Map;
  6. import junit.framework.TestCase;
  7. import org.apache.solr.common.params.CommonParams;
  8. import org.apache.solr.common.params.SolrParams;
  9. import org.apache.solr.request.MapSolrParams;
  10. import org.shirdrn.solr.solrj.SolrPostServer.MongoConfig;
  11. @SuppressWarnings("deprecation")
  12. public class TestSolrPostServer extends TestCase {
  13. SolrPostServer myServer;
  14. MongoConfig config;
  15. String url;
  16. String[] collectionNames;
  17. @Override
  18. protected void setUp() throws Exception {
  19. url = "http://192.168.0.197:8080/server/fragment/";
  20. config = new MongoConfig("192.168.0.184", 27017, "fragment", "");
  21. myServer = new SolrPostServer(url, null, config);
  22. myServer.setMaxCommitCount(100);
  23. }
  24. @Override
  25. protected void tearDown() throws Exception {
  26. super.tearDown();
  27. }
  28. public void testPostUpdate() {
  29. collectionNames = new String[] {
  30. "sina",
  31. "lvping",
  32. "daodao",
  33. "go2eu",
  34. "mafengwo",
  35. "lotour",
  36. "17u",
  37. "sohu",
  38. "baseSe",
  39. "bytravel"
  40. };
  41. myServer.setCollectionNames(collectionNames);
  42. myServer.setManualCommit(true);
  43. myServer.setManualOptimize(false);
  44. myServer.postUpdate();
  45. }
  46. public void testPostDelete() {
  47. List<String> strings = new ArrayList<String>();
  48. strings.add("4ef051342c4117a38f63ee97");
  49. strings.add("4ef051322c4117a38f63ee36");
  50. strings.add("4ef051a42c4117a38f63fb51");
  51. strings.add("4ef050d92c4117a38f63dda4");
  52. strings.add("4ef051fe2c4117a38f640bc8");
  53. strings.add("4ef048ef2c4117a38f6207ce");
  54. strings.add("4ef049062c4117a38f620e13");
  55. strings.add("4ef046f12c4117a38f6185c0");
  56. myServer.deleteById(strings);
  57. myServer.commit(false, false);
  58. myServer.optimize(true, false);
  59. }
  60. @SuppressWarnings({ "rawtypes", "unchecked" })
  61. public void testQuery() {
  62. Map map = new HashMap();
  63. map.put(CommonParams.Q, "法国");
  64. map.put(CommonParams.START, "0");
  65. map.put(CommonParams.ROWS, "10");
  66. map.put(CommonParams.FQ, "word:卢浮宫");
  67. SolrParams params = new MapSolrParams(map);
  68. List<Map<String, Object>> results = myServer.query(params, new String[] {"_id", "title", "url"});
  69. assertEquals(10, results.size());
  70. }
  71. }

在实际开发的过程中,使用Solrj客户端可以非常容易为测试做一些基本操作,如创建索引,测试Solr基本参数及其开发定制Solr相关接口(Retrieval、Highlighting、Faceted Search、Clustering等等)。

发表评论